virtual_net/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
2#![allow(clippy::multiple_bound_locations)]
3#[cfg(feature = "remote")]
4pub mod client;
5pub mod composite;
6#[cfg(feature = "host-net")]
7pub mod host;
8pub mod loopback;
9pub mod meta;
10pub mod ruleset;
11#[cfg(feature = "remote")]
12pub mod rx_tx;
13#[cfg(feature = "remote")]
14pub mod server;
15pub mod tcp_pair;
16#[cfg(feature = "tokio")]
17#[cfg(test)]
18mod tests;
19
20#[cfg(feature = "remote")]
21pub use client::{RemoteNetworkingClient, RemoteNetworkingClientDriver};
22pub use composite::CompositeTcpListener;
23pub use loopback::LoopbackNetworking;
24use pin_project_lite::pin_project;
25#[cfg(feature = "rkyv")]
26use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
27#[cfg(feature = "remote")]
28pub use server::{RemoteNetworkingServer, RemoteNetworkingServerDriver};
29use std::fmt;
30use std::mem::MaybeUninit;
31use std::net::IpAddr;
32use std::net::Ipv4Addr;
33use std::net::Ipv6Addr;
34use std::net::Shutdown;
35use std::net::SocketAddr;
36use std::pin::Pin;
37use std::sync::Arc;
38use std::task::Context;
39use std::task::Poll;
40use std::time::Duration;
41use thiserror::Error;
42#[cfg(feature = "tokio")]
43use tokio::io::AsyncRead;
44#[cfg(feature = "tokio")]
45use tokio::io::AsyncWrite;
46
47pub use bytes::Bytes;
48pub use bytes::BytesMut;
49use serde::{Deserialize, Serialize};
50#[cfg(feature = "host-net")]
51pub use virtual_mio::{InterestGuard, InterestHandlerWaker, InterestType};
52pub use virtual_mio::{InterestHandler, handler_into_waker};
53
/// Crate-wide result alias whose error type is always [`NetworkError`].
pub type Result<T> = std::result::Result<T, NetworkError>;
55
/// Represents an IP address and its netmask
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
#[cfg_attr(feature = "rkyv", derive(RkyvSerialize, RkyvDeserialize, Archive))]
pub struct IpCidr {
    /// The IP address (IPv4 or IPv6)
    pub ip: IpAddr,
    /// The netmask, stored as a prefix
    /// (presumably the number of leading network bits, as in CIDR notation - TODO confirm)
    pub prefix: u8,
}
63
/// Represents a routing entry in the routing table of the interface
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "rkyv", derive(RkyvSerialize, RkyvDeserialize, Archive))]
pub struct IpRoute {
    /// The address block this routing entry is defined for
    pub cidr: IpCidr,
    /// The router associated with this entry
    /// (presumably the next hop for traffic matching `cidr` - TODO confirm)
    pub via_router: IpAddr,
    /// Optional preference limit for the route
    /// (NOTE(review): the reference point of this `Duration` is not visible here - confirm)
    pub preferred_until: Option<Duration>,
    /// Optional expiry for the route
    /// (NOTE(review): the reference point of this `Duration` is not visible here - confirm)
    pub expires_at: Option<Duration>,
}
73
/// Represents an IO source
///
/// This is the common base of the listener and socket traits in this module,
/// each of which requires it (directly or transitively) as a supertrait.
pub trait VirtualIoSource: fmt::Debug + Send + Sync + 'static {
    /// Removes a previously registered handler
    ///
    /// NOTE(review): the method takes no token or other argument, so it cannot
    /// select between handlers - confirm the intended semantics with implementors.
    fn remove_handler(&mut self);

    /// Polls the source to see if there is data waiting
    ///
    /// NOTE(review): the meaning of the returned `usize` is not defined here
    /// (presumably the amount of data that is ready) - confirm.
    fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>>;

    /// Polls the source to see if data can be sent
    ///
    /// NOTE(review): the meaning of the returned `usize` is not defined here
    /// (presumably the amount of data that can be written) - confirm.
    fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>>;
}
85
/// An implementation of virtual networking
///
/// Every method has a default implementation that returns
/// [`NetworkError::Unsupported`], so implementors only need to override the
/// operations they actually support.
#[async_trait::async_trait]
// The default bodies ignore all of their arguments.
#[allow(unused_variables)]
pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
    /// Bridges this local network with a remote network, which is required in
    /// order to make lower level networking calls (such as UDP/TCP)
    async fn bridge(
        &self,
        network: &str,
        access_token: &str,
        security: StreamSecurity,
    ) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Disconnects from the remote network, essentially unbridging it
    async fn unbridge(&self) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Acquires an IP address on the network and configures the routing tables
    async fn dhcp_acquire(&self) -> Result<Vec<IpAddr>> {
        Err(NetworkError::Unsupported)
    }

    /// Adds a static IP address to the interface with a netmask prefix
    async fn ip_add(&self, ip: IpAddr, prefix: u8) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Removes a static (or dynamic) IP address from the interface
    async fn ip_remove(&self, ip: IpAddr) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Clears all the assigned IP addresses for this interface
    async fn ip_clear(&self) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Lists all the IP addresses currently assigned to this interface
    async fn ip_list(&self) -> Result<Vec<IpCidr>> {
        Err(NetworkError::Unsupported)
    }

    /// Returns the hardware MAC address for this interface
    async fn mac(&self) -> Result<[u8; 6]> {
        Err(NetworkError::Unsupported)
    }

    /// Adds a default gateway to the routing table
    async fn gateway_set(&self, ip: IpAddr) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Adds a specific route to the routing table
    ///
    /// The arguments correspond one-to-one to the fields of [`IpRoute`].
    async fn route_add(
        &self,
        cidr: IpCidr,
        via_router: IpAddr,
        preferred_until: Option<Duration>,
        expires_at: Option<Duration>,
    ) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Removes a routing rule from the routing table
    ///
    /// NOTE(review): the argument is named `cidr` but is a plain `IpAddr`
    /// rather than an [`IpCidr`] - confirm how implementors match the route.
    async fn route_remove(&self, cidr: IpAddr) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Clears the routing table for this interface
    async fn route_clear(&self) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Lists all the routes defined in the routing table for this interface
    async fn route_list(&self) -> Result<Vec<IpRoute>> {
        Err(NetworkError::Unsupported)
    }

    /// Creates a low level socket that can read and write Ethernet packets
    /// directly to the interface
    async fn bind_raw(&self) -> Result<Box<dyn VirtualRawSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Listens for TCP connections on a specific IP and Port combination
    /// Multiple servers (processes or threads) can bind to the same port if they each set
    /// the reuse-port and-or reuse-addr flags
    async fn listen_tcp(
        &self,
        addr: SocketAddr,
        only_v6: bool,
        reuse_port: bool,
        reuse_addr: bool,
    ) -> Result<Box<dyn VirtualTcpListener + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Opens a UDP socket that listens on a specific IP and Port combination
    /// Multiple servers (processes or threads) can bind to the same port if they each set
    /// the reuse-port and-or reuse-addr flags
    async fn bind_udp(
        &self,
        addr: SocketAddr,
        reuse_port: bool,
        reuse_addr: bool,
    ) -> Result<Box<dyn VirtualUdpSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Creates a socket that can be used to send and receive ICMP packets
    /// from a particular IP address
    async fn bind_icmp(&self, addr: IpAddr) -> Result<Box<dyn VirtualIcmpSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Opens a TCP connection to a particular destination IP address and port
    ///
    /// `peer` is the destination; `addr` is presumably the local address to
    /// connect from (TODO confirm against implementors).
    async fn connect_tcp(
        &self,
        addr: SocketAddr,
        peer: SocketAddr,
    ) -> Result<Box<dyn VirtualTcpSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Performs DNS resolution for a specific hostname
    async fn resolve(
        &self,
        host: &str,
        port: Option<u16>,
        dns_server: Option<IpAddr>,
    ) -> Result<Vec<IpAddr>> {
        Err(NetworkError::Unsupported)
    }
}
223
/// Shared, reference-counted handle to a type-erased [`VirtualNetworking`] implementation.
pub type DynVirtualNetworking = Arc<dyn VirtualNetworking>;
225
/// A TCP listener that hands out accepted connections as boxed
/// [`VirtualTcpSocket`]s.
///
/// All operations here are non-blocking; an awaitable `accept` is provided by
/// [`VirtualTcpListenerExt`].
pub trait VirtualTcpListener: VirtualIoSource + fmt::Debug + Send + Sync + 'static {
    /// Tries to accept a new connection
    ///
    /// On success returns the accepted socket together with a `SocketAddr`
    /// (presumably the peer's address - TODO confirm). The async `accept`
    /// helper treats `NetworkError::WouldBlock` from this method as "no
    /// connection available yet".
    fn try_accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;

    /// Registers a waker for when a new connection has arrived. This uses
    /// a stack machine which means more than one waker can be registered
    fn set_handler(&mut self, handler: Box<dyn InterestHandler + Send + Sync>) -> Result<()>;

    /// Returns the local address of this TCP listener
    fn addr_local(&self) -> Result<SocketAddr>;

    /// Sets how many network hops the packets are permitted for new connections
    fn set_ttl(&mut self, ttl: u8) -> Result<()>;

    /// Returns the maximum number of network hops before packets are dropped
    fn ttl(&self) -> Result<u8>;
}
243
/// Async convenience methods for [`VirtualTcpListener`].
///
/// A blanket implementation in this file covers every `VirtualTcpListener`
/// (including unsized trait objects), so this trait is not meant to be
/// implemented by hand.
#[async_trait::async_trait]
pub trait VirtualTcpListenerExt: VirtualTcpListener {
    /// Accepts a new connection from the TCP listener
    async fn accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;
}
249
250#[async_trait::async_trait]
251impl<R: VirtualTcpListener + ?Sized> VirtualTcpListenerExt for R {
252    async fn accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)> {
253        struct Poller<'a, R>
254        where
255            R: VirtualTcpListener + ?Sized,
256        {
257            listener: &'a mut R,
258        }
259        impl<R> std::future::Future for Poller<'_, R>
260        where
261            R: VirtualTcpListener + ?Sized,
262        {
263            type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;
264            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
266                if let Err(err) = self.listener.set_handler(handler) {
267                    return Poll::Ready(Err(err));
268                }
269                match self.listener.try_accept() {
270                    Ok(ret) => Poll::Ready(Ok(ret)),
271                    Err(NetworkError::WouldBlock) => Poll::Pending,
272                    Err(err) => Poll::Ready(Err(err)),
273                }
274            }
275        }
276        Poller { listener: self }.await
277    }
278}
279
/// Operations shared by every kind of socket in this module (connected,
/// connectionless and raw sockets all require this trait).
pub trait VirtualSocket: VirtualIoSource + fmt::Debug + Send + Sync + 'static {
    /// Sets how many network hops the packets are permitted for new connections
    fn set_ttl(&mut self, ttl: u32) -> Result<()>;

    /// Returns the maximum number of network hops before packets are dropped
    fn ttl(&self) -> Result<u32>;

    /// Returns the local address for this socket
    fn addr_local(&self) -> Result<SocketAddr>;

    /// Returns the status/state of the socket
    fn status(&self) -> Result<SocketStatus>;

    /// Registers a waker for when this connection is ready to receive
    /// more data. Uses a stack machine which means more than one waker
    /// can be registered
    fn set_handler(&mut self, handler: Box<dyn InterestHandler + Send + Sync>) -> Result<()>;
}
298
/// The state of a socket, as reported by [`VirtualSocket::status`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SocketStatus {
    Opening,
    Opened,
    Closed,
    Failed,
}
306
/// The security level passed to [`VirtualNetworking::bridge`] when bridging
/// to a remote network.
///
/// NOTE(review): the exact guarantees of each level are not defined in this
/// file - confirm with the bridge implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamSecurity {
    Unencrypted,
    // The spelling of this variant is part of the public API and of the
    // derived serde representation, so it must not be "corrected" casually.
    AnyEncyption,
    ClassicEncryption,
    DoubleEncryption,
}
314
/// Connected sockets have a persistent connection to a remote peer
///
/// All `try_*` methods are non-blocking; the awaitable counterparts live in
/// [`VirtualConnectedSocketExt`], which treats `NetworkError::WouldBlock` as
/// "not ready yet".
pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
    /// Determines how long the socket will remain in a TIME_WAIT
    /// after it disconnects (only the one that initiates the close will
    /// be in a TIME_WAIT state thus the clients should always do this rather
    /// than the server)
    fn set_linger(&mut self, linger: Option<Duration>) -> Result<()>;

    /// Returns how long the socket will remain in a TIME_WAIT
    /// after it disconnects
    fn linger(&self) -> Result<Option<Duration>>;

    /// Tries to send out a datagram or stream of bytes on this socket
    ///
    /// Returns a `usize` on success (presumably the number of bytes accepted
    /// for sending - TODO confirm).
    fn try_send(&mut self, data: &[u8]) -> Result<usize>;

    /// Tries to flush any data in the local buffers
    fn try_flush(&mut self) -> Result<()>;

    /// Closes the socket
    fn close(&mut self) -> Result<()>;

    /// Tries to read a packet from the socket
    ///
    /// `buf` may be uninitialized memory. The tokio `AsyncRead` adapter in
    /// this file treats the returned count as the number of bytes at the
    /// start of `buf` that were initialized by this call. `peek` presumably
    /// requests that the data is left in the socket for a later read
    /// (TODO confirm).
    fn try_recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;
}
339
/// Async counterparts of the `try_*` methods on [`VirtualConnectedSocket`].
///
/// A blanket implementation in this file covers every `VirtualConnectedSocket`
/// (including unsized trait objects).
#[async_trait::async_trait]
pub trait VirtualConnectedSocketExt: VirtualConnectedSocket {
    /// Awaitable version of [`VirtualConnectedSocket::try_send`]
    async fn send(&mut self, data: &[u8]) -> Result<usize>;

    /// Awaitable version of [`VirtualConnectedSocket::try_recv`]
    async fn recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;

    /// Awaitable version of [`VirtualConnectedSocket::try_flush`]
    async fn flush(&mut self) -> Result<()>;
}
348
349#[async_trait::async_trait]
350impl<R: VirtualConnectedSocket + ?Sized> VirtualConnectedSocketExt for R {
351    async fn send(&mut self, data: &[u8]) -> Result<usize> {
352        pin_project! {
353            struct Poller<'a, 'b, R: ?Sized>
354            where
355                R: VirtualConnectedSocket,
356            {
357                socket: &'a mut R,
358                data: &'b [u8],
359            }
360        }
361        impl<R> std::future::Future for Poller<'_, '_, R>
362        where
363            R: VirtualConnectedSocket + ?Sized,
364        {
365            type Output = Result<usize>;
366            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367                let this = self.project();
368
369                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
370                if let Err(err) = this.socket.set_handler(handler) {
371                    return Poll::Ready(Err(err));
372                }
373                match this.socket.try_send(this.data) {
374                    Ok(ret) => Poll::Ready(Ok(ret)),
375                    Err(NetworkError::WouldBlock) => Poll::Pending,
376                    Err(err) => Poll::Ready(Err(err)),
377                }
378            }
379        }
380        Poller { socket: self, data }.await
381    }
382
383    async fn recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize> {
384        pin_project! {
385            struct Poller<'a, 'b, R: ?Sized>
386            where
387                R: VirtualConnectedSocket,
388            {
389                socket: &'a mut R,
390                buf: &'b mut [MaybeUninit<u8>],
391                peek: bool,
392            }
393        }
394        impl<R> std::future::Future for Poller<'_, '_, R>
395        where
396            R: VirtualConnectedSocket + ?Sized,
397        {
398            type Output = Result<usize>;
399            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
400                let this = self.project();
401
402                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
403                if let Err(err) = this.socket.set_handler(handler) {
404                    return Poll::Ready(Err(err));
405                }
406                match this.socket.try_recv(this.buf, *this.peek) {
407                    Ok(ret) => Poll::Ready(Ok(ret)),
408                    Err(NetworkError::WouldBlock) => Poll::Pending,
409                    Err(err) => Poll::Ready(Err(err)),
410                }
411            }
412        }
413        Poller {
414            socket: self,
415            buf,
416            peek,
417        }
418        .await
419    }
420
421    async fn flush(&mut self) -> Result<()> {
422        struct Poller<'a, R>
423        where
424            R: VirtualConnectedSocket + ?Sized,
425        {
426            socket: &'a mut R,
427        }
428        impl<R> std::future::Future for Poller<'_, R>
429        where
430            R: VirtualConnectedSocket + ?Sized,
431        {
432            type Output = Result<()>;
433            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
434                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
435                if let Err(err) = self.socket.set_handler(handler) {
436                    return Poll::Ready(Err(err));
437                }
438                match self.socket.try_flush() {
439                    Ok(ret) => Poll::Ready(Ok(ret)),
440                    Err(NetworkError::WouldBlock) => Poll::Pending,
441                    Err(err) => Poll::Ready(Err(err)),
442                }
443            }
444        }
445        Poller { socket: self }.await
446    }
447}
448
/// Connectionless sockets are able to send and receive datagrams and stream
/// bytes to multiple addresses at the same time (peer-to-peer)
///
/// Both methods are non-blocking; awaitable counterparts are provided by
/// [`VirtualConnectionlessSocketExt`].
pub trait VirtualConnectionlessSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
    /// Sends out a datagram or stream of bytes on this socket
    /// to a specific address
    fn try_send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize>;

    /// Receives a packet from the socket
    ///
    /// On success returns a byte count together with a `SocketAddr`
    /// (presumably the sender's address - TODO confirm). `buf` may be
    /// uninitialized memory.
    fn try_recv_from(
        &mut self,
        buf: &mut [MaybeUninit<u8>],
        peek: bool,
    ) -> Result<(usize, SocketAddr)>;
}
463
/// Async counterparts of the `try_*` methods on
/// [`VirtualConnectionlessSocket`].
///
/// A blanket implementation in this file covers every
/// `VirtualConnectionlessSocket` (including unsized trait objects).
#[async_trait::async_trait]
pub trait VirtualConnectionlessSocketExt: VirtualConnectionlessSocket {
    /// Awaitable version of [`VirtualConnectionlessSocket::try_send_to`]
    async fn send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize>;

    /// Awaitable version of [`VirtualConnectionlessSocket::try_recv_from`]
    async fn recv_from(
        &mut self,
        buf: &mut [MaybeUninit<u8>],
        peek: bool,
    ) -> Result<(usize, SocketAddr)>;
}
474
475#[async_trait::async_trait]
476impl<R: VirtualConnectionlessSocket + ?Sized> VirtualConnectionlessSocketExt for R {
477    async fn send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize> {
478        pin_project! {
479            struct Poller<'a, 'b, R: ?Sized>
480            where
481                R: VirtualConnectionlessSocket,
482            {
483                socket: &'a mut R,
484                data: &'b [u8],
485                addr: SocketAddr,
486            }
487        }
488        impl<R> std::future::Future for Poller<'_, '_, R>
489        where
490            R: VirtualConnectionlessSocket + ?Sized,
491        {
492            type Output = Result<usize>;
493            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
494                let this = self.project();
495
496                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
497                if let Err(err) = this.socket.set_handler(handler) {
498                    return Poll::Ready(Err(err));
499                }
500                match this.socket.try_send_to(this.data, *this.addr) {
501                    Ok(ret) => Poll::Ready(Ok(ret)),
502                    Err(NetworkError::WouldBlock) => Poll::Pending,
503                    Err(err) => Poll::Ready(Err(err)),
504                }
505            }
506        }
507        Poller {
508            socket: self,
509            data,
510            addr,
511        }
512        .await
513    }
514
515    async fn recv_from(
516        &mut self,
517        buf: &mut [MaybeUninit<u8>],
518        peek: bool,
519    ) -> Result<(usize, SocketAddr)> {
520        pin_project! {
521            struct Poller<'a, 'b, R: ?Sized>
522            where
523                R: VirtualConnectionlessSocket,
524            {
525                socket: &'a mut R,
526                buf: &'b mut [MaybeUninit<u8>],
527                peek: bool,
528            }
529        }
530        impl<R> std::future::Future for Poller<'_, '_, R>
531        where
532            R: VirtualConnectionlessSocket + ?Sized,
533        {
534            type Output = Result<(usize, SocketAddr)>;
535            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
536                let this = self.project();
537
538                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
539                if let Err(err) = this.socket.set_handler(handler) {
540                    return Poll::Ready(Err(err));
541                }
542                match this.socket.try_recv_from(this.buf, *this.peek) {
543                    Ok(ret) => Poll::Ready(Ok(ret)),
544                    Err(NetworkError::WouldBlock) => Poll::Pending,
545                    Err(err) => Poll::Ready(Err(err)),
546                }
547            }
548        }
549        Poller {
550            socket: self,
551            buf,
552            peek,
553        }
554        .await
555    }
556}
557
/// ICMP sockets are low level devices bound to a specific address
/// that can send and receive ICMP packets
///
/// This is a marker trait: it adds no methods beyond those of
/// [`VirtualConnectionlessSocket`].
pub trait VirtualIcmpSocket:
    VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
{
}
564
/// A low level socket, as returned by [`VirtualNetworking::bind_raw`], which
/// is documented there as reading and writing Ethernet packets directly on
/// the interface.
// NOTE(review): the trait declares no `async fn`, so the `async_trait`
// attribute below appears to be unnecessary - confirm before removing.
#[async_trait::async_trait]
pub trait VirtualRawSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
    /// Sends out a datagram or stream of bytes on this socket
    fn try_send(&mut self, data: &[u8]) -> Result<usize>;

    /// Attempts to flush the object, ensuring that any buffered data reach
    /// their destination.
    fn try_flush(&mut self) -> Result<()>;

    /// Receives a packet from the socket
    ///
    /// `buf` may be uninitialized memory.
    fn try_recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;

    /// Tells the raw socket and its backing switch that all packets
    /// should be received by this socket even if they are not
    /// destined for this device
    fn set_promiscuous(&mut self, promiscuous: bool) -> Result<()>;

    /// Returns if the socket is running in promiscuous mode whereby it
    /// will receive all packets even if they are not destined for the
    /// local interface
    fn promiscuous(&self) -> Result<bool>;
}
587
/// A connected TCP socket.
///
/// Extends [`VirtualConnectedSocket`] with TCP-specific options (buffer
/// sizes, NO_DELAY, KEEP_ALIVE, DONT_ROUTE), peer address lookup and
/// shutdown. With the `tokio` feature enabled, boxed trait objects of this
/// type also implement tokio's `AsyncRead` and `AsyncWrite`.
pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync + 'static {
    /// Sets the receive buffer size which acts as a throttle for how
    /// much data is buffered on this side of the pipe
    fn set_recv_buf_size(&mut self, size: usize) -> Result<()>;

    /// Size of the receive buffer that holds all data that has not
    /// yet been read
    fn recv_buf_size(&self) -> Result<usize>;

    /// Sets the size of the send buffer which will hold the bytes of
    /// data while they are being sent over to the peer
    fn set_send_buf_size(&mut self, size: usize) -> Result<()>;

    /// Size of the send buffer that holds all data that is currently
    /// being transmitted.
    fn send_buf_size(&self) -> Result<usize>;

    /// When NO_DELAY is set the data that needs to be transmitted to
    /// the peer is sent immediately rather than waiting for a bigger
    /// batch of data, this reduces latency but increases encapsulation
    /// overhead.
    ///
    /// NOTE(review): the parameter is named `reuse` but carries the new
    /// NO_DELAY flag value.
    fn set_nodelay(&mut self, reuse: bool) -> Result<()>;

    /// Indicates if the NO_DELAY flag is set which means that data
    /// is immediately sent to the peer without waiting. This reduces
    /// latency but increases encapsulation overhead.
    fn nodelay(&self) -> Result<bool>;

    /// When KEEP_ALIVE is set the connection will periodically send
    /// an empty data packet to the server to make sure the connection
    /// stays alive.
    fn set_keepalive(&mut self, keepalive: bool) -> Result<()>;

    /// Indicates if the KEEP_ALIVE flag is set which means that the
    /// socket will periodically send an empty data packet to keep
    /// the connection alive.
    fn keepalive(&self) -> Result<bool>;

    /// When DONT_ROUTE is set the packet will be sent directly
    /// to the interface without passing through the routing logic.
    ///
    /// NOTE(review): the parameter is named `keepalive` but carries the new
    /// DONT_ROUTE flag value.
    fn set_dontroute(&mut self, keepalive: bool) -> Result<()>;

    /// Indicates if the packet will pass straight through to
    /// the interface bypassing the routing logic.
    fn dontroute(&self) -> Result<bool>;

    /// Returns the address (IP and Port) of the peer socket that this
    /// is connected to
    fn addr_peer(&self) -> Result<SocketAddr>;

    /// Shuts down either the READER or WRITER sides of the socket
    /// connection.
    fn shutdown(&mut self, how: Shutdown) -> Result<()>;

    /// Return true if the socket is closed
    fn is_closed(&self) -> bool;
}
645
#[cfg(feature = "tokio")]
impl<'a> AsyncRead for Box<dyn VirtualTcpSocket + Sync + 'a> {
    /// Adapts the non-blocking `try_recv` of the socket to tokio's
    /// `AsyncRead`.
    ///
    /// Each poll registers the task's waker as the socket handler and then
    /// makes one consuming (non-peeking) `try_recv` attempt into the unfilled
    /// part of `buf`:
    ///
    /// * on success the received bytes are appended to whatever `buf`
    ///   already holds and `Poll::Ready(Ok(()))` is returned;
    /// * `NetworkError::WouldBlock` yields `Poll::Pending`;
    /// * any other error (including a failure to register the handler) is
    ///   converted with `net_error_into_io_err` and returned.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        if let Err(err) = this.set_handler(handler) {
            return Poll::Ready(Err(net_error_into_io_err(err)));
        }
        // SAFETY: the unfilled region is only handed to `try_recv`, which is
        // trusted to write received bytes into it and never to de-initialize
        // bytes that were already initialized.
        let buf_unsafe = unsafe { buf.unfilled_mut() };
        // A stream read must consume the data, hence `peek = false`.
        match this.try_recv(buf_unsafe, false) {
            Ok(ret) => {
                // SAFETY: relies on the `try_recv` contract that the first
                // `ret` bytes of the slice it was given have been written.
                unsafe { buf.assume_init(ret) };
                // Advance relative to the current fill level; `set_filled(ret)`
                // would set an absolute length and discard bytes that were
                // already in the buffer before this call.
                buf.advance(ret);
                Poll::Ready(Ok(()))
            }
            Err(NetworkError::WouldBlock) => Poll::Pending,
            Err(err) => Poll::Ready(Err(net_error_into_io_err(err))),
        }
    }
}
670
#[cfg(feature = "tokio")]
impl<'a> AsyncWrite for Box<dyn VirtualTcpSocket + Sync + 'a> {
    /// Registers the task's waker as the socket handler, then makes one
    /// `try_send` attempt. `WouldBlock` maps to `Poll::Pending`; every other
    /// outcome completes the poll, with errors converted by
    /// `net_error_into_io_err`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let socket = self.get_mut();
        let wake_on_event: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        if let Err(err) = socket.set_handler(wake_on_event) {
            return Poll::Ready(Err(net_error_into_io_err(err)));
        }
        match socket.try_send(buf) {
            Err(NetworkError::WouldBlock) => Poll::Pending,
            outcome => Poll::Ready(outcome.map_err(net_error_into_io_err)),
        }
    }

    /// Registers the task's waker as the socket handler, then makes one
    /// `try_flush` attempt. `WouldBlock` maps to `Poll::Pending`; every other
    /// outcome completes the poll, with errors converted by
    /// `net_error_into_io_err`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let socket = self.get_mut();
        let wake_on_event: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        if let Err(err) = socket.set_handler(wake_on_event) {
            return Poll::Ready(Err(net_error_into_io_err(err)));
        }
        match socket.try_flush() {
            Err(NetworkError::WouldBlock) => Poll::Pending,
            outcome => Poll::Ready(outcome.map_err(net_error_into_io_err)),
        }
    }

    /// Shuts down the write side of the socket. This always completes
    /// immediately and never registers a waker.
    fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let shutdown_result = self.shutdown(Shutdown::Write);
        Poll::Ready(shutdown_result.map_err(net_error_into_io_err))
    }
}
710
/// A UDP socket, as returned by [`VirtualNetworking::bind_udp`].
///
/// Extends [`VirtualConnectionlessSocket`] with broadcast and multicast
/// configuration and an optional peer address.
pub trait VirtualUdpSocket:
    VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
{
    /// Sets a flag that means that the UDP socket is able
    /// to receive and process broadcast packets.
    fn set_broadcast(&mut self, broadcast: bool) -> Result<()>;

    /// Indicates if the SO_BROADCAST flag is set which means
    /// that the UDP socket will receive and process broadcast
    /// packets
    fn broadcast(&self) -> Result<bool>;

    /// Sets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv4 addresses
    fn set_multicast_loop_v4(&mut self, val: bool) -> Result<()>;

    /// Gets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv4 addresses
    fn multicast_loop_v4(&self) -> Result<bool>;

    /// Sets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv6 addresses
    fn set_multicast_loop_v6(&mut self, val: bool) -> Result<()>;

    /// Gets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv6 addresses
    fn multicast_loop_v6(&self) -> Result<bool>;

    /// Sets the TTL for IPv4 multicast packets which is the
    /// number of network hops before the packet is dropped
    fn set_multicast_ttl_v4(&mut self, ttl: u32) -> Result<()>;

    /// Gets the TTL for IPv4 multicast packets which is the
    /// number of network hops before the packet is dropped
    fn multicast_ttl_v4(&self) -> Result<u32>;

    /// Tells this interface that it will subscribe to a
    /// particular multicast address. This applies to IPv4 addresses
    ///
    /// `iface` selects the local interface by its IPv4 address.
    fn join_multicast_v4(&mut self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<()>;

    /// Tells this interface that it will unsubscribe from a
    /// particular multicast address. This applies to IPv4 addresses
    fn leave_multicast_v4(&mut self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<()>;

    /// Tells this interface that it will subscribe to a
    /// particular multicast address. This applies to IPv6 addresses
    ///
    /// `iface` is a numeric interface identifier (presumably the interface
    /// index - TODO confirm).
    fn join_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<()>;

    /// Tells this interface that it will unsubscribe from a
    /// particular multicast address. This applies to IPv6 addresses
    fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<()>;

    /// Returns the remote address of this UDP socket if it has been
    /// connected to a specific target destination address
    fn addr_peer(&self) -> Result<Option<SocketAddr>>;
}
771
/// A [`VirtualNetworking`] implementation that supports nothing.
///
/// It overrides none of the trait's methods, so every call falls through to
/// the trait defaults and returns [`NetworkError::Unsupported`].
#[derive(Debug, Default)]
pub struct UnsupportedVirtualNetworking {}

#[async_trait::async_trait]
impl VirtualNetworking for UnsupportedVirtualNetworking {}
777
/// The error type used by every fallible operation in this crate (see the
/// crate-level `Result` alias).
///
/// The variants carry no payload, which keeps the type `Copy` and
/// serializable. The async helpers and tokio adapters in this file treat
/// [`NetworkError::WouldBlock`] specially: it means "not ready yet, poll
/// again" rather than a real failure.
#[derive(Error, Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetworkError {
    /// The handle given was not usable
    #[error("invalid fd")]
    InvalidFd,
    /// File exists
    #[error("file exists")]
    AlreadyExists,
    /// The filesystem has failed to lock a resource.
    #[error("lock error")]
    Lock,
    /// Something failed when doing IO. These errors can generally not be handled.
    /// It may work if tried again.
    #[error("io error")]
    IOError,
    /// The address was in use
    #[error("address is in use")]
    AddressInUse,
    /// The address could not be found
    #[error("address could not be found")]
    AddressNotAvailable,
    /// A pipe was closed
    #[error("broken pipe (was closed)")]
    BrokenPipe,
    /// Insufficient memory
    #[error("Insufficient memory")]
    InsufficientMemory,
    /// The connection was aborted
    #[error("connection aborted")]
    ConnectionAborted,
    /// The connection request was refused
    #[error("connection refused")]
    ConnectionRefused,
    /// The connection was reset
    #[error("connection reset")]
    ConnectionReset,
    /// The operation was interrupted before it could finish
    #[error("operation interrupted")]
    Interrupted,
    /// Invalid internal data, if the argument data is invalid, use `InvalidInput`
    #[error("invalid internal data")]
    InvalidData,
    /// The provided data is invalid
    #[error("invalid input")]
    InvalidInput,
    /// Could not perform the operation because there was not an open connection
    #[error("connection is not open")]
    NotConnected,
    /// The requested device couldn't be accessed
    #[error("can't access device")]
    NoDevice,
    /// Caller was not allowed to perform this operation
    #[error("permission denied")]
    PermissionDenied,
    /// The operation did not complete within the given amount of time
    #[error("time out")]
    TimedOut,
    /// Found EOF when EOF was not expected
    #[error("unexpected eof")]
    UnexpectedEof,
    /// Operation would block, this error lets the caller know that they can try again
    #[error("blocking operation. try again")]
    WouldBlock,
    /// A call to write returned 0
    #[error("write returned 0")]
    WriteZero,
    /// Too many open files
    #[error("too many open files")]
    TooManyOpenFiles,
    /// The operation is not supported.
    #[error("unsupported")]
    Unsupported,
    /// Some other unhandled error. If you see this, it's probably a bug.
    #[error("unknown error found")]
    UnknownError,
}
854
855pub fn io_err_into_net_error(net_error: std::io::Error) -> NetworkError {
856    use std::io::ErrorKind;
857    match net_error.kind() {
858        ErrorKind::BrokenPipe => NetworkError::BrokenPipe,
859        ErrorKind::AlreadyExists => NetworkError::AlreadyExists,
860        ErrorKind::AddrInUse => NetworkError::AddressInUse,
861        ErrorKind::AddrNotAvailable => NetworkError::AddressNotAvailable,
862        ErrorKind::ConnectionAborted => NetworkError::ConnectionAborted,
863        ErrorKind::ConnectionRefused => NetworkError::ConnectionRefused,
864        ErrorKind::ConnectionReset => NetworkError::ConnectionReset,
865        ErrorKind::Interrupted => NetworkError::Interrupted,
866        ErrorKind::InvalidData => NetworkError::InvalidData,
867        ErrorKind::InvalidInput => NetworkError::InvalidInput,
868        ErrorKind::NotConnected => NetworkError::NotConnected,
869        ErrorKind::PermissionDenied => NetworkError::PermissionDenied,
870        ErrorKind::TimedOut => NetworkError::TimedOut,
871        ErrorKind::UnexpectedEof => NetworkError::UnexpectedEof,
872        ErrorKind::WouldBlock => NetworkError::WouldBlock,
873        ErrorKind::WriteZero => NetworkError::WriteZero,
874        ErrorKind::Unsupported => NetworkError::Unsupported,
875
876        #[cfg(all(target_family = "unix", feature = "libc"))]
877        _ => {
878            if let Some(code) = net_error.raw_os_error() {
879                match code {
880                    libc::EPERM => NetworkError::PermissionDenied,
881                    libc::EBADF => NetworkError::InvalidFd,
882                    libc::ECHILD => NetworkError::InvalidFd,
883                    libc::EMFILE => NetworkError::TooManyOpenFiles,
884                    libc::EINTR => NetworkError::Interrupted,
885                    libc::EIO => NetworkError::IOError,
886                    libc::ENXIO => NetworkError::IOError,
887                    libc::EAGAIN => NetworkError::WouldBlock,
888                    libc::ENOMEM => NetworkError::InsufficientMemory,
889                    libc::EACCES => NetworkError::PermissionDenied,
890                    libc::ENODEV => NetworkError::NoDevice,
891                    libc::EINVAL => NetworkError::InvalidInput,
892                    libc::EPIPE => NetworkError::BrokenPipe,
893                    err => {
894                        tracing::trace!("unknown os error {}", err);
895                        NetworkError::UnknownError
896                    }
897                }
898            } else {
899                NetworkError::UnknownError
900            }
901        }
902        #[cfg(not(all(target_family = "unix", feature = "libc")))]
903        _ => NetworkError::UnknownError,
904    }
905}
906
907pub fn net_error_into_io_err(net_error: NetworkError) -> std::io::Error {
908    use std::io::ErrorKind;
909    match net_error {
910        NetworkError::InvalidFd => ErrorKind::BrokenPipe.into(),
911        NetworkError::AlreadyExists => ErrorKind::AlreadyExists.into(),
912        NetworkError::Lock => ErrorKind::BrokenPipe.into(),
913        NetworkError::IOError => ErrorKind::BrokenPipe.into(),
914        NetworkError::AddressInUse => ErrorKind::AddrInUse.into(),
915        NetworkError::AddressNotAvailable => ErrorKind::AddrNotAvailable.into(),
916        NetworkError::BrokenPipe => ErrorKind::BrokenPipe.into(),
917        NetworkError::ConnectionAborted => ErrorKind::ConnectionAborted.into(),
918        NetworkError::ConnectionRefused => ErrorKind::ConnectionRefused.into(),
919        NetworkError::ConnectionReset => ErrorKind::ConnectionReset.into(),
920        NetworkError::Interrupted => ErrorKind::Interrupted.into(),
921        NetworkError::InvalidData => ErrorKind::InvalidData.into(),
922        NetworkError::InvalidInput => ErrorKind::InvalidInput.into(),
923        NetworkError::NotConnected => ErrorKind::NotConnected.into(),
924        NetworkError::NoDevice => ErrorKind::BrokenPipe.into(),
925        NetworkError::PermissionDenied => ErrorKind::PermissionDenied.into(),
926        NetworkError::TimedOut => ErrorKind::TimedOut.into(),
927        NetworkError::UnexpectedEof => ErrorKind::UnexpectedEof.into(),
928        NetworkError::WouldBlock => ErrorKind::WouldBlock.into(),
929        NetworkError::WriteZero => ErrorKind::WriteZero.into(),
930        NetworkError::Unsupported => ErrorKind::Unsupported.into(),
931        NetworkError::UnknownError => ErrorKind::BrokenPipe.into(),
932        NetworkError::InsufficientMemory => ErrorKind::OutOfMemory.into(),
933        NetworkError::TooManyOpenFiles => {
934            #[cfg(all(target_family = "unix", feature = "libc"))]
935            {
936                std::io::Error::from_raw_os_error(libc::EMFILE)
937            }
938            #[cfg(not(all(target_family = "unix", feature = "libc")))]
939            {
940                ErrorKind::Other.into()
941            }
942        }
943    }
944}