virtual_net/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![allow(clippy::multiple_bound_locations)]
3#[cfg(feature = "remote")]
4pub mod client;
5pub mod composite;
6#[cfg(feature = "host-net")]
7pub mod host;
8pub mod loopback;
9pub mod meta;
10pub mod ruleset;
11#[cfg(feature = "remote")]
12pub mod rx_tx;
13#[cfg(feature = "remote")]
14pub mod server;
15pub mod tcp_pair;
16#[cfg(feature = "tokio")]
17#[cfg(test)]
18mod tests;
19
20#[cfg(feature = "remote")]
21pub use client::{RemoteNetworkingClient, RemoteNetworkingClientDriver};
22pub use composite::CompositeTcpListener;
23pub use loopback::LoopbackNetworking;
24use pin_project_lite::pin_project;
25#[cfg(feature = "rkyv")]
26use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
27#[cfg(feature = "remote")]
28pub use server::{RemoteNetworkingServer, RemoteNetworkingServerDriver};
29use std::fmt;
30use std::mem::MaybeUninit;
31use std::net::IpAddr;
32use std::net::Ipv4Addr;
33use std::net::Ipv6Addr;
34use std::net::Shutdown;
35use std::net::SocketAddr;
36use std::pin::Pin;
37use std::sync::Arc;
38use std::task::Context;
39use std::task::Poll;
40use std::time::Duration;
41use thiserror::Error;
42#[cfg(feature = "tokio")]
43use tokio::io::AsyncRead;
44#[cfg(feature = "tokio")]
45use tokio::io::AsyncWrite;
46
47pub use bytes::Bytes;
48pub use bytes::BytesMut;
49use serde::{Deserialize, Serialize};
50#[cfg(feature = "host-net")]
51pub use virtual_mio::{InterestGuard, InterestHandlerWaker, InterestType};
52pub use virtual_mio::{InterestHandler, handler_into_waker};
53
/// Convenience alias for results whose error type is [`NetworkError`].
pub type Result<T> = std::result::Result<T, NetworkError>;
55
/// Upper bound, in bytes, on a single datagram payload: the 16-bit length
/// limit (65535) minus the 8 bytes subtracted for the UDP header.
/// Caps host allocation across address families; the OS rejects oversize packets.
pub const MAX_SOCKET_PAYLOAD: usize = u16::MAX as usize - 8;
59
/// Represents an IP address and its netmask
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
#[cfg_attr(feature = "rkyv", derive(RkyvSerialize, RkyvDeserialize, Archive))]
pub struct IpCidr {
    /// The IP address (IPv4 or IPv6).
    pub ip: IpAddr,
    /// Netmask prefix that goes with `ip`.
    /// NOTE(review): presumably the number of leading network bits — confirm with callers.
    pub prefix: u8,
}
67
/// Represents a routing entry in the routing table of the interface
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "rkyv", derive(RkyvSerialize, RkyvDeserialize, Archive))]
pub struct IpRoute {
    /// Address block this route applies to.
    pub cidr: IpCidr,
    /// Address of the router that traffic for `cidr` is sent through.
    pub via_router: IpAddr,
    /// Optional preference deadline for this route.
    /// NOTE(review): the reference point of the `Duration` is not visible here — confirm.
    pub preferred_until: Option<Duration>,
    /// Optional expiry for this route.
    /// NOTE(review): the reference point of the `Duration` is not visible here — confirm.
    pub expires_at: Option<Duration>,
}
77
/// Represents an IO source
///
/// Base trait of every socket and listener trait in this module; it exposes
/// readiness polling and removal of the registered handler.
pub trait VirtualIoSource: fmt::Debug + Send + Sync + 'static {
    /// Removes a previously registered handler (waker) from this source.
    fn remove_handler(&mut self);

    /// Polls the source to see if there is data waiting
    ///
    /// NOTE(review): the meaning of the returned `usize` is not defined here
    /// (presumably the amount of data available) — confirm with implementors.
    fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>>;

    /// Polls the source to see if data can be sent
    ///
    /// NOTE(review): the meaning of the returned `usize` is not defined here
    /// (presumably the writable capacity) — confirm with implementors.
    fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>>;
}
89
/// An implementation of virtual networking
///
/// Every method has a default body that returns
/// [`NetworkError::Unsupported`], so an implementor only needs to override
/// the operations it actually supports.
#[async_trait::async_trait]
// The default bodies below ignore all of their arguments.
#[allow(unused_variables)]
pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
    /// Bridges this local network with a remote network, which is required in
    /// order to make lower level networking calls (such as UDP/TCP)
    async fn bridge(
        &self,
        network: &str,
        access_token: &str,
        security: StreamSecurity,
    ) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Disconnects from the remote network, essentially unbridging it
    async fn unbridge(&self) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Acquires an IP address on the network and configures the routing tables
    async fn dhcp_acquire(&self) -> Result<Vec<IpAddr>> {
        Err(NetworkError::Unsupported)
    }

    /// Adds a static IP address to the interface with a netmask prefix
    async fn ip_add(&self, ip: IpAddr, prefix: u8) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Removes a static (or dynamic) IP address from the interface
    async fn ip_remove(&self, ip: IpAddr) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Clears all the assigned IP addresses for this interface
    async fn ip_clear(&self) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Lists all the IP addresses currently assigned to this interface
    async fn ip_list(&self) -> Result<Vec<IpCidr>> {
        Err(NetworkError::Unsupported)
    }

    /// Returns the hardware MAC address for this interface
    async fn mac(&self) -> Result<[u8; 6]> {
        Err(NetworkError::Unsupported)
    }

    /// Adds a default gateway to the routing table
    async fn gateway_set(&self, ip: IpAddr) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Adds a specific route to the routing table
    ///
    /// The arguments correspond one-to-one to the fields of [`IpRoute`].
    async fn route_add(
        &self,
        cidr: IpCidr,
        via_router: IpAddr,
        preferred_until: Option<Duration>,
        expires_at: Option<Duration>,
    ) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Removes a routing rule from the routing table
    ///
    /// NOTE(review): the parameter is named `cidr` but is a plain `IpAddr`
    /// (no prefix) — confirm how implementors match it against routes.
    async fn route_remove(&self, cidr: IpAddr) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Clears the routing table for this interface
    async fn route_clear(&self) -> Result<()> {
        Err(NetworkError::Unsupported)
    }

    /// Lists all the routes defined in the routing table for this interface
    async fn route_list(&self) -> Result<Vec<IpRoute>> {
        Err(NetworkError::Unsupported)
    }

    /// Creates a low level socket that can read and write Ethernet packets
    /// directly to the interface
    async fn bind_raw(&self) -> Result<Box<dyn VirtualRawSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Listens for TCP connections on a specific IP and Port combination
    /// Multiple servers (processes or threads) can bind to the same port if they each set
    /// the reuse-port and-or reuse-addr flags
    async fn listen_tcp(
        &self,
        addr: SocketAddr,
        only_v6: bool,
        reuse_port: bool,
        reuse_addr: bool,
    ) -> Result<Box<dyn VirtualTcpListener + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Binds a TCP socket to a specific IP and port without immediately
    /// listening for connections or connecting to a peer.
    async fn bind_tcp(
        &self,
        addr: SocketAddr,
        only_v6: bool,
        reuse_port: bool,
        reuse_addr: bool,
    ) -> Result<Box<dyn VirtualTcpBoundSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Opens a UDP socket that listens on a specific IP and Port combination
    /// Multiple servers (processes or threads) can bind to the same port if they each set
    /// the reuse-port and-or reuse-addr flags
    async fn bind_udp(
        &self,
        addr: SocketAddr,
        reuse_port: bool,
        reuse_addr: bool,
    ) -> Result<Box<dyn VirtualUdpSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Creates a socket that can be used to send and receive ICMP packets
    /// from a particular IP address
    async fn bind_icmp(&self, addr: IpAddr) -> Result<Box<dyn VirtualIcmpSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Opens a TCP connection to a particular destination IP address and port
    ///
    /// `addr` is the local address and `peer` is the destination.
    /// NOTE(review): the role of `addr` is inferred from the parameter order — confirm.
    async fn connect_tcp(
        &self,
        addr: SocketAddr,
        peer: SocketAddr,
    ) -> Result<Box<dyn VirtualTcpSocket + Sync>> {
        Err(NetworkError::Unsupported)
    }

    /// Performs DNS resolution for a specific hostname
    async fn resolve(
        &self,
        host: &str,
        port: Option<u16>,
        dns_server: Option<IpAddr>,
    ) -> Result<Vec<IpAddr>> {
        Err(NetworkError::Unsupported)
    }
}
239
/// Shared, reference-counted handle to a dynamically dispatched
/// [`VirtualNetworking`] implementation.
pub type DynVirtualNetworking = Arc<dyn VirtualNetworking>;
241
/// A TCP listener that hands out accepted connections without blocking.
pub trait VirtualTcpListener: VirtualIoSource + fmt::Debug + Send + Sync + 'static {
    /// Tries to accept a new connection
    ///
    /// Returns the accepted socket together with a `SocketAddr`
    /// (presumably the peer's address — confirm with implementors).
    /// The async `accept` wrapper treats `NetworkError::WouldBlock` from this
    /// method as "no connection pending yet".
    fn try_accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;

    /// Registers a waker for when a new connection has arrived. This uses
    /// a stack machine which means more than one waker can be registered
    fn set_handler(&mut self, handler: Box<dyn InterestHandler + Send + Sync>) -> Result<()>;

    /// Returns the local address of this TCP listener
    fn addr_local(&self) -> Result<SocketAddr>;

    /// Sets how many network hops the packets are permitted for new connections
    ///
    /// NOTE(review): the TTL is a `u8` here but a `u32` on `VirtualSocket`
    /// and `VirtualTcpBoundSocket`.
    fn set_ttl(&mut self, ttl: u8) -> Result<()>;

    /// Returns the maximum number of network hops before packets are dropped
    fn ttl(&self) -> Result<u8>;
}
259
/// A TCP socket that has been bound to a local address but is not yet
/// listening or connected; it can be turned into either a listener or a
/// connected socket.
pub trait VirtualTcpBoundSocket: fmt::Debug + Send + Sync + 'static {
    /// Returns the local address of this bound TCP socket.
    fn addr_local(&self) -> Result<SocketAddr>;

    /// Places the socket into listening mode.
    fn listen(&mut self) -> Result<Box<dyn VirtualTcpListener + Sync>>;

    /// Initiates a TCP connection using the already-bound local address.
    fn connect(&mut self, peer: SocketAddr) -> Result<Box<dyn VirtualTcpSocket + Sync>>;

    /// Sets how many network hops the packets are permitted for this socket.
    fn set_ttl(&mut self, ttl: u32) -> Result<()>;

    /// Returns the maximum number of network hops before packets are dropped.
    fn ttl(&self) -> Result<u32>;
}
276
/// Async convenience layer over [`VirtualTcpListener`].
///
/// A blanket implementation in this module provides it for every type that
/// implements `VirtualTcpListener`, including unsized ones.
#[async_trait::async_trait]
pub trait VirtualTcpListenerExt: VirtualTcpListener {
    /// Accepts a new connection from the TCP listener
    async fn accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;
}
282
283#[async_trait::async_trait]
284impl<R: VirtualTcpListener + ?Sized> VirtualTcpListenerExt for R {
285    async fn accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)> {
286        struct Poller<'a, R>
287        where
288            R: VirtualTcpListener + ?Sized,
289        {
290            listener: &'a mut R,
291        }
292        impl<R> std::future::Future for Poller<'_, R>
293        where
294            R: VirtualTcpListener + ?Sized,
295        {
296            type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;
297            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
298                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
299                if let Err(err) = self.listener.set_handler(handler) {
300                    return Poll::Ready(Err(err));
301                }
302                match self.listener.try_accept() {
303                    Ok(ret) => Poll::Ready(Ok(ret)),
304                    Err(NetworkError::WouldBlock) => Poll::Pending,
305                    Err(err) => Poll::Ready(Err(err)),
306                }
307            }
308        }
309        Poller { listener: self }.await
310    }
311}
312
/// Operations shared by all socket kinds in this module (connected,
/// connectionless and raw sockets all build on this trait).
pub trait VirtualSocket: VirtualIoSource + fmt::Debug + Send + Sync + 'static {
    /// Sets how many network hops the packets are permitted for new connections
    fn set_ttl(&mut self, ttl: u32) -> Result<()>;

    /// Returns the maximum number of network hops before packets are dropped
    fn ttl(&self) -> Result<u32>;

    /// Returns the local address for this socket
    fn addr_local(&self) -> Result<SocketAddr>;

    /// Returns the status/state of the socket
    fn status(&self) -> Result<SocketStatus>;

    /// Returns and clears the last socket error when the backend can report one.
    ///
    /// The default implementation reports no error (`Ok(None)`).
    fn last_error(&self) -> Result<Option<NetworkError>> {
        Ok(None)
    }

    /// Registers a waker for when this connection is ready to receive
    /// more data. Uses a stack machine which means more than one waker
    /// can be registered
    fn set_handler(&mut self, handler: Box<dyn InterestHandler + Send + Sync>) -> Result<()>;
}
336
/// Status/state of a socket, as reported by `VirtualSocket::status`.
///
/// NOTE(review): the exact transitions between states are defined by the
/// backends, not here — the variant names are the only specification visible.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SocketStatus {
    Opening,
    Opened,
    Closed,
    Failed,
}
344
/// Security level requested when bridging to a remote network
/// (the `security` argument of `VirtualNetworking::bridge`).
///
/// NOTE(review): what each encryption level entails is decided by the
/// implementation and is not visible here.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamSecurity {
    Unencrypted,
    AnyEncryption,
    ClassicEncryption,
    DoubleEncryption,
}
352
/// Connected sockets have a persistent connection to a remote peer
///
/// All `try_*` methods are non-blocking; the async wrappers in
/// `VirtualConnectedSocketExt` treat `NetworkError::WouldBlock` as
/// "not ready yet".
pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
    /// Determines how long the socket will remain in a TIME_WAIT
    /// after it disconnects (only the one that initiates the close will
    /// be in a TIME_WAIT state thus the clients should always do this rather
    /// than the server)
    fn set_linger(&mut self, linger: Option<Duration>) -> Result<()>;

    /// Returns how long the socket will remain in a TIME_WAIT
    /// after it disconnects
    fn linger(&self) -> Result<Option<Duration>>;

    /// Tries to send out a datagram or stream of bytes on this socket
    ///
    /// Returns a byte count (presumably the number of bytes accepted — confirm
    /// with implementors).
    fn try_send(&mut self, data: &[u8]) -> Result<usize>;

    /// Tries to flush any data in the local buffers
    fn try_flush(&mut self) -> Result<()>;

    /// Closes the socket
    fn close(&mut self) -> Result<()>;

    /// Tries to read a packet from the socket
    ///
    /// `buf` may be uninitialized. The tokio `AsyncRead` adapter in this
    /// module relies on the returned count being the number of bytes written
    /// to the start of `buf`.
    /// NOTE(review): `peek` presumably leaves the data queued for the next
    /// read — confirm with implementors.
    fn try_recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;
}
377
/// Async counterparts of the `try_*` methods on [`VirtualConnectedSocket`].
///
/// A blanket implementation in this module provides it for every type that
/// implements `VirtualConnectedSocket`, including unsized ones.
#[async_trait::async_trait]
pub trait VirtualConnectedSocketExt: VirtualConnectedSocket {
    /// Async version of `try_send`: waits while the socket reports
    /// `NetworkError::WouldBlock`, then returns the result.
    async fn send(&mut self, data: &[u8]) -> Result<usize>;

    /// Async version of `try_recv`: waits while the socket reports
    /// `NetworkError::WouldBlock`, then returns the result.
    async fn recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;

    /// Async version of `try_flush`: waits while the socket reports
    /// `NetworkError::WouldBlock`, then returns the result.
    async fn flush(&mut self) -> Result<()>;
}
386
387#[async_trait::async_trait]
388impl<R: VirtualConnectedSocket + ?Sized> VirtualConnectedSocketExt for R {
389    async fn send(&mut self, data: &[u8]) -> Result<usize> {
390        pin_project! {
391            struct Poller<'a, 'b, R: ?Sized>
392            where
393                R: VirtualConnectedSocket,
394            {
395                socket: &'a mut R,
396                data: &'b [u8],
397            }
398        }
399        impl<R> std::future::Future for Poller<'_, '_, R>
400        where
401            R: VirtualConnectedSocket + ?Sized,
402        {
403            type Output = Result<usize>;
404            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
405                let this = self.project();
406
407                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
408                if let Err(err) = this.socket.set_handler(handler) {
409                    return Poll::Ready(Err(err));
410                }
411                match this.socket.try_send(this.data) {
412                    Ok(ret) => Poll::Ready(Ok(ret)),
413                    Err(NetworkError::WouldBlock) => Poll::Pending,
414                    Err(err) => Poll::Ready(Err(err)),
415                }
416            }
417        }
418        Poller { socket: self, data }.await
419    }
420
421    async fn recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize> {
422        pin_project! {
423            struct Poller<'a, 'b, R: ?Sized>
424            where
425                R: VirtualConnectedSocket,
426            {
427                socket: &'a mut R,
428                buf: &'b mut [MaybeUninit<u8>],
429                peek: bool,
430            }
431        }
432        impl<R> std::future::Future for Poller<'_, '_, R>
433        where
434            R: VirtualConnectedSocket + ?Sized,
435        {
436            type Output = Result<usize>;
437            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
438                let this = self.project();
439
440                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
441                if let Err(err) = this.socket.set_handler(handler) {
442                    return Poll::Ready(Err(err));
443                }
444                match this.socket.try_recv(this.buf, *this.peek) {
445                    Ok(ret) => Poll::Ready(Ok(ret)),
446                    Err(NetworkError::WouldBlock) => Poll::Pending,
447                    Err(err) => Poll::Ready(Err(err)),
448                }
449            }
450        }
451        Poller {
452            socket: self,
453            buf,
454            peek,
455        }
456        .await
457    }
458
459    async fn flush(&mut self) -> Result<()> {
460        struct Poller<'a, R>
461        where
462            R: VirtualConnectedSocket + ?Sized,
463        {
464            socket: &'a mut R,
465        }
466        impl<R> std::future::Future for Poller<'_, R>
467        where
468            R: VirtualConnectedSocket + ?Sized,
469        {
470            type Output = Result<()>;
471            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
472                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
473                if let Err(err) = self.socket.set_handler(handler) {
474                    return Poll::Ready(Err(err));
475                }
476                match self.socket.try_flush() {
477                    Ok(ret) => Poll::Ready(Ok(ret)),
478                    Err(NetworkError::WouldBlock) => Poll::Pending,
479                    Err(err) => Poll::Ready(Err(err)),
480                }
481            }
482        }
483        Poller { socket: self }.await
484    }
485}
486
/// Connectionless sockets are able to send and receive datagrams and stream
/// bytes to multiple addresses at the same time (peer-to-peer)
///
/// Both methods are non-blocking; the async wrappers in
/// `VirtualConnectionlessSocketExt` treat `NetworkError::WouldBlock` as
/// "not ready yet".
pub trait VirtualConnectionlessSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
    /// Sends out a datagram or stream of bytes on this socket
    /// to a specific address
    fn try_send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize>;

    /// Receives a packet from the socket
    ///
    /// Returns a byte count together with a `SocketAddr` (presumably the
    /// sender's address — confirm with implementors). `buf` may be
    /// uninitialized.
    fn try_recv_from(
        &mut self,
        buf: &mut [MaybeUninit<u8>],
        peek: bool,
    ) -> Result<(usize, SocketAddr)>;
}
501
/// Async counterparts of the `try_*` methods on
/// [`VirtualConnectionlessSocket`].
///
/// A blanket implementation in this module provides it for every type that
/// implements `VirtualConnectionlessSocket`, including unsized ones.
#[async_trait::async_trait]
pub trait VirtualConnectionlessSocketExt: VirtualConnectionlessSocket {
    /// Async version of `try_send_to`: waits while the socket reports
    /// `NetworkError::WouldBlock`, then returns the result.
    async fn send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize>;

    /// Async version of `try_recv_from`: waits while the socket reports
    /// `NetworkError::WouldBlock`, then returns the result.
    async fn recv_from(
        &mut self,
        buf: &mut [MaybeUninit<u8>],
        peek: bool,
    ) -> Result<(usize, SocketAddr)>;
}
512
513#[async_trait::async_trait]
514impl<R: VirtualConnectionlessSocket + ?Sized> VirtualConnectionlessSocketExt for R {
515    async fn send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize> {
516        pin_project! {
517            struct Poller<'a, 'b, R: ?Sized>
518            where
519                R: VirtualConnectionlessSocket,
520            {
521                socket: &'a mut R,
522                data: &'b [u8],
523                addr: SocketAddr,
524            }
525        }
526        impl<R> std::future::Future for Poller<'_, '_, R>
527        where
528            R: VirtualConnectionlessSocket + ?Sized,
529        {
530            type Output = Result<usize>;
531            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
532                let this = self.project();
533
534                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
535                if let Err(err) = this.socket.set_handler(handler) {
536                    return Poll::Ready(Err(err));
537                }
538                match this.socket.try_send_to(this.data, *this.addr) {
539                    Ok(ret) => Poll::Ready(Ok(ret)),
540                    Err(NetworkError::WouldBlock) => Poll::Pending,
541                    Err(err) => Poll::Ready(Err(err)),
542                }
543            }
544        }
545        Poller {
546            socket: self,
547            data,
548            addr,
549        }
550        .await
551    }
552
553    async fn recv_from(
554        &mut self,
555        buf: &mut [MaybeUninit<u8>],
556        peek: bool,
557    ) -> Result<(usize, SocketAddr)> {
558        pin_project! {
559            struct Poller<'a, 'b, R: ?Sized>
560            where
561                R: VirtualConnectionlessSocket,
562            {
563                socket: &'a mut R,
564                buf: &'b mut [MaybeUninit<u8>],
565                peek: bool,
566            }
567        }
568        impl<R> std::future::Future for Poller<'_, '_, R>
569        where
570            R: VirtualConnectionlessSocket + ?Sized,
571        {
572            type Output = Result<(usize, SocketAddr)>;
573            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
574                let this = self.project();
575
576                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
577                if let Err(err) = this.socket.set_handler(handler) {
578                    return Poll::Ready(Err(err));
579                }
580                match this.socket.try_recv_from(this.buf, *this.peek) {
581                    Ok(ret) => Poll::Ready(Ok(ret)),
582                    Err(NetworkError::WouldBlock) => Poll::Pending,
583                    Err(err) => Poll::Ready(Err(err)),
584                }
585            }
586        }
587        Poller {
588            socket: self,
589            buf,
590            peek,
591        }
592        .await
593    }
594}
595
/// ICMP sockets are low level devices bound to a specific address
/// that can send and receive ICMP packets
///
/// The trait adds no methods of its own; it only marks a
/// `VirtualConnectionlessSocket` as being an ICMP socket.
pub trait VirtualIcmpSocket:
    VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
{
}
602
603#[async_trait::async_trait]
604pub trait VirtualRawSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
605    /// Sends out a datagram or stream of bytes on this socket
606    fn try_send(&mut self, data: &[u8]) -> Result<usize>;
607
608    /// Attempts to flush the object, ensuring that any buffered data reach
609    /// their destination.
610    fn try_flush(&mut self) -> Result<()>;
611
612    /// Recv a packet from the socket
613    fn try_recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;
614
615    /// Tells the raw socket and its backing switch that all packets
616    /// should be received by this socket even if they are not
617    /// destined for this device
618    fn set_promiscuous(&mut self, promiscuous: bool) -> Result<()>;
619
620    /// Returns if the socket is running in promiscuous mode whereby it
621    /// will receive all packets even if they are not destined for the
622    /// local interface
623    fn promiscuous(&self) -> Result<bool>;
624}
625
626pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync + 'static {
627    /// Sets the receive buffer size which acts as a throttle for how
628    /// much data is buffered on this side of the pipe
629    fn set_recv_buf_size(&mut self, size: usize) -> Result<()>;
630
631    /// Size of the receive buffer that holds all data that has not
632    /// yet been read
633    fn recv_buf_size(&self) -> Result<usize>;
634
635    /// Sets the size of the send buffer which will hold the bytes of
636    /// data while they are being sent over to the peer
637    fn set_send_buf_size(&mut self, size: usize) -> Result<()>;
638
639    /// Size of the send buffer that holds all data that is currently
640    /// being transmitted.
641    fn send_buf_size(&self) -> Result<usize>;
642
643    /// When NO_DELAY is set the data that needs to be transmitted to
644    /// the peer is sent immediately rather than waiting for a bigger
645    /// batch of data, this reduces latency but increases encapsulation
646    /// overhead.
647    fn set_nodelay(&mut self, reuse: bool) -> Result<()>;
648
649    /// Indicates if the NO_DELAY flag is set which means that data
650    /// is immediately sent to the peer without waiting. This reduces
651    /// latency but increases encapsulation overhead.
652    fn nodelay(&self) -> Result<bool>;
653
654    /// When KEEP_ALIVE is set the connection will periodically send
655    /// an empty data packet to the server to make sure the connection
656    /// stays alive.
657    fn set_keepalive(&mut self, keepalive: bool) -> Result<()>;
658
659    /// Indicates if the KEEP_ALIVE flag is set which means that the
660    /// socket will periodically send an empty data packet to keep
661    /// the connection alive.
662    fn keepalive(&self) -> Result<bool>;
663
664    /// When DONT_ROUTE is set the packet will be sent directly
665    /// to the interface without passing through the routing logic.
666    fn set_dontroute(&mut self, keepalive: bool) -> Result<()>;
667
668    /// Indicates if the packet will pass straight through to
669    /// the interface bypassing the routing logic.
670    fn dontroute(&self) -> Result<bool>;
671
672    /// Returns the address (IP and Port) of the peer socket that this
673    /// is connected to
674    fn addr_peer(&self) -> Result<SocketAddr>;
675
676    /// Shuts down either the READER or WRITER sides of the socket
677    /// connection.
678    fn shutdown(&mut self, how: Shutdown) -> Result<()>;
679
680    /// Return true if the socket is closed
681    fn is_closed(&self) -> bool;
682}
683
/// Lets a boxed [`VirtualTcpSocket`] be used wherever tokio expects an
/// `AsyncRead`.
#[cfg(feature = "tokio")]
impl<'a> AsyncRead for Box<dyn VirtualTcpSocket + Sync + 'a> {
    /// Registers the task's waker with the socket, then attempts one
    /// non-blocking receive into the unfilled part of `buf`.
    ///
    /// Returns `Poll::Pending` while the socket reports
    /// `NetworkError::WouldBlock`; any other error is converted with
    /// `net_error_into_io_err`. Bytes that were already filled in `buf`
    /// before this call are preserved.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        if let Err(err) = this.set_handler(handler) {
            return Poll::Ready(Err(net_error_into_io_err(err)));
        }
        // SAFETY: the slice is only handed to `try_recv`, which is relied upon
        // to write received bytes and never de-initialize existing ones.
        let unfilled = unsafe { buf.unfilled_mut() };
        match this.try_recv(unfilled, false) {
            Ok(received) => {
                // SAFETY: relies on `try_recv` having written `received` bytes
                // to the start of the unfilled region it was given.
                unsafe { buf.assume_init(received) };
                // `advance` grows the filled region relative to its current
                // end. `set_filled(received)` would instead reset the filled
                // length to the absolute value `received`, dropping data
                // filled by earlier reads into the same `ReadBuf`.
                buf.advance(received);
                Poll::Ready(Ok(()))
            }
            Err(NetworkError::WouldBlock) => Poll::Pending,
            Err(err) => Poll::Ready(Err(net_error_into_io_err(err))),
        }
    }
}
708
/// Lets a boxed [`VirtualTcpSocket`] be used wherever tokio expects an
/// `AsyncWrite`.
///
/// `poll_write` and `poll_flush` register the task's waker with the socket
/// and then make one non-blocking attempt, staying pending only while the
/// socket reports `NetworkError::WouldBlock`. Errors are converted with
/// `net_error_into_io_err`.
#[cfg(feature = "tokio")]
impl<'a> AsyncWrite for Box<dyn VirtualTcpSocket + Sync + 'a> {
    /// Attempts to send `buf` through `try_send`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let socket = self.get_mut();
        let waker: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        if let Err(err) = socket.set_handler(waker) {
            return Poll::Ready(Err(net_error_into_io_err(err)));
        }
        match socket.try_send(buf) {
            Err(NetworkError::WouldBlock) => Poll::Pending,
            outcome => Poll::Ready(outcome.map_err(net_error_into_io_err)),
        }
    }

    /// Attempts to flush through `try_flush`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let socket = self.get_mut();
        let waker: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        if let Err(err) = socket.set_handler(waker) {
            return Poll::Ready(Err(net_error_into_io_err(err)));
        }
        match socket.try_flush() {
            Err(NetworkError::WouldBlock) => Poll::Pending,
            outcome => Poll::Ready(outcome.map_err(net_error_into_io_err)),
        }
    }

    /// Shuts down the write side of the socket; always completes immediately
    /// and never registers a waker.
    fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let outcome = self.shutdown(Shutdown::Write);
        Poll::Ready(outcome.map_err(net_error_into_io_err))
    }
}
748
/// A UDP socket: broadcast and multicast controls on top of
/// [`VirtualConnectionlessSocket`].
pub trait VirtualUdpSocket:
    VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
{
    /// Sets a flag that means that the UDP socket is able
    /// to receive and process broadcast packets.
    fn set_broadcast(&mut self, broadcast: bool) -> Result<()>;

    /// Indicates if the SO_BROADCAST flag is set which means
    /// that the UDP socket will receive and process broadcast
    /// packets
    fn broadcast(&self) -> Result<bool>;

    /// Sets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv4 addresses
    fn set_multicast_loop_v4(&mut self, val: bool) -> Result<()>;

    /// Gets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv4 addresses
    fn multicast_loop_v4(&self) -> Result<bool>;

    /// Sets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv6 addresses
    fn set_multicast_loop_v6(&mut self, val: bool) -> Result<()>;

    /// Gets a flag that indicates if multicast packets that
    /// this socket is a member of will be looped back to
    /// the sending socket. This applies to IPv6 addresses
    fn multicast_loop_v6(&self) -> Result<bool>;

    /// Sets the TTL for IPv4 multicast packets which is the
    /// number of network hops before the packet is dropped
    fn set_multicast_ttl_v4(&mut self, ttl: u32) -> Result<()>;

    /// Gets the TTL for IPv4 multicast packets which is the
    /// number of network hops before the packet is dropped
    fn multicast_ttl_v4(&self) -> Result<u32>;

    /// Tells this interface that it will subscribe to a
    /// particular multicast address. This applies to IPv4 addresses
    ///
    /// `iface` identifies the local interface by its IPv4 address.
    /// NOTE(review): inferred from the parameter type — confirm with implementors.
    fn join_multicast_v4(&mut self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<()>;

    /// Tells this interface that it will unsubscribe from a
    /// particular multicast address. This applies to IPv4 addresses
    fn leave_multicast_v4(&mut self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<()>;

    /// Tells this interface that it will subscribe to a
    /// particular multicast address. This applies to IPv6 addresses
    ///
    /// `iface` is a numeric interface identifier.
    /// NOTE(review): presumably the interface index — confirm with implementors.
    fn join_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<()>;

    /// Tells this interface that it will unsubscribe from a
    /// particular multicast address. This applies to IPv6 addresses
    fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<()>;

    /// Returns the remote address of this UDP socket if it has been
    /// connected to a specific target destination address
    fn addr_peer(&self) -> Result<Option<SocketAddr>>;
}
809
/// A [`VirtualNetworking`] implementation that supports nothing: it overrides
/// no methods, so every call falls through to the trait's default body and
/// returns `NetworkError::Unsupported`.
#[derive(Debug, Default)]
pub struct UnsupportedVirtualNetworking {}

// Intentionally empty: all behavior comes from the trait's default methods.
#[async_trait::async_trait]
impl VirtualNetworking for UnsupportedVirtualNetworking {}
815
/// Error type returned by the networking API of this crate (see the crate-level
/// `Result<T>` alias).
///
/// The enum is a plain, field-less, `Copy` value. It can be converted from and
/// to [`std::io::Error`] with `io_err_into_net_error` and
/// `net_error_into_io_err`; both conversions are lossy for variants that have
/// no direct `std::io::ErrorKind` counterpart.
// NOTE(review): the type derives `Serialize`/`Deserialize`; variant order may be
// significant for index-based wire formats — confirm before reordering.
#[derive(Error, Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetworkError {
    /// The handle (file descriptor) given was not usable.
    #[error("invalid fd")]
    InvalidFd,
    /// The entity being created already exists.
    #[error("file exists")]
    AlreadyExists,
    /// A resource could not be locked.
    #[error("lock error")]
    Lock,
    /// Something failed when doing IO. These errors can generally not be handled.
    /// The operation may succeed if it is tried again.
    #[error("io error")]
    IOError,
    /// The requested address is already in use.
    #[error("address is in use")]
    AddressInUse,
    /// The requested address could not be found / is not available.
    #[error("address could not be found")]
    AddressNotAvailable,
    /// The pipe (or connection) was closed by the other side.
    #[error("broken pipe (was closed)")]
    BrokenPipe,
    /// There was not enough memory to complete the operation.
    #[error("Insufficient memory")]
    InsufficientMemory,
    /// The connection was aborted.
    #[error("connection aborted")]
    ConnectionAborted,
    /// The connection request was refused.
    #[error("connection refused")]
    ConnectionRefused,
    /// The connection was reset.
    #[error("connection reset")]
    ConnectionReset,
    /// The operation was interrupted before it could finish.
    #[error("operation interrupted")]
    Interrupted,
    /// Internal data was invalid. If the data supplied as an argument is what
    /// is invalid, use `InvalidInput` instead.
    #[error("invalid internal data")]
    InvalidData,
    /// The data provided by the caller is invalid.
    #[error("invalid input")]
    InvalidInput,
    /// The message is too large to send as one packet.
    #[error("message too large")]
    MessageSize,
    /// The operation could not be performed because there is no open connection.
    #[error("connection is not open")]
    NotConnected,
    /// The requested device could not be accessed.
    #[error("can't access device")]
    NoDevice,
    /// The caller was not allowed to perform this operation.
    #[error("permission denied")]
    PermissionDenied,
    /// The operation did not complete within the given amount of time.
    #[error("time out")]
    TimedOut,
    /// End of input was reached when it was not expected.
    #[error("unexpected eof")]
    UnexpectedEof,
    /// The operation would block; this tells the caller that it can try again.
    #[error("blocking operation. try again")]
    WouldBlock,
    /// A call to write returned 0.
    #[error("write returned 0")]
    WriteZero,
    /// Too many files are open.
    #[error("too many open files")]
    TooManyOpenFiles,
    /// The operation is not supported.
    #[error("unsupported")]
    Unsupported,
    /// Some other unhandled error. If you see this, it's probably a bug.
    #[error("unknown error found")]
    UnknownError,
}
895
896pub fn io_err_into_net_error(net_error: std::io::Error) -> NetworkError {
897    use std::io::ErrorKind;
898    match net_error.kind() {
899        ErrorKind::BrokenPipe => NetworkError::BrokenPipe,
900        ErrorKind::AlreadyExists => NetworkError::AlreadyExists,
901        ErrorKind::AddrInUse => NetworkError::AddressInUse,
902        ErrorKind::AddrNotAvailable => NetworkError::AddressNotAvailable,
903        ErrorKind::ConnectionAborted => NetworkError::ConnectionAborted,
904        ErrorKind::ConnectionRefused => NetworkError::ConnectionRefused,
905        ErrorKind::ConnectionReset => NetworkError::ConnectionReset,
906        ErrorKind::Interrupted => NetworkError::Interrupted,
907        ErrorKind::InvalidData => NetworkError::InvalidData,
908        ErrorKind::InvalidInput => {
909            #[cfg(all(target_family = "unix", feature = "libc"))]
910            if net_error.raw_os_error() == Some(libc::EMSGSIZE) {
911                return NetworkError::MessageSize;
912            }
913            NetworkError::InvalidInput
914        }
915        ErrorKind::NotConnected => NetworkError::NotConnected,
916        ErrorKind::PermissionDenied => NetworkError::PermissionDenied,
917        ErrorKind::TimedOut => NetworkError::TimedOut,
918        ErrorKind::UnexpectedEof => NetworkError::UnexpectedEof,
919        ErrorKind::WouldBlock => NetworkError::WouldBlock,
920        ErrorKind::WriteZero => NetworkError::WriteZero,
921        ErrorKind::Unsupported => NetworkError::Unsupported,
922
923        #[cfg(all(target_family = "unix", feature = "libc"))]
924        _ => {
925            if let Some(code) = net_error.raw_os_error() {
926                match code {
927                    libc::EPERM => NetworkError::PermissionDenied,
928                    libc::EBADF => NetworkError::InvalidFd,
929                    libc::ECHILD => NetworkError::InvalidFd,
930                    libc::EMFILE => NetworkError::TooManyOpenFiles,
931                    libc::EINTR => NetworkError::Interrupted,
932                    libc::EIO => NetworkError::IOError,
933                    libc::ENXIO => NetworkError::IOError,
934                    libc::EAGAIN => NetworkError::WouldBlock,
935                    libc::ENOMEM => NetworkError::InsufficientMemory,
936                    libc::EACCES => NetworkError::PermissionDenied,
937                    libc::ENODEV => NetworkError::NoDevice,
938                    libc::EINVAL => NetworkError::InvalidInput,
939                    libc::EMSGSIZE => NetworkError::MessageSize,
940                    libc::EPIPE => NetworkError::BrokenPipe,
941                    err => {
942                        tracing::trace!("unknown os error {}", err);
943                        NetworkError::UnknownError
944                    }
945                }
946            } else {
947                NetworkError::UnknownError
948            }
949        }
950        #[cfg(not(all(target_family = "unix", feature = "libc")))]
951        _ => NetworkError::UnknownError,
952    }
953}
954
955pub fn net_error_into_io_err(net_error: NetworkError) -> std::io::Error {
956    use std::io::ErrorKind;
957    match net_error {
958        NetworkError::InvalidFd => ErrorKind::BrokenPipe.into(),
959        NetworkError::AlreadyExists => ErrorKind::AlreadyExists.into(),
960        NetworkError::Lock => ErrorKind::BrokenPipe.into(),
961        NetworkError::IOError => ErrorKind::BrokenPipe.into(),
962        NetworkError::AddressInUse => ErrorKind::AddrInUse.into(),
963        NetworkError::AddressNotAvailable => ErrorKind::AddrNotAvailable.into(),
964        NetworkError::BrokenPipe => ErrorKind::BrokenPipe.into(),
965        NetworkError::ConnectionAborted => ErrorKind::ConnectionAborted.into(),
966        NetworkError::ConnectionRefused => ErrorKind::ConnectionRefused.into(),
967        NetworkError::ConnectionReset => ErrorKind::ConnectionReset.into(),
968        NetworkError::Interrupted => ErrorKind::Interrupted.into(),
969        NetworkError::InvalidData => ErrorKind::InvalidData.into(),
970        NetworkError::InvalidInput => ErrorKind::InvalidInput.into(),
971        NetworkError::NotConnected => ErrorKind::NotConnected.into(),
972        NetworkError::NoDevice => ErrorKind::BrokenPipe.into(),
973        NetworkError::PermissionDenied => ErrorKind::PermissionDenied.into(),
974        NetworkError::TimedOut => ErrorKind::TimedOut.into(),
975        NetworkError::UnexpectedEof => ErrorKind::UnexpectedEof.into(),
976        NetworkError::WouldBlock => ErrorKind::WouldBlock.into(),
977        NetworkError::WriteZero => ErrorKind::WriteZero.into(),
978        NetworkError::Unsupported => ErrorKind::Unsupported.into(),
979        NetworkError::UnknownError => ErrorKind::BrokenPipe.into(),
980        NetworkError::InsufficientMemory => ErrorKind::OutOfMemory.into(),
981        NetworkError::MessageSize => {
982            #[cfg(all(target_family = "unix", feature = "libc"))]
983            {
984                std::io::Error::from_raw_os_error(libc::EMSGSIZE)
985            }
986            #[cfg(not(all(target_family = "unix", feature = "libc")))]
987            {
988                ErrorKind::Other.into()
989            }
990        }
991        NetworkError::TooManyOpenFiles => {
992            #[cfg(all(target_family = "unix", feature = "libc"))]
993            {
994                std::io::Error::from_raw_os_error(libc::EMFILE)
995            }
996            #[cfg(not(all(target_family = "unix", feature = "libc")))]
997            {
998                ErrorKind::Other.into()
999            }
1000        }
1001    }
1002}