virtual_net/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![allow(clippy::multiple_bound_locations)]
3#[cfg(feature = "remote")]
4pub mod client;
5pub mod composite;
6#[cfg(feature = "host-net")]
7pub mod host;
8pub mod loopback;
9pub mod meta;
10pub mod ruleset;
11#[cfg(feature = "remote")]
12pub mod rx_tx;
13#[cfg(feature = "remote")]
14pub mod server;
15pub mod tcp_pair;
16#[cfg(feature = "tokio")]
17#[cfg(test)]
18mod tests;
19
20#[cfg(feature = "remote")]
21pub use client::{RemoteNetworkingClient, RemoteNetworkingClientDriver};
22pub use composite::CompositeTcpListener;
23pub use loopback::LoopbackNetworking;
24use pin_project_lite::pin_project;
25#[cfg(feature = "rkyv")]
26use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
27#[cfg(feature = "remote")]
28pub use server::{RemoteNetworkingServer, RemoteNetworkingServerDriver};
29use std::fmt;
30use std::mem::MaybeUninit;
31use std::net::IpAddr;
32use std::net::Ipv4Addr;
33use std::net::Ipv6Addr;
34use std::net::Shutdown;
35use std::net::SocketAddr;
36use std::pin::Pin;
37use std::sync::Arc;
38use std::task::Context;
39use std::task::Poll;
40use std::time::Duration;
41use thiserror::Error;
42#[cfg(feature = "tokio")]
43use tokio::io::AsyncRead;
44#[cfg(feature = "tokio")]
45use tokio::io::AsyncWrite;
46
47pub use bytes::Bytes;
48pub use bytes::BytesMut;
49use serde::{Deserialize, Serialize};
50#[cfg(feature = "host-net")]
51pub use virtual_mio::{InterestGuard, InterestHandlerWaker, InterestType};
52pub use virtual_mio::{InterestHandler, handler_into_waker};
53
54pub type Result<T> = std::result::Result<T, NetworkError>;
55
56/// Represents an IP address and its netmask
57#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
58#[cfg_attr(feature = "rkyv", derive(RkyvSerialize, RkyvDeserialize, Archive))]
59pub struct IpCidr {
60    pub ip: IpAddr,
61    pub prefix: u8,
62}
63
64/// Represents a routing entry in the routing table of the interface
65#[derive(Clone, Debug, Serialize, Deserialize)]
66#[cfg_attr(feature = "rkyv", derive(RkyvSerialize, RkyvDeserialize, Archive))]
67pub struct IpRoute {
68    pub cidr: IpCidr,
69    pub via_router: IpAddr,
70    pub preferred_until: Option<Duration>,
71    pub expires_at: Option<Duration>,
72}
73
74/// Represents an IO source
75pub trait VirtualIoSource: fmt::Debug + Send + Sync + 'static {
76    /// Removes a previously registered waker using a token
77    fn remove_handler(&mut self);
78
79    /// Polls the source to see if there is data waiting
80    fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>>;
81
82    /// Polls the source to see if data can be sent
83    fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>>;
84}
85
86/// An implementation of virtual networking
87#[async_trait::async_trait]
88#[allow(unused_variables)]
89pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
90    /// Bridges this local network with a remote network, which is required in
91    /// order to make lower level networking calls (such as UDP/TCP)
92    async fn bridge(
93        &self,
94        network: &str,
95        access_token: &str,
96        security: StreamSecurity,
97    ) -> Result<()> {
98        Err(NetworkError::Unsupported)
99    }
100
101    /// Disconnects from the remote network essentially unbridging it
102    async fn unbridge(&self) -> Result<()> {
103        Err(NetworkError::Unsupported)
104    }
105
106    /// Acquires an IP address on the network and configures the routing tables
107    async fn dhcp_acquire(&self) -> Result<Vec<IpAddr>> {
108        Err(NetworkError::Unsupported)
109    }
110
111    /// Adds a static IP address to the interface with a netmask prefix
112    async fn ip_add(&self, ip: IpAddr, prefix: u8) -> Result<()> {
113        Err(NetworkError::Unsupported)
114    }
115
116    /// Removes a static (or dynamic) IP address from the interface
117    async fn ip_remove(&self, ip: IpAddr) -> Result<()> {
118        Err(NetworkError::Unsupported)
119    }
120
121    /// Clears all the assigned IP addresses for this interface
122    async fn ip_clear(&self) -> Result<()> {
123        Err(NetworkError::Unsupported)
124    }
125
126    /// Lists all the IP addresses currently assigned to this interface
127    async fn ip_list(&self) -> Result<Vec<IpCidr>> {
128        Err(NetworkError::Unsupported)
129    }
130
131    /// Returns the hardware MAC address for this interface
132    async fn mac(&self) -> Result<[u8; 6]> {
133        Err(NetworkError::Unsupported)
134    }
135
136    /// Adds a default gateway to the routing table
137    async fn gateway_set(&self, ip: IpAddr) -> Result<()> {
138        Err(NetworkError::Unsupported)
139    }
140
141    /// Adds a specific route to the routing table
142    async fn route_add(
143        &self,
144        cidr: IpCidr,
145        via_router: IpAddr,
146        preferred_until: Option<Duration>,
147        expires_at: Option<Duration>,
148    ) -> Result<()> {
149        Err(NetworkError::Unsupported)
150    }
151
152    /// Removes a routing rule from the routing table
153    async fn route_remove(&self, cidr: IpAddr) -> Result<()> {
154        Err(NetworkError::Unsupported)
155    }
156
157    /// Clears the routing table for this interface
158    async fn route_clear(&self) -> Result<()> {
159        Err(NetworkError::Unsupported)
160    }
161
162    /// Lists all the routes defined in the routing table for this interface
163    async fn route_list(&self) -> Result<Vec<IpRoute>> {
164        Err(NetworkError::Unsupported)
165    }
166
167    /// Creates a low level socket that can read and write Ethernet packets
168    /// directly to the interface
169    async fn bind_raw(&self) -> Result<Box<dyn VirtualRawSocket + Sync>> {
170        Err(NetworkError::Unsupported)
171    }
172
173    /// Lists for TCP connections on a specific IP and Port combination
174    /// Multiple servers (processes or threads) can bind to the same port if they each set
175    /// the reuse-port and-or reuse-addr flags
176    async fn listen_tcp(
177        &self,
178        addr: SocketAddr,
179        only_v6: bool,
180        reuse_port: bool,
181        reuse_addr: bool,
182    ) -> Result<Box<dyn VirtualTcpListener + Sync>> {
183        Err(NetworkError::Unsupported)
184    }
185
186    /// Binds a TCP socket to a specific IP and port without immediately
187    /// listening for connections or connecting to a peer.
188    async fn bind_tcp(
189        &self,
190        addr: SocketAddr,
191        only_v6: bool,
192        reuse_port: bool,
193        reuse_addr: bool,
194    ) -> Result<Box<dyn VirtualTcpBoundSocket + Sync>> {
195        Err(NetworkError::Unsupported)
196    }
197
198    /// Opens a UDP socket that listens on a specific IP and Port combination
199    /// Multiple servers (processes or threads) can bind to the same port if they each set
200    /// the reuse-port and-or reuse-addr flags
201    async fn bind_udp(
202        &self,
203        addr: SocketAddr,
204        reuse_port: bool,
205        reuse_addr: bool,
206    ) -> Result<Box<dyn VirtualUdpSocket + Sync>> {
207        Err(NetworkError::Unsupported)
208    }
209
210    /// Creates a socket that can be used to send and receive ICMP packets
211    /// from a paritcular IP address
212    async fn bind_icmp(&self, addr: IpAddr) -> Result<Box<dyn VirtualIcmpSocket + Sync>> {
213        Err(NetworkError::Unsupported)
214    }
215
216    /// Opens a TCP connection to a particular destination IP address and port
217    async fn connect_tcp(
218        &self,
219        addr: SocketAddr,
220        peer: SocketAddr,
221    ) -> Result<Box<dyn VirtualTcpSocket + Sync>> {
222        Err(NetworkError::Unsupported)
223    }
224
225    /// Performs DNS resolution for a specific hostname
226    async fn resolve(
227        &self,
228        host: &str,
229        port: Option<u16>,
230        dns_server: Option<IpAddr>,
231    ) -> Result<Vec<IpAddr>> {
232        Err(NetworkError::Unsupported)
233    }
234}
235
236pub type DynVirtualNetworking = Arc<dyn VirtualNetworking>;
237
238pub trait VirtualTcpListener: VirtualIoSource + fmt::Debug + Send + Sync + 'static {
239    /// Tries to accept a new connection
240    fn try_accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;
241
242    /// Registers a waker for when a new connection has arrived. This uses
243    /// a stack machine which means more than one waker can be registered
244    fn set_handler(&mut self, handler: Box<dyn InterestHandler + Send + Sync>) -> Result<()>;
245
246    /// Returns the local address of this TCP listener
247    fn addr_local(&self) -> Result<SocketAddr>;
248
249    /// Sets how many network hops the packets are permitted for new connections
250    fn set_ttl(&mut self, ttl: u8) -> Result<()>;
251
252    /// Returns the maximum number of network hops before packets are dropped
253    fn ttl(&self) -> Result<u8>;
254}
255
256pub trait VirtualTcpBoundSocket: fmt::Debug + Send + Sync + 'static {
257    /// Returns the local address of this bound TCP socket.
258    fn addr_local(&self) -> Result<SocketAddr>;
259
260    /// Places the socket into listening mode.
261    fn listen(&mut self) -> Result<Box<dyn VirtualTcpListener + Sync>>;
262
263    /// Initiates a TCP connection using the already-bound local address.
264    fn connect(&mut self, peer: SocketAddr) -> Result<Box<dyn VirtualTcpSocket + Sync>>;
265
266    /// Sets how many network hops the packets are permitted for this socket.
267    fn set_ttl(&mut self, ttl: u32) -> Result<()>;
268
269    /// Returns the maximum number of network hops before packets are dropped.
270    fn ttl(&self) -> Result<u32>;
271}
272
273#[async_trait::async_trait]
274pub trait VirtualTcpListenerExt: VirtualTcpListener {
275    /// Accepts a new connection from the TCP listener
276    async fn accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;
277}
278
279#[async_trait::async_trait]
280impl<R: VirtualTcpListener + ?Sized> VirtualTcpListenerExt for R {
281    async fn accept(&mut self) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)> {
282        struct Poller<'a, R>
283        where
284            R: VirtualTcpListener + ?Sized,
285        {
286            listener: &'a mut R,
287        }
288        impl<R> std::future::Future for Poller<'_, R>
289        where
290            R: VirtualTcpListener + ?Sized,
291        {
292            type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>;
293            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
294                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
295                if let Err(err) = self.listener.set_handler(handler) {
296                    return Poll::Ready(Err(err));
297                }
298                match self.listener.try_accept() {
299                    Ok(ret) => Poll::Ready(Ok(ret)),
300                    Err(NetworkError::WouldBlock) => Poll::Pending,
301                    Err(err) => Poll::Ready(Err(err)),
302                }
303            }
304        }
305        Poller { listener: self }.await
306    }
307}
308
309pub trait VirtualSocket: VirtualIoSource + fmt::Debug + Send + Sync + 'static {
310    /// Sets how many network hops the packets are permitted for new connections
311    fn set_ttl(&mut self, ttl: u32) -> Result<()>;
312
313    /// Returns the maximum number of network hops before packets are dropped
314    fn ttl(&self) -> Result<u32>;
315
316    /// Returns the local address for this socket
317    fn addr_local(&self) -> Result<SocketAddr>;
318
319    /// Returns the status/state of the socket
320    fn status(&self) -> Result<SocketStatus>;
321
322    /// Registers a waker for when this connection is ready to receive
323    /// more data. Uses a stack machine which means more than one waker
324    /// can be registered
325    fn set_handler(&mut self, handler: Box<dyn InterestHandler + Send + Sync>) -> Result<()>;
326}
327
328#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
329pub enum SocketStatus {
330    Opening,
331    Opened,
332    Closed,
333    Failed,
334}
335
336#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
337pub enum StreamSecurity {
338    Unencrypted,
339    AnyEncryption,
340    ClassicEncryption,
341    DoubleEncryption,
342}
343
344/// Connected sockets have a persistent connection to a remote peer
345pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
346    /// Determines how long the socket will remain in a TIME_WAIT
347    /// after it disconnects (only the one that initiates the close will
348    /// be in a TIME_WAIT state thus the clients should always do this rather
349    /// than the server)
350    fn set_linger(&mut self, linger: Option<Duration>) -> Result<()>;
351
352    /// Returns how long the socket will remain in a TIME_WAIT
353    /// after it disconnects
354    fn linger(&self) -> Result<Option<Duration>>;
355
356    /// Tries to send out a datagram or stream of bytes on this socket
357    fn try_send(&mut self, data: &[u8]) -> Result<usize>;
358
359    // Tries to flush any data in the local buffers
360    fn try_flush(&mut self) -> Result<()>;
361
362    /// Closes the socket
363    fn close(&mut self) -> Result<()>;
364
365    /// Tries to read a packet from the socket
366    fn try_recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;
367}
368
369#[async_trait::async_trait]
370pub trait VirtualConnectedSocketExt: VirtualConnectedSocket {
371    async fn send(&mut self, data: &[u8]) -> Result<usize>;
372
373    async fn recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;
374
375    async fn flush(&mut self) -> Result<()>;
376}
377
378#[async_trait::async_trait]
379impl<R: VirtualConnectedSocket + ?Sized> VirtualConnectedSocketExt for R {
380    async fn send(&mut self, data: &[u8]) -> Result<usize> {
381        pin_project! {
382            struct Poller<'a, 'b, R: ?Sized>
383            where
384                R: VirtualConnectedSocket,
385            {
386                socket: &'a mut R,
387                data: &'b [u8],
388            }
389        }
390        impl<R> std::future::Future for Poller<'_, '_, R>
391        where
392            R: VirtualConnectedSocket + ?Sized,
393        {
394            type Output = Result<usize>;
395            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
396                let this = self.project();
397
398                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
399                if let Err(err) = this.socket.set_handler(handler) {
400                    return Poll::Ready(Err(err));
401                }
402                match this.socket.try_send(this.data) {
403                    Ok(ret) => Poll::Ready(Ok(ret)),
404                    Err(NetworkError::WouldBlock) => Poll::Pending,
405                    Err(err) => Poll::Ready(Err(err)),
406                }
407            }
408        }
409        Poller { socket: self, data }.await
410    }
411
412    async fn recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize> {
413        pin_project! {
414            struct Poller<'a, 'b, R: ?Sized>
415            where
416                R: VirtualConnectedSocket,
417            {
418                socket: &'a mut R,
419                buf: &'b mut [MaybeUninit<u8>],
420                peek: bool,
421            }
422        }
423        impl<R> std::future::Future for Poller<'_, '_, R>
424        where
425            R: VirtualConnectedSocket + ?Sized,
426        {
427            type Output = Result<usize>;
428            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
429                let this = self.project();
430
431                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
432                if let Err(err) = this.socket.set_handler(handler) {
433                    return Poll::Ready(Err(err));
434                }
435                match this.socket.try_recv(this.buf, *this.peek) {
436                    Ok(ret) => Poll::Ready(Ok(ret)),
437                    Err(NetworkError::WouldBlock) => Poll::Pending,
438                    Err(err) => Poll::Ready(Err(err)),
439                }
440            }
441        }
442        Poller {
443            socket: self,
444            buf,
445            peek,
446        }
447        .await
448    }
449
450    async fn flush(&mut self) -> Result<()> {
451        struct Poller<'a, R>
452        where
453            R: VirtualConnectedSocket + ?Sized,
454        {
455            socket: &'a mut R,
456        }
457        impl<R> std::future::Future for Poller<'_, R>
458        where
459            R: VirtualConnectedSocket + ?Sized,
460        {
461            type Output = Result<()>;
462            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
463                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
464                if let Err(err) = self.socket.set_handler(handler) {
465                    return Poll::Ready(Err(err));
466                }
467                match self.socket.try_flush() {
468                    Ok(ret) => Poll::Ready(Ok(ret)),
469                    Err(NetworkError::WouldBlock) => Poll::Pending,
470                    Err(err) => Poll::Ready(Err(err)),
471                }
472            }
473        }
474        Poller { socket: self }.await
475    }
476}
477
478/// Connectionless sockets are able to send and receive datagrams and stream
479/// bytes to multiple addresses at the same time (peer-to-peer)
480pub trait VirtualConnectionlessSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
481    /// Sends out a datagram or stream of bytes on this socket
482    /// to a specific address
483    fn try_send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize>;
484
485    /// Recv a packet from the socket
486    fn try_recv_from(
487        &mut self,
488        buf: &mut [MaybeUninit<u8>],
489        peek: bool,
490    ) -> Result<(usize, SocketAddr)>;
491}
492
493#[async_trait::async_trait]
494pub trait VirtualConnectionlessSocketExt: VirtualConnectionlessSocket {
495    async fn send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize>;
496
497    async fn recv_from(
498        &mut self,
499        buf: &mut [MaybeUninit<u8>],
500        peek: bool,
501    ) -> Result<(usize, SocketAddr)>;
502}
503
504#[async_trait::async_trait]
505impl<R: VirtualConnectionlessSocket + ?Sized> VirtualConnectionlessSocketExt for R {
506    async fn send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize> {
507        pin_project! {
508            struct Poller<'a, 'b, R: ?Sized>
509            where
510                R: VirtualConnectionlessSocket,
511            {
512                socket: &'a mut R,
513                data: &'b [u8],
514                addr: SocketAddr,
515            }
516        }
517        impl<R> std::future::Future for Poller<'_, '_, R>
518        where
519            R: VirtualConnectionlessSocket + ?Sized,
520        {
521            type Output = Result<usize>;
522            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
523                let this = self.project();
524
525                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
526                if let Err(err) = this.socket.set_handler(handler) {
527                    return Poll::Ready(Err(err));
528                }
529                match this.socket.try_send_to(this.data, *this.addr) {
530                    Ok(ret) => Poll::Ready(Ok(ret)),
531                    Err(NetworkError::WouldBlock) => Poll::Pending,
532                    Err(err) => Poll::Ready(Err(err)),
533                }
534            }
535        }
536        Poller {
537            socket: self,
538            data,
539            addr,
540        }
541        .await
542    }
543
544    async fn recv_from(
545        &mut self,
546        buf: &mut [MaybeUninit<u8>],
547        peek: bool,
548    ) -> Result<(usize, SocketAddr)> {
549        pin_project! {
550            struct Poller<'a, 'b, R: ?Sized>
551            where
552                R: VirtualConnectionlessSocket,
553            {
554                socket: &'a mut R,
555                buf: &'b mut [MaybeUninit<u8>],
556                peek: bool,
557            }
558        }
559        impl<R> std::future::Future for Poller<'_, '_, R>
560        where
561            R: VirtualConnectionlessSocket + ?Sized,
562        {
563            type Output = Result<(usize, SocketAddr)>;
564            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
565                let this = self.project();
566
567                let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
568                if let Err(err) = this.socket.set_handler(handler) {
569                    return Poll::Ready(Err(err));
570                }
571                match this.socket.try_recv_from(this.buf, *this.peek) {
572                    Ok(ret) => Poll::Ready(Ok(ret)),
573                    Err(NetworkError::WouldBlock) => Poll::Pending,
574                    Err(err) => Poll::Ready(Err(err)),
575                }
576            }
577        }
578        Poller {
579            socket: self,
580            buf,
581            peek,
582        }
583        .await
584    }
585}
586
587/// ICMP sockets are low level devices bound to a specific address
588/// that can send and receive ICMP packets
589pub trait VirtualIcmpSocket:
590    VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
591{
592}
593
594#[async_trait::async_trait]
595pub trait VirtualRawSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
596    /// Sends out a datagram or stream of bytes on this socket
597    fn try_send(&mut self, data: &[u8]) -> Result<usize>;
598
599    /// Attempts to flush the object, ensuring that any buffered data reach
600    /// their destination.
601    fn try_flush(&mut self) -> Result<()>;
602
603    /// Recv a packet from the socket
604    fn try_recv(&mut self, buf: &mut [MaybeUninit<u8>], peek: bool) -> Result<usize>;
605
606    /// Tells the raw socket and its backing switch that all packets
607    /// should be received by this socket even if they are not
608    /// destined for this device
609    fn set_promiscuous(&mut self, promiscuous: bool) -> Result<()>;
610
611    /// Returns if the socket is running in promiscuous mode whereby it
612    /// will receive all packets even if they are not destined for the
613    /// local interface
614    fn promiscuous(&self) -> Result<bool>;
615}
616
617pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync + 'static {
618    /// Sets the receive buffer size which acts as a throttle for how
619    /// much data is buffered on this side of the pipe
620    fn set_recv_buf_size(&mut self, size: usize) -> Result<()>;
621
622    /// Size of the receive buffer that holds all data that has not
623    /// yet been read
624    fn recv_buf_size(&self) -> Result<usize>;
625
626    /// Sets the size of the send buffer which will hold the bytes of
627    /// data while they are being sent over to the peer
628    fn set_send_buf_size(&mut self, size: usize) -> Result<()>;
629
630    /// Size of the send buffer that holds all data that is currently
631    /// being transmitted.
632    fn send_buf_size(&self) -> Result<usize>;
633
634    /// When NO_DELAY is set the data that needs to be transmitted to
635    /// the peer is sent immediately rather than waiting for a bigger
636    /// batch of data, this reduces latency but increases encapsulation
637    /// overhead.
638    fn set_nodelay(&mut self, reuse: bool) -> Result<()>;
639
640    /// Indicates if the NO_DELAY flag is set which means that data
641    /// is immediately sent to the peer without waiting. This reduces
642    /// latency but increases encapsulation overhead.
643    fn nodelay(&self) -> Result<bool>;
644
645    /// When KEEP_ALIVE is set the connection will periodically send
646    /// an empty data packet to the server to make sure the connection
647    /// stays alive.
648    fn set_keepalive(&mut self, keepalive: bool) -> Result<()>;
649
650    /// Indicates if the KEEP_ALIVE flag is set which means that the
651    /// socket will periodically send an empty data packet to keep
652    /// the connection alive.
653    fn keepalive(&self) -> Result<bool>;
654
655    /// When DONT_ROUTE is set the packet will be sent directly
656    /// to the interface without passing through the routing logic.
657    fn set_dontroute(&mut self, keepalive: bool) -> Result<()>;
658
659    /// Indicates if the packet will pass straight through to
660    /// the interface bypassing the routing logic.
661    fn dontroute(&self) -> Result<bool>;
662
663    /// Returns the address (IP and Port) of the peer socket that this
664    /// is connected to
665    fn addr_peer(&self) -> Result<SocketAddr>;
666
667    /// Shuts down either the READER or WRITER sides of the socket
668    /// connection.
669    fn shutdown(&mut self, how: Shutdown) -> Result<()>;
670
671    /// Return true if the socket is closed
672    fn is_closed(&self) -> bool;
673}
674
/// Lets a boxed [`VirtualTcpSocket`] be used as a tokio [`AsyncRead`].
#[cfg(feature = "tokio")]
impl<'a> AsyncRead for Box<dyn VirtualTcpSocket + Sync + 'a> {
    /// Makes one non-blocking `try_recv` into the unfilled part of `buf`.
    ///
    /// The task's waker is registered as the socket's handler first, so a
    /// [`NetworkError::WouldBlock`] can safely be turned into `Poll::Pending`.
    /// Any other error is converted with `net_error_into_io_err`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        let handler: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        this.set_handler(handler).map_err(net_error_into_io_err)?;

        // SAFETY: the slice is only handed to `try_recv` as a write target;
        // this relies on `try_recv` never de-initializing bytes in it.
        let unfilled = unsafe { buf.unfilled_mut() };
        match this.try_recv(unfilled, false) {
            Ok(received) => {
                // SAFETY: relies on the `try_recv` contract that it wrote
                // `received` bytes at the start of the slice it was given.
                unsafe { buf.assume_init(received) };
                // Extend the filled region by what was just read. The caller
                // may pass a `ReadBuf` that already holds data (e.g. during
                // `read_exact`), so the length must be advanced relative to
                // the current fill level rather than set to an absolute value.
                buf.advance(received);
                Poll::Ready(Ok(()))
            }
            Err(NetworkError::WouldBlock) => Poll::Pending,
            Err(err) => Poll::Ready(Err(net_error_into_io_err(err))),
        }
    }
}
699
/// Lets a boxed [`VirtualTcpSocket`] be used as a tokio [`AsyncWrite`].
///
/// `poll_write` and `poll_flush` register the task's waker as the socket's
/// handler and then make one non-blocking attempt; `WouldBlock` becomes
/// `Poll::Pending` and every other error goes through `net_error_into_io_err`.
#[cfg(feature = "tokio")]
impl<'a> AsyncWrite for Box<dyn VirtualTcpSocket + Sync + 'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let socket = self.get_mut();
        let waker: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        socket.set_handler(waker).map_err(net_error_into_io_err)?;
        match socket.try_send(buf) {
            Err(NetworkError::WouldBlock) => Poll::Pending,
            outcome => Poll::Ready(outcome.map_err(net_error_into_io_err)),
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let socket = self.get_mut();
        let waker: Box<dyn InterestHandler + Send + Sync> = cx.waker().into();
        socket.set_handler(waker).map_err(net_error_into_io_err)?;
        match socket.try_flush() {
            Err(NetworkError::WouldBlock) => Poll::Pending,
            outcome => Poll::Ready(outcome.map_err(net_error_into_io_err)),
        }
    }

    /// Shuts down the write half of the socket; completes immediately.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let socket = self.get_mut();
        let outcome = socket.shutdown(Shutdown::Write);
        Poll::Ready(outcome.map_err(net_error_into_io_err))
    }
}
739
740pub trait VirtualUdpSocket:
741    VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
742{
743    /// Sets a flag that means that the UDP socket is able
744    /// to receive and process broadcast packets.
745    fn set_broadcast(&mut self, broadcast: bool) -> Result<()>;
746
747    /// Indicates if the SO_BROADCAST flag is set which means
748    /// that the UDP socket will receive and process broadcast
749    /// packets
750    fn broadcast(&self) -> Result<bool>;
751
752    /// Sets a flag that indicates if multicast packets that
753    /// this socket is a member of will be looped back to
754    /// the sending socket. This applies to IPv4 addresses
755    fn set_multicast_loop_v4(&mut self, val: bool) -> Result<()>;
756
757    /// Gets a flag that indicates if multicast packets that
758    /// this socket is a member of will be looped back to
759    /// the sending socket. This applies to IPv4 addresses
760    fn multicast_loop_v4(&self) -> Result<bool>;
761
762    /// Sets a flag that indicates if multicast packets that
763    /// this socket is a member of will be looped back to
764    /// the sending socket. This applies to IPv6 addresses
765    fn set_multicast_loop_v6(&mut self, val: bool) -> Result<()>;
766
767    /// Gets a flag that indicates if multicast packets that
768    /// this socket is a member of will be looped back to
769    /// the sending socket. This applies to IPv6 addresses
770    fn multicast_loop_v6(&self) -> Result<bool>;
771
772    /// Sets the TTL for IPv4 multicast packets which is the
773    /// number of network hops before the packet is dropped
774    fn set_multicast_ttl_v4(&mut self, ttl: u32) -> Result<()>;
775
776    /// Gets the TTL for IPv4 multicast packets which is the
777    /// number of network hops before the packet is dropped
778    fn multicast_ttl_v4(&self) -> Result<u32>;
779
780    /// Tells this interface that it will subscribe to a
781    /// particular multicast address. This applies to IPv4 addresses
782    fn join_multicast_v4(&mut self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<()>;
783
784    /// Tells this interface that it will unsubscribe to a
785    /// particular multicast address. This applies to IPv4 addresses
786    fn leave_multicast_v4(&mut self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<()>;
787
788    /// Tells this interface that it will subscribe to a
789    /// particular multicast address. This applies to IPv6 addresses
790    fn join_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<()>;
791
792    /// Tells this interface that it will unsubscribe to a
793    /// particular multicast address. This applies to IPv6 addresses
794    fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<()>;
795
796    /// Returns the remote address of this UDP socket if it has been
797    /// connected to a specific target destination address
798    fn addr_peer(&self) -> Result<Option<SocketAddr>>;
799}
800
801#[derive(Debug, Default)]
802pub struct UnsupportedVirtualNetworking {}
803
804#[async_trait::async_trait]
805impl VirtualNetworking for UnsupportedVirtualNetworking {}
806
/// Error type used by the virtual networking layer (see the crate-level
/// [`Result`] alias).
///
/// Every variant is a field-less unit value. Conversions from and to
/// [`std::io::Error`] are provided by [`io_err_into_net_error`] and
/// [`net_error_into_io_err`]. The conversion to `std::io::Error` is lossy:
/// several variants share `ErrorKind::BrokenPipe`, so they do not survive a
/// round trip.
#[derive(Error, Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetworkError {
    /// The handle given was not usable.
    ///
    /// Produced from the OS error codes `EBADF` and `ECHILD` on Unix builds
    /// with the `libc` feature.
    #[error("invalid fd")]
    InvalidFd,
    /// The entity already exists (counterpart of `ErrorKind::AlreadyExists`).
    #[error("file exists")]
    AlreadyExists,
    /// The filesystem has failed to lock a resource.
    #[error("lock error")]
    Lock,
    /// Something failed when doing IO. These errors can generally not be handled.
    /// It may work if tried again.
    ///
    /// Produced from the OS error codes `EIO` and `ENXIO` on Unix builds with
    /// the `libc` feature.
    #[error("io error")]
    IOError,
    /// The address was already in use (counterpart of `ErrorKind::AddrInUse`).
    #[error("address is in use")]
    AddressInUse,
    /// The requested address was not available
    /// (counterpart of `ErrorKind::AddrNotAvailable`).
    #[error("address could not be found")]
    AddressNotAvailable,
    /// A pipe was closed.
    #[error("broken pipe (was closed)")]
    BrokenPipe,
    /// Insufficient memory (converted to `ErrorKind::OutOfMemory`).
    #[error("Insufficient memory")]
    InsufficientMemory,
    /// The connection was aborted.
    #[error("connection aborted")]
    ConnectionAborted,
    /// The connection request was refused.
    #[error("connection refused")]
    ConnectionRefused,
    /// The connection was reset.
    #[error("connection reset")]
    ConnectionReset,
    /// The operation was interrupted before it could finish.
    #[error("operation interrupted")]
    Interrupted,
    /// Invalid internal data. If the argument data is invalid, use `InvalidInput`.
    #[error("invalid internal data")]
    InvalidData,
    /// The provided data is invalid.
    #[error("invalid input")]
    InvalidInput,
    /// Could not perform the operation because there was not an open connection.
    #[error("connection is not open")]
    NotConnected,
    /// The requested device couldn't be accessed.
    #[error("can't access device")]
    NoDevice,
    /// Caller was not allowed to perform this operation.
    #[error("permission denied")]
    PermissionDenied,
    /// The operation did not complete within the given amount of time.
    #[error("time out")]
    TimedOut,
    /// Found EOF when EOF was not expected.
    #[error("unexpected eof")]
    UnexpectedEof,
    /// Operation would block. This error lets the caller know that they can try again.
    #[error("blocking operation. try again")]
    WouldBlock,
    /// A call to write returned 0.
    #[error("write returned 0")]
    WriteZero,
    /// Too many open files.
    ///
    /// Converted to the raw OS error `EMFILE` on Unix builds with the `libc`
    /// feature, and to `ErrorKind::Other` otherwise.
    #[error("too many open files")]
    TooManyOpenFiles,
    /// The operation is not supported.
    #[error("unsupported")]
    Unsupported,
    /// Some other unhandled error. If you see this, it's probably a bug.
    #[error("unknown error found")]
    UnknownError,
}
883
884pub fn io_err_into_net_error(net_error: std::io::Error) -> NetworkError {
885    use std::io::ErrorKind;
886    match net_error.kind() {
887        ErrorKind::BrokenPipe => NetworkError::BrokenPipe,
888        ErrorKind::AlreadyExists => NetworkError::AlreadyExists,
889        ErrorKind::AddrInUse => NetworkError::AddressInUse,
890        ErrorKind::AddrNotAvailable => NetworkError::AddressNotAvailable,
891        ErrorKind::ConnectionAborted => NetworkError::ConnectionAborted,
892        ErrorKind::ConnectionRefused => NetworkError::ConnectionRefused,
893        ErrorKind::ConnectionReset => NetworkError::ConnectionReset,
894        ErrorKind::Interrupted => NetworkError::Interrupted,
895        ErrorKind::InvalidData => NetworkError::InvalidData,
896        ErrorKind::InvalidInput => NetworkError::InvalidInput,
897        ErrorKind::NotConnected => NetworkError::NotConnected,
898        ErrorKind::PermissionDenied => NetworkError::PermissionDenied,
899        ErrorKind::TimedOut => NetworkError::TimedOut,
900        ErrorKind::UnexpectedEof => NetworkError::UnexpectedEof,
901        ErrorKind::WouldBlock => NetworkError::WouldBlock,
902        ErrorKind::WriteZero => NetworkError::WriteZero,
903        ErrorKind::Unsupported => NetworkError::Unsupported,
904
905        #[cfg(all(target_family = "unix", feature = "libc"))]
906        _ => {
907            if let Some(code) = net_error.raw_os_error() {
908                match code {
909                    libc::EPERM => NetworkError::PermissionDenied,
910                    libc::EBADF => NetworkError::InvalidFd,
911                    libc::ECHILD => NetworkError::InvalidFd,
912                    libc::EMFILE => NetworkError::TooManyOpenFiles,
913                    libc::EINTR => NetworkError::Interrupted,
914                    libc::EIO => NetworkError::IOError,
915                    libc::ENXIO => NetworkError::IOError,
916                    libc::EAGAIN => NetworkError::WouldBlock,
917                    libc::ENOMEM => NetworkError::InsufficientMemory,
918                    libc::EACCES => NetworkError::PermissionDenied,
919                    libc::ENODEV => NetworkError::NoDevice,
920                    libc::EINVAL => NetworkError::InvalidInput,
921                    libc::EPIPE => NetworkError::BrokenPipe,
922                    err => {
923                        tracing::trace!("unknown os error {}", err);
924                        NetworkError::UnknownError
925                    }
926                }
927            } else {
928                NetworkError::UnknownError
929            }
930        }
931        #[cfg(not(all(target_family = "unix", feature = "libc")))]
932        _ => NetworkError::UnknownError,
933    }
934}
935
936pub fn net_error_into_io_err(net_error: NetworkError) -> std::io::Error {
937    use std::io::ErrorKind;
938    match net_error {
939        NetworkError::InvalidFd => ErrorKind::BrokenPipe.into(),
940        NetworkError::AlreadyExists => ErrorKind::AlreadyExists.into(),
941        NetworkError::Lock => ErrorKind::BrokenPipe.into(),
942        NetworkError::IOError => ErrorKind::BrokenPipe.into(),
943        NetworkError::AddressInUse => ErrorKind::AddrInUse.into(),
944        NetworkError::AddressNotAvailable => ErrorKind::AddrNotAvailable.into(),
945        NetworkError::BrokenPipe => ErrorKind::BrokenPipe.into(),
946        NetworkError::ConnectionAborted => ErrorKind::ConnectionAborted.into(),
947        NetworkError::ConnectionRefused => ErrorKind::ConnectionRefused.into(),
948        NetworkError::ConnectionReset => ErrorKind::ConnectionReset.into(),
949        NetworkError::Interrupted => ErrorKind::Interrupted.into(),
950        NetworkError::InvalidData => ErrorKind::InvalidData.into(),
951        NetworkError::InvalidInput => ErrorKind::InvalidInput.into(),
952        NetworkError::NotConnected => ErrorKind::NotConnected.into(),
953        NetworkError::NoDevice => ErrorKind::BrokenPipe.into(),
954        NetworkError::PermissionDenied => ErrorKind::PermissionDenied.into(),
955        NetworkError::TimedOut => ErrorKind::TimedOut.into(),
956        NetworkError::UnexpectedEof => ErrorKind::UnexpectedEof.into(),
957        NetworkError::WouldBlock => ErrorKind::WouldBlock.into(),
958        NetworkError::WriteZero => ErrorKind::WriteZero.into(),
959        NetworkError::Unsupported => ErrorKind::Unsupported.into(),
960        NetworkError::UnknownError => ErrorKind::BrokenPipe.into(),
961        NetworkError::InsufficientMemory => ErrorKind::OutOfMemory.into(),
962        NetworkError::TooManyOpenFiles => {
963            #[cfg(all(target_family = "unix", feature = "libc"))]
964            {
965                std::io::Error::from_raw_os_error(libc::EMFILE)
966            }
967            #[cfg(not(all(target_family = "unix", feature = "libc")))]
968            {
969                ErrorKind::Other.into()
970            }
971        }
972    }
973}