wasmer_wasix/runners/dproxy/
networking.rs

1use std::sync::LazyLock;
2use std::{
3    net::{IpAddr, SocketAddr},
4    sync::{Arc, Mutex},
5    task::{Context, Poll, Waker},
6    time::Duration,
7};
8
9use virtual_net::{
10    IpCidr, IpRoute, NetworkError, StreamSecurity, VirtualIcmpSocket, VirtualNetworking,
11    VirtualRawSocket, VirtualTcpListener, VirtualTcpSocket, VirtualUdpSocket,
12    host::LocalNetworking, loopback::LoopbackNetworking,
13};
14
/// Shared bookkeeping for listeners registered on the loopback network.
///
/// Starts out empty (both vectors are empty in the `Default` value).
#[derive(Default, Debug)]
struct LocalWithLoopbackNetworkingListening {
    /// Addresses recorded by `register_listener`, re-sorted by port after
    /// every insertion.
    addresses: Vec<SocketAddr>,
    /// Wakers parked by `poll_listening` while no address was available;
    /// drained and woken on the next registration.
    wakers: Vec<Waker>,
}
20
21#[derive(Debug, Clone)]
22pub struct LocalWithLoopbackNetworking {
23    inner_networking: Arc<dyn VirtualNetworking + Send + Sync + 'static>,
24    local_listening: Arc<Mutex<LocalWithLoopbackNetworkingListening>>,
25    loopback_networking: LoopbackNetworking,
26}
27
28impl LocalWithLoopbackNetworking {
29    pub fn new() -> Self {
30        static LOCAL_NETWORKING: LazyLock<Arc<LocalNetworking>> = LazyLock::new(Arc::default);
31        Self {
32            local_listening: Default::default(),
33            inner_networking: LOCAL_NETWORKING.clone(),
34            loopback_networking: LoopbackNetworking::new(),
35        }
36    }
37
38    pub fn poll_listening(&self, cx: &mut Context<'_>) -> Poll<SocketAddr> {
39        let mut listening = self.local_listening.lock().unwrap();
40
41        if let Some(addr) = listening.addresses.first() {
42            return Poll::Ready(*addr);
43        }
44
45        if !listening.wakers.iter().any(|w| w.will_wake(cx.waker())) {
46            listening.wakers.push(cx.waker().clone());
47        }
48
49        Poll::Pending
50    }
51
52    pub fn register_listener(&self, addr: SocketAddr) {
53        let mut listening = self.local_listening.lock().unwrap();
54        listening.addresses.push(addr);
55        listening.addresses.sort_by_key(|a| a.port());
56        listening.wakers.drain(..).for_each(|w| w.wake());
57    }
58
59    pub fn loopback_networking(&self) -> LoopbackNetworking {
60        self.loopback_networking.clone()
61    }
62}
63
64#[allow(unused_variables)]
65#[async_trait::async_trait]
66impl VirtualNetworking for LocalWithLoopbackNetworking {
67    /// Bridges this local network with a remote network, which is required in
68    /// order to make lower level networking calls (such as UDP/TCP)
69    async fn bridge(
70        &self,
71        network: &str,
72        access_token: &str,
73        security: StreamSecurity,
74    ) -> Result<(), NetworkError> {
75        self.inner_networking
76            .bridge(network, access_token, security)
77            .await
78    }
79
80    /// Disconnects from the remote network essentially unbridging it
81    async fn unbridge(&self) -> Result<(), NetworkError> {
82        self.inner_networking.unbridge().await
83    }
84
85    /// Acquires an IP address on the network and configures the routing tables
86    async fn dhcp_acquire(&self) -> Result<Vec<IpAddr>, NetworkError> {
87        self.inner_networking.dhcp_acquire().await
88    }
89
90    /// Adds a static IP address to the interface with a netmask prefix
91    async fn ip_add(&self, ip: IpAddr, prefix: u8) -> Result<(), NetworkError> {
92        self.inner_networking.ip_add(ip, prefix).await
93    }
94
95    /// Removes a static (or dynamic) IP address from the interface
96    async fn ip_remove(&self, ip: IpAddr) -> Result<(), NetworkError> {
97        self.inner_networking.ip_remove(ip).await
98    }
99
100    /// Clears all the assigned IP addresses for this interface
101    async fn ip_clear(&self) -> Result<(), NetworkError> {
102        self.inner_networking.ip_clear().await
103    }
104
105    /// Lists all the IP addresses currently assigned to this interface
106    async fn ip_list(&self) -> Result<Vec<IpCidr>, NetworkError> {
107        self.inner_networking.ip_list().await
108    }
109
110    /// Returns the hardware MAC address for this interface
111    async fn mac(&self) -> Result<[u8; 6], NetworkError> {
112        self.inner_networking.mac().await
113    }
114
115    /// Adds a default gateway to the routing table
116    async fn gateway_set(&self, ip: IpAddr) -> Result<(), NetworkError> {
117        self.inner_networking.gateway_set(ip).await
118    }
119
120    /// Adds a specific route to the routing table
121    async fn route_add(
122        &self,
123        cidr: IpCidr,
124        via_router: IpAddr,
125        preferred_until: Option<Duration>,
126        expires_at: Option<Duration>,
127    ) -> Result<(), NetworkError> {
128        self.inner_networking
129            .route_add(cidr, via_router, preferred_until, expires_at)
130            .await
131    }
132
133    /// Removes a routing rule from the routing table
134    async fn route_remove(&self, cidr: IpAddr) -> Result<(), NetworkError> {
135        self.inner_networking.route_remove(cidr).await
136    }
137
138    /// Clears the routing table for this interface
139    async fn route_clear(&self) -> Result<(), NetworkError> {
140        self.inner_networking.route_clear().await
141    }
142
143    /// Lists all the routes defined in the routing table for this interface
144    async fn route_list(&self) -> Result<Vec<IpRoute>, NetworkError> {
145        self.inner_networking.route_list().await
146    }
147
148    /// Creates a low level socket that can read and write Ethernet packets
149    /// directly to the interface
150    async fn bind_raw(&self) -> Result<Box<dyn VirtualRawSocket + Sync>, NetworkError> {
151        self.inner_networking.bind_raw().await
152    }
153
154    /// Listens for TCP connections on a specific IP and Port combination
155    /// Multiple servers (processes or threads) can bind to the same port if they each set
156    /// the reuse-port and-or reuse-addr flags
157    async fn listen_tcp(
158        &self,
159        addr: SocketAddr,
160        only_v6: bool,
161        reuse_port: bool,
162        reuse_addr: bool,
163    ) -> Result<Box<dyn VirtualTcpListener + Sync>, NetworkError> {
164        let backlog = 1024;
165
166        tracing::debug!("registering listener on loopback networking");
167
168        let ret: Result<Box<dyn VirtualTcpListener + Sync>, NetworkError> = self
169            .loopback_networking
170            .listen_tcp(addr, only_v6, reuse_port, reuse_addr)
171            .await;
172
173        if ret.is_ok() {
174            tracing::debug!("registering listener on loopback networking");
175            self.register_listener(addr);
176        }
177
178        ret
179    }
180
181    /// Opens a UDP socket that listens on a specific IP and Port combination
182    /// Multiple servers (processes or threads) can bind to the same port if they each set
183    /// the reuse-port and-or reuse-addr flags
184    async fn bind_udp(
185        &self,
186        addr: SocketAddr,
187        reuse_port: bool,
188        reuse_addr: bool,
189    ) -> Result<Box<dyn VirtualUdpSocket + Sync>, NetworkError> {
190        self.inner_networking
191            .bind_udp(addr, reuse_port, reuse_addr)
192            .await
193    }
194
195    /// Creates a socket that can be used to send and receive ICMP packets
196    /// from a paritcular IP address
197    async fn bind_icmp(
198        &self,
199        addr: IpAddr,
200    ) -> Result<Box<dyn VirtualIcmpSocket + Sync>, NetworkError> {
201        self.inner_networking.bind_icmp(addr).await
202    }
203
204    /// Opens a TCP connection to a particular destination IP address and port
205    async fn connect_tcp(
206        &self,
207        addr: SocketAddr,
208        peer: SocketAddr,
209    ) -> Result<Box<dyn VirtualTcpSocket + Sync>, NetworkError> {
210        self.inner_networking.connect_tcp(addr, peer).await
211    }
212
213    /// Performs DNS resolution for a specific hostname
214    async fn resolve(
215        &self,
216        host: &str,
217        port: Option<u16>,
218        dns_server: Option<IpAddr>,
219    ) -> Result<Vec<IpAddr>, NetworkError> {
220        self.inner_networking.resolve(host, port, dns_server).await
221    }
222}