wasmer_wasix/runners/dproxy/
networking.rs1use std::sync::LazyLock;
2use std::{
3 net::{IpAddr, SocketAddr},
4 sync::{Arc, Mutex},
5 task::{Context, Poll, Waker},
6 time::Duration,
7};
8
9use virtual_net::{
10 IpCidr, IpRoute, NetworkError, StreamSecurity, VirtualIcmpSocket, VirtualNetworking,
11 VirtualRawSocket, VirtualTcpListener, VirtualTcpSocket, VirtualUdpSocket,
12 host::LocalNetworking, loopback::LoopbackNetworking,
13};
14
15#[derive(Debug, Default)]
16struct LocalWithLoopbackNetworkingListening {
17 addresses: Vec<SocketAddr>,
18 wakers: Vec<Waker>,
19}
20
21#[derive(Debug, Clone)]
22pub struct LocalWithLoopbackNetworking {
23 inner_networking: Arc<dyn VirtualNetworking + Send + Sync + 'static>,
24 local_listening: Arc<Mutex<LocalWithLoopbackNetworkingListening>>,
25 loopback_networking: LoopbackNetworking,
26}
27
28impl LocalWithLoopbackNetworking {
29 pub fn new() -> Self {
30 static LOCAL_NETWORKING: LazyLock<Arc<LocalNetworking>> = LazyLock::new(Arc::default);
31 Self {
32 local_listening: Default::default(),
33 inner_networking: LOCAL_NETWORKING.clone(),
34 loopback_networking: LoopbackNetworking::new(),
35 }
36 }
37
38 pub fn poll_listening(&self, cx: &mut Context<'_>) -> Poll<SocketAddr> {
39 let mut listening = self.local_listening.lock().unwrap();
40
41 if let Some(addr) = listening.addresses.first() {
42 return Poll::Ready(*addr);
43 }
44
45 if !listening.wakers.iter().any(|w| w.will_wake(cx.waker())) {
46 listening.wakers.push(cx.waker().clone());
47 }
48
49 Poll::Pending
50 }
51
52 pub fn register_listener(&self, addr: SocketAddr) {
53 let mut listening = self.local_listening.lock().unwrap();
54 listening.addresses.push(addr);
55 listening.addresses.sort_by_key(|a| a.port());
56 listening.wakers.drain(..).for_each(|w| w.wake());
57 }
58
59 pub fn loopback_networking(&self) -> LoopbackNetworking {
60 self.loopback_networking.clone()
61 }
62}
63
64#[allow(unused_variables)]
65#[async_trait::async_trait]
66impl VirtualNetworking for LocalWithLoopbackNetworking {
67 async fn bridge(
70 &self,
71 network: &str,
72 access_token: &str,
73 security: StreamSecurity,
74 ) -> Result<(), NetworkError> {
75 self.inner_networking
76 .bridge(network, access_token, security)
77 .await
78 }
79
80 async fn unbridge(&self) -> Result<(), NetworkError> {
82 self.inner_networking.unbridge().await
83 }
84
85 async fn dhcp_acquire(&self) -> Result<Vec<IpAddr>, NetworkError> {
87 self.inner_networking.dhcp_acquire().await
88 }
89
90 async fn ip_add(&self, ip: IpAddr, prefix: u8) -> Result<(), NetworkError> {
92 self.inner_networking.ip_add(ip, prefix).await
93 }
94
95 async fn ip_remove(&self, ip: IpAddr) -> Result<(), NetworkError> {
97 self.inner_networking.ip_remove(ip).await
98 }
99
100 async fn ip_clear(&self) -> Result<(), NetworkError> {
102 self.inner_networking.ip_clear().await
103 }
104
105 async fn ip_list(&self) -> Result<Vec<IpCidr>, NetworkError> {
107 self.inner_networking.ip_list().await
108 }
109
110 async fn mac(&self) -> Result<[u8; 6], NetworkError> {
112 self.inner_networking.mac().await
113 }
114
115 async fn gateway_set(&self, ip: IpAddr) -> Result<(), NetworkError> {
117 self.inner_networking.gateway_set(ip).await
118 }
119
120 async fn route_add(
122 &self,
123 cidr: IpCidr,
124 via_router: IpAddr,
125 preferred_until: Option<Duration>,
126 expires_at: Option<Duration>,
127 ) -> Result<(), NetworkError> {
128 self.inner_networking
129 .route_add(cidr, via_router, preferred_until, expires_at)
130 .await
131 }
132
133 async fn route_remove(&self, cidr: IpAddr) -> Result<(), NetworkError> {
135 self.inner_networking.route_remove(cidr).await
136 }
137
138 async fn route_clear(&self) -> Result<(), NetworkError> {
140 self.inner_networking.route_clear().await
141 }
142
143 async fn route_list(&self) -> Result<Vec<IpRoute>, NetworkError> {
145 self.inner_networking.route_list().await
146 }
147
148 async fn bind_raw(&self) -> Result<Box<dyn VirtualRawSocket + Sync>, NetworkError> {
151 self.inner_networking.bind_raw().await
152 }
153
154 async fn listen_tcp(
158 &self,
159 addr: SocketAddr,
160 only_v6: bool,
161 reuse_port: bool,
162 reuse_addr: bool,
163 ) -> Result<Box<dyn VirtualTcpListener + Sync>, NetworkError> {
164 let backlog = 1024;
165
166 tracing::debug!("registering listener on loopback networking");
167
168 let ret: Result<Box<dyn VirtualTcpListener + Sync>, NetworkError> = self
169 .loopback_networking
170 .listen_tcp(addr, only_v6, reuse_port, reuse_addr)
171 .await;
172
173 if ret.is_ok() {
174 tracing::debug!("registering listener on loopback networking");
175 self.register_listener(addr);
176 }
177
178 ret
179 }
180
181 async fn bind_udp(
185 &self,
186 addr: SocketAddr,
187 reuse_port: bool,
188 reuse_addr: bool,
189 ) -> Result<Box<dyn VirtualUdpSocket + Sync>, NetworkError> {
190 self.inner_networking
191 .bind_udp(addr, reuse_port, reuse_addr)
192 .await
193 }
194
195 async fn bind_icmp(
198 &self,
199 addr: IpAddr,
200 ) -> Result<Box<dyn VirtualIcmpSocket + Sync>, NetworkError> {
201 self.inner_networking.bind_icmp(addr).await
202 }
203
204 async fn connect_tcp(
206 &self,
207 addr: SocketAddr,
208 peer: SocketAddr,
209 ) -> Result<Box<dyn VirtualTcpSocket + Sync>, NetworkError> {
210 self.inner_networking.connect_tcp(addr, peer).await
211 }
212
213 async fn resolve(
215 &self,
216 host: &str,
217 port: Option<u16>,
218 dns_server: Option<IpAddr>,
219 ) -> Result<Vec<IpAddr>, NetworkError> {
220 self.inner_networking.resolve(host, port, dns_server).await
221 }
222}