wasmer_wasix/net/
socket.rs

1use std::{
2    future::Future,
3    io,
4    mem::MaybeUninit,
5    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
6    pin::Pin,
7    sync::{Arc, RwLock, RwLockWriteGuard},
8    task::{Context, Poll},
9    time::Duration,
10};
11
12#[cfg(feature = "enable-serde")]
13use serde_derive::{Deserialize, Serialize};
14use virtual_mio::InterestHandler;
15use virtual_net::{
16    NetworkError, VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualTcpListener,
17    VirtualTcpSocket, VirtualUdpSocket, net_error_into_io_err,
18};
19use wasmer_types::MemorySize;
20use wasmer_wasix_types::wasi::{Addressfamily, Errno, Rights, SockProto, Sockoption, Socktype};
21
22use crate::{VirtualTaskManager, net::net_error_into_wasi_err};
23
/// Identifies which part of an HTTP exchange a socket inode carries.
///
/// The enum is a plain tag without payload, so it is cheap to copy and
/// compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum InodeHttpSocketType {
    /// Used to feed the bytes into the request itself
    Request,
    /// Used to receive the bytes from the HTTP server
    Response,
    /// Used to read the headers from the HTTP server
    Headers,
}
34
/// Settings recorded for a socket before (or instead of) a concrete network
/// resource exists for it.
///
/// The values are consumed when the socket is bound, put into listening mode
/// or connected.
#[derive(Debug)]
pub struct SocketProperties {
    /// Address family; bind requests are validated against it.
    pub family: Addressfamily,
    /// Socket type (stream, datagram, ...), which decides what bind, listen
    /// and connect are allowed to do.
    pub ty: Socktype,
    /// Protocol requested for the socket.
    pub pt: SockProto,
    /// Forwarded to `listen_tcp` when the socket starts listening.
    pub only_v6: bool,
    /// Forwarded to `bind_udp` / `listen_tcp`.
    pub reuse_port: bool,
    /// Forwarded to `bind_udp` / `listen_tcp`.
    pub reuse_addr: bool,
    /// If set, applied to the TCP stream right after it connects.
    pub no_delay: Option<bool>,
    /// If set, applied to the TCP stream right after it connects.
    pub keep_alive: Option<bool>,
    /// If set, applied to the TCP stream right after it connects.
    pub dont_route: Option<bool>,
    /// Requested send buffer size, if any.
    pub send_buf_size: Option<usize>,
    /// Requested receive buffer size, if any.
    pub recv_buf_size: Option<usize>,
    /// Copied onto the `TcpStream` kind created by a successful connect.
    pub write_timeout: Option<Duration>,
    /// Copied onto the `TcpStream` kind created by a successful connect.
    pub read_timeout: Option<Duration>,
    /// Accept timeout requested for the socket, if any.
    pub accept_timeout: Option<Duration>,
    /// Connect timeout requested for the socket, if any.
    pub connect_timeout: Option<Duration>,
    /// Interest handler that is moved onto the TCP stream once it connects.
    pub handler: Option<Box<dyn InterestHandler + Send + Sync>>,
}
54
/// The concrete state of a socket inode.
///
/// A socket starts out as a `PreSocket`; bind, listen and connect produce new
/// sockets of the kinds that wrap an actual network resource.
#[derive(Debug)]
//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum InodeSocketKind {
    /// No network resource exists yet; only the configuration is stored.
    PreSocket {
        /// Settings to apply once the socket is bound, listening or connected.
        props: SocketProperties,
        /// Local address recorded by a bind call, if one happened.
        addr: Option<SocketAddr>,
    },
    /// An ICMP socket.
    Icmp(Box<dyn VirtualIcmpSocket + Sync>),
    /// A raw socket.
    Raw(Box<dyn VirtualRawSocket + Sync>),
    /// A TCP socket in listening mode.
    TcpListener {
        socket: Box<dyn VirtualTcpListener + Sync>,
        /// Timeout associated with accepting connections on this listener.
        accept_timeout: Option<Duration>,
    },
    /// A connected TCP stream.
    TcpStream {
        socket: Box<dyn VirtualTcpSocket + Sync>,
        write_timeout: Option<Duration>,
        read_timeout: Option<Duration>,
    },
    /// A bound UDP socket.
    UdpSocket {
        socket: Box<dyn VirtualUdpSocket + Sync>,
        /// Peer recorded by a connect call on the UDP socket, if any.
        peer: Option<SocketAddr>,
    },
    /// A socket described only by its recorded properties and addresses.
    // NOTE(review): presumably stands in for a socket whose resource lives
    // elsewhere (e.g. during journal replay) - confirm against the callers.
    RemoteSocket {
        props: SocketProperties,
        local_addr: SocketAddr,
        peer_addr: SocketAddr,
        ttl: u32,
        multicast_ttl: u32,
        /// When `true` the socket reports its status as closed.
        is_dead: bool,
    },
}
86
/// Socket options understood by the socket layer.
///
/// Values are obtained from the raw [`Sockoption`] through `TryFrom`. The
/// enum is a payload-free tag, so it implements the common value traits and
/// can be logged, copied and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WasiSocketOption {
    Noop,
    ReusePort,
    ReuseAddr,
    NoDelay,
    DontRoute,
    OnlyV6,
    Broadcast,
    MulticastLoopV4,
    MulticastLoopV6,
    Promiscuous,
    Listening,
    LastError,
    KeepAlive,
    Linger,
    OobInline,
    RecvBufSize,
    SendBufSize,
    RecvLowat,
    SendLowat,
    RecvTimeout,
    SendTimeout,
    ConnectTimeout,
    AcceptTimeout,
    Ttl,
    MulticastTtlV4,
    Type,
    Proto,
}
116
117impl TryFrom<Sockoption> for WasiSocketOption {
118    type Error = Errno;
119
120    fn try_from(opt: Sockoption) -> Result<Self, Self::Error> {
121        use WasiSocketOption::*;
122        match opt {
123            Sockoption::Noop => Ok(Noop),
124            Sockoption::ReusePort => Ok(ReusePort),
125            Sockoption::ReuseAddr => Ok(ReuseAddr),
126            Sockoption::NoDelay => Ok(NoDelay),
127            Sockoption::DontRoute => Ok(DontRoute),
128            Sockoption::OnlyV6 => Ok(OnlyV6),
129            Sockoption::Broadcast => Ok(Broadcast),
130            Sockoption::MulticastLoopV4 => Ok(MulticastLoopV4),
131            Sockoption::MulticastLoopV6 => Ok(MulticastLoopV6),
132            Sockoption::Promiscuous => Ok(Promiscuous),
133            Sockoption::Listening => Ok(Listening),
134            Sockoption::LastError => Ok(LastError),
135            Sockoption::KeepAlive => Ok(KeepAlive),
136            Sockoption::Linger => Ok(Linger),
137            Sockoption::OobInline => Ok(OobInline),
138            Sockoption::RecvBufSize => Ok(RecvBufSize),
139            Sockoption::SendBufSize => Ok(SendBufSize),
140            Sockoption::RecvLowat => Ok(RecvLowat),
141            Sockoption::SendLowat => Ok(SendLowat),
142            Sockoption::RecvTimeout => Ok(RecvTimeout),
143            Sockoption::SendTimeout => Ok(SendTimeout),
144            Sockoption::ConnectTimeout => Ok(ConnectTimeout),
145            Sockoption::AcceptTimeout => Ok(AcceptTimeout),
146            Sockoption::Ttl => Ok(Ttl),
147            Sockoption::MulticastTtlV4 => Ok(MulticastTtlV4),
148            Sockoption::Type => Ok(Type),
149            Sockoption::Proto => Ok(Proto),
150            _ => Err(Errno::Inval),
151        }
152    }
153}
154
/// Coarse lifecycle state reported for a socket.
///
/// The enum is a payload-free tag, so it is `Copy` and comparable, which lets
/// callers test a status with `==` instead of pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WasiSocketStatus {
    /// The socket exists but is not usable for traffic yet.
    Opening,
    /// The socket is open.
    Opened,
    /// The socket has been closed.
    Closed,
    /// The socket is in a failed state or its state could not be determined.
    Failed,
}
162
/// Selects which of a socket's time-based settings is being addressed.
///
/// Converts to and from `wasmer_journal::SocketOptTimeType` variant by
/// variant.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum TimeType {
    /// Timeout for read operations.
    ReadTimeout,
    /// Timeout for write operations.
    WriteTimeout,
    /// Timeout looked up by `listen`; it is also stored on the listener.
    AcceptTimeout,
    /// Timeout for establishing a connection.
    ConnectTimeout,
    /// Timeout looked up by `bind` and `auto_bind_udp` to bound the bind.
    BindTimeout,
    /// Linger duration.
    Linger,
}
173
174impl From<TimeType> for wasmer_journal::SocketOptTimeType {
175    fn from(value: TimeType) -> Self {
176        match value {
177            TimeType::ReadTimeout => Self::ReadTimeout,
178            TimeType::WriteTimeout => Self::WriteTimeout,
179            TimeType::AcceptTimeout => Self::AcceptTimeout,
180            TimeType::ConnectTimeout => Self::ConnectTimeout,
181            TimeType::BindTimeout => Self::BindTimeout,
182            TimeType::Linger => Self::Linger,
183        }
184    }
185}
186
187impl From<wasmer_journal::SocketOptTimeType> for TimeType {
188    fn from(value: wasmer_journal::SocketOptTimeType) -> Self {
189        use wasmer_journal::SocketOptTimeType;
190        match value {
191            SocketOptTimeType::ReadTimeout => TimeType::ReadTimeout,
192            SocketOptTimeType::WriteTimeout => TimeType::WriteTimeout,
193            SocketOptTimeType::AcceptTimeout => TimeType::AcceptTimeout,
194            SocketOptTimeType::ConnectTimeout => TimeType::ConnectTimeout,
195            SocketOptTimeType::BindTimeout => TimeType::BindTimeout,
196            SocketOptTimeType::Linger => TimeType::Linger,
197        }
198    }
199}
200
/// The part of a socket inode that is guarded by the lock in
/// `InodeSocketInner`.
#[derive(Debug)]
//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub(crate) struct InodeSocketProtected {
    /// Current state of the socket.
    pub kind: InodeSocketKind,
}
206
/// Shared interior of an `InodeSocket`; all clones of a socket handle point
/// at the same instance.
#[derive(Debug)]
//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub(crate) struct InodeSocketInner {
    /// Lock-protected socket state.
    pub protected: RwLock<InodeSocketProtected>,
}
212
/// Handle to a socket inode.
///
/// Cloning the handle is cheap: it only clones the `Arc`, so every clone
/// observes and mutates the same underlying socket state.
#[derive(Debug, Clone)]
//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub struct InodeSocket {
    /// Reference-counted, lock-protected socket state.
    pub(crate) inner: Arc<InodeSocketInner>,
}
218
219impl InodeSocket {
220    pub fn new(kind: InodeSocketKind) -> Self {
221        let protected = InodeSocketProtected { kind };
222        Self {
223            inner: Arc::new(InodeSocketInner {
224                protected: RwLock::new(protected),
225            }),
226        }
227    }
228
229    pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
230        let mut inner = self.inner.protected.write().unwrap();
231        inner.poll_read_ready(cx)
232    }
233
234    pub fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
235        let mut inner = self.inner.protected.write().unwrap();
236        inner.poll_write_ready(cx)
237    }
238
239    // When a sendto or connect call comes in for a UDP "pre-socket", it must be bound to
240    // an ephemeral port automatically.
241    // Apparently, clippy fails to recognize the write-locked guard being passed into
242    // the other function, hence the `allow` attribute.
243    #[allow(clippy::await_holding_lock, clippy::readonly_write_lock)]
244    pub async fn auto_bind_udp(
245        &self,
246        tasks: &dyn VirtualTaskManager,
247        net: &dyn VirtualNetworking,
248    ) -> Result<Option<InodeSocket>, Errno> {
249        let timeout = self
250            .opt_time(TimeType::BindTimeout)
251            .ok()
252            .flatten()
253            .unwrap_or(Duration::from_secs(30));
254        let inner = self.inner.protected.write().unwrap();
255        match &inner.kind {
256            InodeSocketKind::PreSocket { props, .. } if props.ty == Socktype::Dgram => {
257                let addr = match props.family {
258                    Addressfamily::Inet4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
259                    Addressfamily::Inet6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
260                    _ => return Err(Errno::Notsup),
261                };
262                Self::bind_internal(tasks, net, addr, timeout, inner).await
263            }
264            _ => Ok(None),
265        }
266    }
267
268    pub async fn bind(
269        &self,
270        tasks: &dyn VirtualTaskManager,
271        net: &dyn VirtualNetworking,
272        set_addr: SocketAddr,
273    ) -> Result<Option<InodeSocket>, Errno> {
274        let timeout = self
275            .opt_time(TimeType::BindTimeout)
276            .ok()
277            .flatten()
278            .unwrap_or(Duration::from_secs(30));
279        let inner = self.inner.protected.write().unwrap();
280        Self::bind_internal(tasks, net, set_addr, timeout, inner).await
281    }
282
283    // The lock is dropped before awaiting, but clippy doesn't realize it
284    #[allow(clippy::await_holding_lock)]
285    async fn bind_internal(
286        tasks: &dyn VirtualTaskManager,
287        net: &dyn VirtualNetworking,
288        set_addr: SocketAddr,
289        timeout: Duration,
290        mut inner: RwLockWriteGuard<'_, InodeSocketProtected>,
291    ) -> Result<Option<InodeSocket>, Errno> {
292        let socket = {
293            match &mut inner.kind {
294                InodeSocketKind::PreSocket { props, addr, .. } => {
295                    match props.family {
296                        Addressfamily::Inet4 => {
297                            if !set_addr.is_ipv4() {
298                                tracing::debug!(
299                                    "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
300                                );
301                                return Err(Errno::Inval);
302                            }
303                        }
304                        Addressfamily::Inet6 => {
305                            if !set_addr.is_ipv6() {
306                                tracing::debug!(
307                                    "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
308                                );
309                                return Err(Errno::Inval);
310                            }
311                        }
312                        _ => {
313                            return Err(Errno::Notsup);
314                        }
315                    }
316
317                    addr.replace(set_addr);
318                    let addr = (*addr).unwrap();
319
320                    match props.ty {
321                        Socktype::Stream => {
322                            // we already set the socket address - next we need a listen or connect so nothing
323                            // more to do at this time
324                            return Ok(None);
325                        }
326                        Socktype::Dgram => {
327                            let reuse_port = props.reuse_port;
328                            let reuse_addr = props.reuse_addr;
329
330                            net.bind_udp(addr, reuse_port, reuse_addr)
331                        }
332                        _ => return Err(Errno::Inval),
333                    }
334                }
335                InodeSocketKind::RemoteSocket {
336                    props,
337                    local_addr: addr,
338                    ..
339                } => {
340                    match props.family {
341                        Addressfamily::Inet4 => {
342                            if !set_addr.is_ipv4() {
343                                tracing::debug!(
344                                    "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
345                                );
346                                return Err(Errno::Inval);
347                            }
348                        }
349                        Addressfamily::Inet6 => {
350                            if !set_addr.is_ipv6() {
351                                tracing::debug!(
352                                    "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
353                                );
354                                return Err(Errno::Inval);
355                            }
356                        }
357                        _ => {
358                            return Err(Errno::Notsup);
359                        }
360                    }
361
362                    *addr = set_addr;
363                    let addr = *addr;
364
365                    match props.ty {
366                        Socktype::Stream => {
367                            // we already set the socket address - next we need a listen or connect so nothing
368                            // more to do at this time
369                            return Ok(None);
370                        }
371                        Socktype::Dgram => {
372                            let reuse_port = props.reuse_port;
373                            let reuse_addr = props.reuse_addr;
374
375                            net.bind_udp(addr, reuse_port, reuse_addr)
376                        }
377                        _ => return Err(Errno::Inval),
378                    }
379                }
380                _ => return Err(Errno::Notsup),
381            }
382        };
383
384        drop(inner);
385
386        tokio::select! {
387            socket = socket => {
388                let socket = socket.map_err(net_error_into_wasi_err)?;
389                Ok(Some(InodeSocket::new(InodeSocketKind::UdpSocket { socket, peer: None })))
390            },
391            _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
392        }
393    }
394
395    pub async fn listen(
396        &self,
397        tasks: &dyn VirtualTaskManager,
398        net: &dyn VirtualNetworking,
399        _backlog: usize,
400    ) -> Result<Option<InodeSocket>, Errno> {
401        let timeout = self
402            .opt_time(TimeType::AcceptTimeout)
403            .ok()
404            .flatten()
405            .unwrap_or(Duration::from_secs(30));
406
407        let socket = {
408            let inner = self.inner.protected.read().unwrap();
409            match &inner.kind {
410                InodeSocketKind::PreSocket { props, addr, .. } => match props.ty {
411                    Socktype::Stream => {
412                        if addr.is_none() {
413                            tracing::warn!("wasi[?]::sock_listen - failed - address not set");
414                            return Err(Errno::Inval);
415                        }
416                        let addr = *addr.as_ref().unwrap();
417                        let only_v6 = props.only_v6;
418                        let reuse_port = props.reuse_port;
419                        let reuse_addr = props.reuse_addr;
420                        drop(inner);
421
422                        net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
423                    }
424                    ty => {
425                        tracing::warn!(
426                            "wasi[?]::sock_listen - failed - not supported(pre-socket:{:?})",
427                            ty
428                        );
429                        return Err(Errno::Notsup);
430                    }
431                },
432                InodeSocketKind::RemoteSocket {
433                    props,
434                    local_addr: addr,
435                    ..
436                } => match props.ty {
437                    Socktype::Stream => {
438                        let addr = *addr;
439                        let only_v6 = props.only_v6;
440                        let reuse_port = props.reuse_port;
441                        let reuse_addr = props.reuse_addr;
442                        drop(inner);
443
444                        net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
445                    }
446                    ty => {
447                        tracing::warn!(
448                            "wasi[?]::sock_listen - failed - not supported(remote-socket:{:?})",
449                            ty
450                        );
451                        return Err(Errno::Notsup);
452                    }
453                },
454                InodeSocketKind::Icmp(_) => {
455                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(icmp)");
456                    return Err(Errno::Notsup);
457                }
458                InodeSocketKind::Raw(_) => {
459                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(raw)");
460                    return Err(Errno::Notsup);
461                }
462                InodeSocketKind::TcpListener { .. } => {
463                    tracing::warn!(
464                        "wasi[?]::sock_listen - failed - already listening (tcp-listener)"
465                    );
466                    return Err(Errno::Notsup);
467                }
468                InodeSocketKind::TcpStream { .. } => {
469                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(tcp-stream)");
470                    return Err(Errno::Notsup);
471                }
472                InodeSocketKind::UdpSocket { .. } => {
473                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(udp-socket)");
474                    return Err(Errno::Notsup);
475                }
476            }
477        };
478
479        tokio::select! {
480            socket = socket => {
481                let socket = socket.map_err(net_error_into_wasi_err)?;
482                Ok(Some(InodeSocket::new(InodeSocketKind::TcpListener {
483                    socket,
484                    accept_timeout: Some(timeout),
485                })))
486            },
487            _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
488        }
489    }
490
491    pub async fn accept(
492        &self,
493        tasks: &dyn VirtualTaskManager,
494        nonblocking: bool,
495        timeout: Option<Duration>,
496    ) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno> {
497        struct SocketAccepter<'a> {
498            sock: &'a InodeSocket,
499            nonblocking: bool,
500            handler_registered: bool,
501        }
502        impl Drop for SocketAccepter<'_> {
503            fn drop(&mut self) {
504                if self.handler_registered {
505                    let mut inner = self.sock.inner.protected.write().unwrap();
506                    inner.remove_handler();
507                }
508            }
509        }
510        impl Future for SocketAccepter<'_> {
511            type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno>;
512            fn poll(
513                mut self: Pin<&mut Self>,
514                cx: &mut std::task::Context<'_>,
515            ) -> std::task::Poll<Self::Output> {
516                loop {
517                    let mut inner = self.sock.inner.protected.write().unwrap();
518                    return match &mut inner.kind {
519                        InodeSocketKind::TcpListener { socket, .. } => match socket.try_accept() {
520                            Ok((child, addr)) => Poll::Ready(Ok((child, addr))),
521                            Err(NetworkError::WouldBlock) if self.nonblocking => {
522                                Poll::Ready(Err(Errno::Again))
523                            }
524                            Err(NetworkError::WouldBlock) if !self.handler_registered => {
525                                let res = socket.set_handler(cx.waker().into());
526                                if let Err(err) = res {
527                                    return Poll::Ready(Err(net_error_into_wasi_err(err)));
528                                }
529                                drop(inner);
530                                self.handler_registered = true;
531                                continue;
532                            }
533                            Err(NetworkError::WouldBlock) => Poll::Pending,
534                            Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
535                        },
536                        InodeSocketKind::PreSocket { .. } => Poll::Ready(Err(Errno::Notconn)),
537                        _ => Poll::Ready(Err(Errno::Notsup)),
538                    };
539                }
540            }
541        }
542
543        let acceptor = SocketAccepter {
544            sock: self,
545            nonblocking,
546            handler_registered: false,
547        };
548        if let Some(timeout) = timeout {
549            tokio::select! {
550                res = acceptor => res,
551                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
552            }
553        } else {
554            acceptor.await
555        }
556    }
557
558    pub fn close(&self) -> Result<(), Errno> {
559        let mut inner = self.inner.protected.write().unwrap();
560        match &mut inner.kind {
561            InodeSocketKind::TcpListener { .. } => {}
562            InodeSocketKind::TcpStream { socket, .. } => {
563                socket.close().map_err(net_error_into_wasi_err)?;
564            }
565            InodeSocketKind::Icmp(_) => {}
566            InodeSocketKind::UdpSocket { .. } => {}
567            InodeSocketKind::Raw(_) => {}
568            InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
569            InodeSocketKind::RemoteSocket { .. } => {}
570        };
571        Ok(())
572    }
573
574    pub async fn connect(
575        &mut self,
576        tasks: &dyn VirtualTaskManager,
577        net: &dyn VirtualNetworking,
578        peer: SocketAddr,
579        timeout: Option<std::time::Duration>,
580        nonblocking: bool,
581    ) -> Result<Option<InodeSocket>, Errno> {
582        let new_write_timeout;
583        let new_read_timeout;
584
585        let timeout = timeout.unwrap_or(Duration::from_secs(30));
586
587        let handler;
588        let connect = {
589            let mut inner = self.inner.protected.write().unwrap();
590            match &mut inner.kind {
591                InodeSocketKind::PreSocket { props, addr, .. } => {
592                    handler = props.handler.take();
593                    new_write_timeout = props.write_timeout;
594                    new_read_timeout = props.read_timeout;
595                    match props.ty {
596                        Socktype::Stream => {
597                            let no_delay = props.no_delay;
598                            let keep_alive = props.keep_alive;
599                            let dont_route = props.dont_route;
600                            let addr = match addr {
601                                Some(a) => *a,
602                                None => {
603                                    let ip = match peer.is_ipv4() {
604                                        true => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
605                                        false => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
606                                    };
607                                    SocketAddr::new(ip, 0)
608                                }
609                            };
610                            Box::pin(async move {
611                                let mut ret = net.connect_tcp(addr, peer).await?;
612                                if let Some(no_delay) = no_delay {
613                                    ret.set_nodelay(no_delay).ok();
614                                }
615                                if let Some(keep_alive) = keep_alive {
616                                    ret.set_keepalive(keep_alive).ok();
617                                }
618                                if let Some(dont_route) = dont_route {
619                                    ret.set_dontroute(dont_route).ok();
620                                }
621                                if !nonblocking {
622                                    futures::future::poll_fn(|cx| ret.poll_write_ready(cx)).await?;
623                                }
624                                Ok(ret)
625                            })
626                        }
627                        Socktype::Dgram => return Err(Errno::Inval),
628                        _ => return Err(Errno::Notsup),
629                    }
630                }
631                InodeSocketKind::UdpSocket {
632                    peer: target_peer, ..
633                } => {
634                    target_peer.replace(peer);
635                    return Ok(None);
636                }
637                InodeSocketKind::RemoteSocket { peer_addr, .. } => {
638                    *peer_addr = peer;
639                    return Ok(None);
640                }
641                _ => return Err(Errno::Notsup),
642            }
643        };
644
645        let mut socket = tokio::select! {
646            res = connect => res.map_err(net_error_into_wasi_err)?,
647            _ = tasks.sleep_now(timeout) => return Err(Errno::Timedout)
648        };
649
650        if let Some(handler) = handler {
651            socket
652                .set_handler(handler)
653                .map_err(net_error_into_wasi_err)?;
654        }
655
656        let socket = InodeSocket::new(InodeSocketKind::TcpStream {
657            socket,
658            write_timeout: new_write_timeout,
659            read_timeout: new_read_timeout,
660        });
661
662        Ok(Some(socket))
663    }
664
665    pub fn status(&self) -> Result<WasiSocketStatus, Errno> {
666        let inner = self.inner.protected.read().unwrap();
667        Ok(match &inner.kind {
668            InodeSocketKind::PreSocket { .. } => WasiSocketStatus::Opening,
669            InodeSocketKind::TcpListener { .. } => WasiSocketStatus::Opened,
670            InodeSocketKind::TcpStream { socket, .. } => match socket.status() {
671                Ok(virtual_net::SocketStatus::Opening) => WasiSocketStatus::Opening,
672                Ok(virtual_net::SocketStatus::Opened) => WasiSocketStatus::Opened,
673                Ok(virtual_net::SocketStatus::Closed) => WasiSocketStatus::Closed,
674                Ok(virtual_net::SocketStatus::Failed) => WasiSocketStatus::Failed,
675                Err(_) => WasiSocketStatus::Failed,
676            },
677            InodeSocketKind::UdpSocket { .. } => WasiSocketStatus::Opened,
678            InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
679                true => WasiSocketStatus::Closed,
680                false => WasiSocketStatus::Opened,
681            },
682            _ => WasiSocketStatus::Failed,
683        })
684    }
685
686    pub fn addr_local(&self) -> Result<SocketAddr, Errno> {
687        let inner = self.inner.protected.read().unwrap();
688        Ok(match &inner.kind {
689            InodeSocketKind::PreSocket { props, addr, .. } => {
690                if let Some(addr) = addr {
691                    *addr
692                } else {
693                    SocketAddr::new(
694                        match props.family {
695                            Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
696                            Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
697                            _ => return Err(Errno::Inval),
698                        },
699                        0,
700                    )
701                }
702            }
703            InodeSocketKind::Icmp(sock) => sock.addr_local().map_err(net_error_into_wasi_err)?,
704            InodeSocketKind::TcpListener { socket, .. } => {
705                socket.addr_local().map_err(net_error_into_wasi_err)?
706            }
707            InodeSocketKind::TcpStream { socket, .. } => {
708                socket.addr_local().map_err(net_error_into_wasi_err)?
709            }
710            InodeSocketKind::UdpSocket { socket, .. } => {
711                socket.addr_local().map_err(net_error_into_wasi_err)?
712            }
713            InodeSocketKind::RemoteSocket {
714                local_addr: addr, ..
715            } => *addr,
716            _ => return Err(Errno::Notsup),
717        })
718    }
719
720    pub fn addr_peer(&self) -> Result<SocketAddr, Errno> {
721        let inner = self.inner.protected.read().unwrap();
722        Ok(match &inner.kind {
723            InodeSocketKind::PreSocket { props, .. } => SocketAddr::new(
724                match props.family {
725                    Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
726                    Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
727                    _ => return Err(Errno::Inval),
728                },
729                0,
730            ),
731            InodeSocketKind::TcpStream { socket, .. } => {
732                socket.addr_peer().map_err(net_error_into_wasi_err)?
733            }
734            InodeSocketKind::UdpSocket { socket, .. } => socket
735                .addr_peer()
736                .map_err(net_error_into_wasi_err)?
737                .map(Ok)
738                .unwrap_or_else(|| {
739                    socket
740                        .addr_local()
741                        .map_err(net_error_into_wasi_err)
742                        .map(|addr| {
743                            SocketAddr::new(
744                                match addr {
745                                    SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
746                                    SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
747                                },
748                                0,
749                            )
750                        })
751                })?,
752            InodeSocketKind::RemoteSocket { peer_addr, .. } => *peer_addr,
753            _ => return Err(Errno::Notsup),
754        })
755    }
756
757    pub fn set_opt_flag(&mut self, option: WasiSocketOption, val: bool) -> Result<(), Errno> {
758        let mut inner = self.inner.protected.write().unwrap();
759        match &mut inner.kind {
760            InodeSocketKind::PreSocket { props, .. }
761            | InodeSocketKind::RemoteSocket { props, .. } => {
762                match option {
763                    WasiSocketOption::OnlyV6 => props.only_v6 = val,
764                    WasiSocketOption::ReusePort => props.reuse_port = val,
765                    WasiSocketOption::ReuseAddr => props.reuse_addr = val,
766                    WasiSocketOption::NoDelay => props.no_delay = Some(val),
767                    WasiSocketOption::KeepAlive => props.keep_alive = Some(val),
768                    WasiSocketOption::DontRoute => props.dont_route = Some(val),
769                    _ => return Err(Errno::Inval),
770                };
771            }
772            InodeSocketKind::Raw(sock) => match option {
773                WasiSocketOption::Promiscuous => {
774                    sock.set_promiscuous(val).map_err(net_error_into_wasi_err)?
775                }
776                _ => return Err(Errno::Inval),
777            },
778            InodeSocketKind::TcpStream { socket, .. } => match option {
779                WasiSocketOption::NoDelay => {
780                    socket.set_nodelay(val).map_err(net_error_into_wasi_err)?
781                }
782                WasiSocketOption::KeepAlive => {
783                    socket.set_keepalive(val).map_err(net_error_into_wasi_err)?
784                }
785                WasiSocketOption::DontRoute => {
786                    socket.set_dontroute(val).map_err(net_error_into_wasi_err)?
787                }
788                _ => return Err(Errno::Inval),
789            },
790            InodeSocketKind::TcpListener { .. } => return Err(Errno::Inval),
791            InodeSocketKind::UdpSocket { socket, .. } => match option {
792                WasiSocketOption::Broadcast => {
793                    socket.set_broadcast(val).map_err(net_error_into_wasi_err)?
794                }
795                WasiSocketOption::MulticastLoopV4 => socket
796                    .set_multicast_loop_v4(val)
797                    .map_err(net_error_into_wasi_err)?,
798                WasiSocketOption::MulticastLoopV6 => socket
799                    .set_multicast_loop_v6(val)
800                    .map_err(net_error_into_wasi_err)?,
801                _ => return Err(Errno::Inval),
802            },
803            _ => return Err(Errno::Notsup),
804        }
805        Ok(())
806    }
807
808    pub fn get_opt_flag(&self, option: WasiSocketOption) -> Result<bool, Errno> {
809        let mut inner = self.inner.protected.write().unwrap();
810        Ok(match &mut inner.kind {
811            InodeSocketKind::PreSocket { props, .. }
812            | InodeSocketKind::RemoteSocket { props, .. } => match option {
813                WasiSocketOption::OnlyV6 => props.only_v6,
814                WasiSocketOption::ReusePort => props.reuse_port,
815                WasiSocketOption::ReuseAddr => props.reuse_addr,
816                WasiSocketOption::NoDelay => props.no_delay.unwrap_or_default(),
817                WasiSocketOption::KeepAlive => props.keep_alive.unwrap_or_default(),
818                _ => return Err(Errno::Inval),
819            },
820            InodeSocketKind::Raw(sock) => match option {
821                WasiSocketOption::Promiscuous => {
822                    sock.promiscuous().map_err(net_error_into_wasi_err)?
823                }
824                _ => return Err(Errno::Inval),
825            },
826            InodeSocketKind::TcpStream { socket, .. } => match option {
827                WasiSocketOption::NoDelay => socket.nodelay().map_err(net_error_into_wasi_err)?,
828                WasiSocketOption::KeepAlive => {
829                    socket.keepalive().map_err(net_error_into_wasi_err)?
830                }
831                WasiSocketOption::DontRoute => {
832                    socket.dontroute().map_err(net_error_into_wasi_err)?
833                }
834                _ => return Err(Errno::Inval),
835            },
836            InodeSocketKind::UdpSocket { socket, .. } => match option {
837                WasiSocketOption::Broadcast => {
838                    socket.broadcast().map_err(net_error_into_wasi_err)?
839                }
840                WasiSocketOption::MulticastLoopV4 => socket
841                    .multicast_loop_v4()
842                    .map_err(net_error_into_wasi_err)?,
843                WasiSocketOption::MulticastLoopV6 => socket
844                    .multicast_loop_v6()
845                    .map_err(net_error_into_wasi_err)?,
846                _ => return Err(Errno::Inval),
847            },
848            _ => return Err(Errno::Notsup),
849        })
850    }
851
852    pub fn set_send_buf_size(&mut self, size: usize) -> Result<(), Errno> {
853        let mut inner = self.inner.protected.write().unwrap();
854        match &mut inner.kind {
855            InodeSocketKind::PreSocket { props, .. }
856            | InodeSocketKind::RemoteSocket { props, .. } => {
857                props.send_buf_size = Some(size);
858            }
859            InodeSocketKind::TcpStream { socket, .. } => {
860                socket
861                    .set_send_buf_size(size)
862                    .map_err(net_error_into_wasi_err)?;
863            }
864            _ => return Err(Errno::Notsup),
865        }
866        Ok(())
867    }
868
869    pub fn send_buf_size(&self) -> Result<usize, Errno> {
870        let inner = self.inner.protected.read().unwrap();
871        match &inner.kind {
872            InodeSocketKind::PreSocket { props, .. }
873            | InodeSocketKind::RemoteSocket { props, .. } => {
874                Ok(props.send_buf_size.unwrap_or_default())
875            }
876            InodeSocketKind::TcpStream { socket, .. } => {
877                socket.send_buf_size().map_err(net_error_into_wasi_err)
878            }
879            _ => Err(Errno::Notsup),
880        }
881    }
882
883    pub fn set_recv_buf_size(&mut self, size: usize) -> Result<(), Errno> {
884        let mut inner = self.inner.protected.write().unwrap();
885        match &mut inner.kind {
886            InodeSocketKind::PreSocket { props, .. }
887            | InodeSocketKind::RemoteSocket { props, .. } => {
888                props.recv_buf_size = Some(size);
889            }
890            InodeSocketKind::TcpStream { socket, .. } => {
891                socket
892                    .set_recv_buf_size(size)
893                    .map_err(net_error_into_wasi_err)?;
894            }
895            _ => return Err(Errno::Notsup),
896        }
897        Ok(())
898    }
899
900    pub fn recv_buf_size(&self) -> Result<usize, Errno> {
901        let inner = self.inner.protected.read().unwrap();
902        match &inner.kind {
903            InodeSocketKind::PreSocket { props, .. }
904            | InodeSocketKind::RemoteSocket { props, .. } => {
905                Ok(props.recv_buf_size.unwrap_or_default())
906            }
907            InodeSocketKind::TcpStream { socket, .. } => {
908                socket.recv_buf_size().map_err(net_error_into_wasi_err)
909            }
910            _ => Err(Errno::Notsup),
911        }
912    }
913
914    pub fn set_linger(&mut self, linger: Option<std::time::Duration>) -> Result<(), Errno> {
915        let mut inner = self.inner.protected.write().unwrap();
916        match &mut inner.kind {
917            InodeSocketKind::TcpStream { socket, .. } => {
918                socket.set_linger(linger).map_err(net_error_into_wasi_err)
919            }
920            InodeSocketKind::RemoteSocket { .. } => Ok(()),
921            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
922            _ => Err(Errno::Notsup),
923        }
924    }
925
926    pub fn linger(&self) -> Result<Option<std::time::Duration>, Errno> {
927        let inner = self.inner.protected.read().unwrap();
928        match &inner.kind {
929            InodeSocketKind::TcpStream { socket, .. } => {
930                socket.linger().map_err(net_error_into_wasi_err)
931            }
932            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
933            _ => Err(Errno::Notsup),
934        }
935    }
936
937    pub fn set_opt_time(
938        &self,
939        ty: TimeType,
940        timeout: Option<std::time::Duration>,
941    ) -> Result<(), Errno> {
942        let mut inner = self.inner.protected.write().unwrap();
943        match &mut inner.kind {
944            InodeSocketKind::TcpStream {
945                write_timeout,
946                read_timeout,
947                ..
948            } => {
949                match ty {
950                    TimeType::WriteTimeout => *write_timeout = timeout,
951                    TimeType::ReadTimeout => *read_timeout = timeout,
952                    _ => return Err(Errno::Inval),
953                }
954                Ok(())
955            }
956            InodeSocketKind::TcpListener { accept_timeout, .. } => {
957                match ty {
958                    TimeType::AcceptTimeout => *accept_timeout = timeout,
959                    _ => return Err(Errno::Inval),
960                }
961                Ok(())
962            }
963            InodeSocketKind::PreSocket { props, .. }
964            | InodeSocketKind::RemoteSocket { props, .. } => {
965                match ty {
966                    TimeType::ConnectTimeout => props.connect_timeout = timeout,
967                    TimeType::AcceptTimeout => props.accept_timeout = timeout,
968                    TimeType::ReadTimeout => props.read_timeout = timeout,
969                    TimeType::WriteTimeout => props.write_timeout = timeout,
970                    _ => return Err(Errno::Io),
971                }
972                Ok(())
973            }
974            _ => Err(Errno::Notsup),
975        }
976    }
977
978    pub fn opt_time(&self, ty: TimeType) -> Result<Option<std::time::Duration>, Errno> {
979        let inner = self.inner.protected.read().unwrap();
980        match &inner.kind {
981            InodeSocketKind::TcpStream {
982                read_timeout,
983                write_timeout,
984                ..
985            } => Ok(match ty {
986                TimeType::ReadTimeout => *read_timeout,
987                TimeType::WriteTimeout => *write_timeout,
988                _ => return Err(Errno::Inval),
989            }),
990            InodeSocketKind::TcpListener { accept_timeout, .. } => Ok(match ty {
991                TimeType::AcceptTimeout => *accept_timeout,
992                _ => return Err(Errno::Inval),
993            }),
994            InodeSocketKind::PreSocket { props, .. }
995            | InodeSocketKind::RemoteSocket { props, .. } => match ty {
996                TimeType::ConnectTimeout => Ok(props.connect_timeout),
997                TimeType::AcceptTimeout => Ok(props.accept_timeout),
998                TimeType::ReadTimeout => Ok(props.read_timeout),
999                TimeType::WriteTimeout => Ok(props.write_timeout),
1000                _ => Err(Errno::Inval),
1001            },
1002            _ => Err(Errno::Notsup),
1003        }
1004    }
1005
1006    pub fn set_ttl(&self, ttl: u32) -> Result<(), Errno> {
1007        let mut inner = self.inner.protected.write().unwrap();
1008        match &mut inner.kind {
1009            InodeSocketKind::TcpStream { socket, .. } => {
1010                socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1011            }
1012            InodeSocketKind::UdpSocket { socket, .. } => {
1013                socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1014            }
1015            InodeSocketKind::RemoteSocket { ttl: set_ttl, .. } => {
1016                *set_ttl = ttl;
1017                Ok(())
1018            }
1019            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1020            _ => Err(Errno::Notsup),
1021        }
1022    }
1023
1024    pub fn ttl(&self) -> Result<u32, Errno> {
1025        let inner = self.inner.protected.read().unwrap();
1026        match &inner.kind {
1027            InodeSocketKind::TcpStream { socket, .. } => {
1028                socket.ttl().map_err(net_error_into_wasi_err)
1029            }
1030            InodeSocketKind::UdpSocket { socket, .. } => {
1031                socket.ttl().map_err(net_error_into_wasi_err)
1032            }
1033            InodeSocketKind::RemoteSocket { ttl, .. } => Ok(*ttl),
1034            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1035            _ => Err(Errno::Notsup),
1036        }
1037    }
1038
1039    pub fn set_multicast_ttl_v4(&self, ttl: u32) -> Result<(), Errno> {
1040        let mut inner = self.inner.protected.write().unwrap();
1041        match &mut inner.kind {
1042            InodeSocketKind::UdpSocket { socket, .. } => socket
1043                .set_multicast_ttl_v4(ttl)
1044                .map_err(net_error_into_wasi_err),
1045            InodeSocketKind::RemoteSocket {
1046                multicast_ttl: set_ttl,
1047                ..
1048            } => {
1049                *set_ttl = ttl;
1050                Ok(())
1051            }
1052            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1053            _ => Err(Errno::Notsup),
1054        }
1055    }
1056
1057    pub fn multicast_ttl_v4(&self) -> Result<u32, Errno> {
1058        let inner = self.inner.protected.read().unwrap();
1059        match &inner.kind {
1060            InodeSocketKind::UdpSocket { socket, .. } => {
1061                socket.multicast_ttl_v4().map_err(net_error_into_wasi_err)
1062            }
1063            InodeSocketKind::RemoteSocket { multicast_ttl, .. } => Ok(*multicast_ttl),
1064            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1065            _ => Err(Errno::Notsup),
1066        }
1067    }
1068
1069    pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1070        let mut inner = self.inner.protected.write().unwrap();
1071        match &mut inner.kind {
1072            InodeSocketKind::UdpSocket { socket, .. } => socket
1073                .join_multicast_v4(multiaddr, iface)
1074                .map_err(net_error_into_wasi_err),
1075            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1076            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1077            _ => Err(Errno::Notsup),
1078        }
1079    }
1080
1081    pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1082        let mut inner = self.inner.protected.write().unwrap();
1083        match &mut inner.kind {
1084            InodeSocketKind::UdpSocket { socket, .. } => socket
1085                .leave_multicast_v4(multiaddr, iface)
1086                .map_err(net_error_into_wasi_err),
1087            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1088            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1089            _ => Err(Errno::Notsup),
1090        }
1091    }
1092
1093    pub fn join_multicast_v6(&self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1094        let mut inner = self.inner.protected.write().unwrap();
1095        match &mut inner.kind {
1096            InodeSocketKind::UdpSocket { socket, .. } => socket
1097                .join_multicast_v6(multiaddr, iface)
1098                .map_err(net_error_into_wasi_err),
1099            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1100            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1101            _ => Err(Errno::Notsup),
1102        }
1103    }
1104
1105    pub fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1106        let mut inner = self.inner.protected.write().unwrap();
1107        match &mut inner.kind {
1108            InodeSocketKind::UdpSocket { socket, .. } => socket
1109                .leave_multicast_v6(multiaddr, iface)
1110                .map_err(net_error_into_wasi_err),
1111            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1112            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1113            _ => Err(Errno::Notsup),
1114        }
1115    }
1116
1117    pub async fn send(
1118        &self,
1119        tasks: &dyn VirtualTaskManager,
1120        buf: &[u8],
1121        timeout: Option<Duration>,
1122        nonblocking: bool,
1123    ) -> Result<usize, Errno> {
1124        struct SocketSender<'a, 'b> {
1125            inner: &'a InodeSocketInner,
1126            data: &'b [u8],
1127            nonblocking: bool,
1128            handler_registered: bool,
1129        }
1130        impl Drop for SocketSender<'_, '_> {
1131            fn drop(&mut self) {
1132                if self.handler_registered {
1133                    let mut inner = self.inner.protected.write().unwrap();
1134                    inner.remove_handler();
1135                }
1136            }
1137        }
1138        impl Future for SocketSender<'_, '_> {
1139            type Output = Result<usize, Errno>;
1140            fn poll(
1141                mut self: Pin<&mut Self>,
1142                cx: &mut std::task::Context<'_>,
1143            ) -> Poll<Self::Output> {
1144                loop {
1145                    let mut inner = self.inner.protected.write().unwrap();
1146                    let res = match &mut inner.kind {
1147                        InodeSocketKind::Raw(socket) => socket.try_send(self.data),
1148                        InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1149                        InodeSocketKind::UdpSocket { socket, peer } => {
1150                            if let Some(peer) = peer {
1151                                socket.try_send_to(self.data, *peer)
1152                            } else {
1153                                Err(NetworkError::NotConnected)
1154                            }
1155                        }
1156                        InodeSocketKind::PreSocket { .. } => {
1157                            return Poll::Ready(Err(Errno::Notconn));
1158                        }
1159                        InodeSocketKind::RemoteSocket { is_dead, .. } => {
1160                            return match is_dead {
1161                                true => Poll::Ready(Err(Errno::Connreset)),
1162                                false => Poll::Ready(Ok(self.data.len())),
1163                            };
1164                        }
1165                        _ => return Poll::Ready(Err(Errno::Notsup)),
1166                    };
1167                    return match res {
1168                        Ok(amt) => Poll::Ready(Ok(amt)),
1169                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1170                            Poll::Ready(Err(Errno::Again))
1171                        }
1172                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1173                            inner
1174                                .set_handler(cx.waker().into())
1175                                .map_err(net_error_into_wasi_err)?;
1176                            drop(inner);
1177                            self.handler_registered = true;
1178                            continue;
1179                        }
1180                        Err(NetworkError::WouldBlock) => Poll::Pending,
1181                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1182                    };
1183                }
1184            }
1185        }
1186
1187        let poller = SocketSender {
1188            inner: &self.inner,
1189            data: buf,
1190            nonblocking,
1191            handler_registered: false,
1192        };
1193        if let Some(timeout) = timeout {
1194            tokio::select! {
1195                res = poller => res,
1196                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1197            }
1198        } else {
1199            poller.await
1200        }
1201    }
1202
1203    pub async fn send_to<M: MemorySize>(
1204        &self,
1205        tasks: &dyn VirtualTaskManager,
1206        buf: &[u8],
1207        addr: SocketAddr,
1208        timeout: Option<Duration>,
1209        nonblocking: bool,
1210    ) -> Result<usize, Errno> {
1211        struct SocketSender<'a, 'b> {
1212            inner: &'a InodeSocketInner,
1213            data: &'b [u8],
1214            addr: SocketAddr,
1215            nonblocking: bool,
1216            handler_registered: bool,
1217        }
1218        impl Drop for SocketSender<'_, '_> {
1219            fn drop(&mut self) {
1220                if self.handler_registered {
1221                    let mut inner = self.inner.protected.write().unwrap();
1222                    inner.remove_handler();
1223                }
1224            }
1225        }
1226        impl Future for SocketSender<'_, '_> {
1227            type Output = Result<usize, Errno>;
1228            fn poll(
1229                mut self: Pin<&mut Self>,
1230                cx: &mut std::task::Context<'_>,
1231            ) -> Poll<Self::Output> {
1232                loop {
1233                    let mut inner = self.inner.protected.write().unwrap();
1234                    let res = match &mut inner.kind {
1235                        InodeSocketKind::Icmp(socket) => socket.try_send_to(self.data, self.addr),
1236                        InodeSocketKind::UdpSocket { socket, .. } => {
1237                            socket.try_send_to(self.data, self.addr)
1238                        }
1239                        InodeSocketKind::PreSocket { .. } => {
1240                            return Poll::Ready(Err(Errno::Notconn));
1241                        }
1242                        InodeSocketKind::RemoteSocket { is_dead, .. } => {
1243                            return match is_dead {
1244                                true => Poll::Ready(Err(Errno::Connreset)),
1245                                false => Poll::Ready(Ok(self.data.len())),
1246                            };
1247                        }
1248                        _ => return Poll::Ready(Err(Errno::Notsup)),
1249                    };
1250                    return match res {
1251                        Ok(amt) => Poll::Ready(Ok(amt)),
1252                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1253                            Poll::Ready(Err(Errno::Again))
1254                        }
1255                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1256                            inner
1257                                .set_handler(cx.waker().into())
1258                                .map_err(net_error_into_wasi_err)?;
1259                            self.handler_registered = true;
1260                            drop(inner);
1261                            continue;
1262                        }
1263                        Err(NetworkError::WouldBlock) => Poll::Pending,
1264                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1265                    };
1266                }
1267            }
1268        }
1269
1270        let poller = SocketSender {
1271            inner: &self.inner,
1272            data: buf,
1273            addr,
1274            nonblocking,
1275            handler_registered: false,
1276        };
1277        if let Some(timeout) = timeout {
1278            tokio::select! {
1279                res = poller => res,
1280                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1281            }
1282        } else {
1283            poller.await
1284        }
1285    }
1286
1287    pub async fn recv(
1288        &self,
1289        tasks: &dyn VirtualTaskManager,
1290        buf: &mut [MaybeUninit<u8>],
1291        timeout: Option<Duration>,
1292        nonblocking: bool,
1293        peek: bool,
1294    ) -> Result<usize, Errno> {
1295        struct SocketReceiver<'a, 'b> {
1296            inner: &'a InodeSocketInner,
1297            data: &'b mut [MaybeUninit<u8>],
1298            nonblocking: bool,
1299            peek: bool,
1300            handler_registered: bool,
1301        }
1302        impl Drop for SocketReceiver<'_, '_> {
1303            fn drop(&mut self) {
1304                if self.handler_registered {
1305                    let mut inner = self.inner.protected.write().unwrap();
1306                    inner.remove_handler();
1307                }
1308            }
1309        }
1310        impl Future for SocketReceiver<'_, '_> {
1311            type Output = Result<usize, Errno>;
1312            fn poll(
1313                mut self: Pin<&mut Self>,
1314                cx: &mut std::task::Context<'_>,
1315            ) -> Poll<Self::Output> {
1316                loop {
1317                    let peek = self.peek;
1318                    let mut inner = self.inner.protected.write().unwrap();
1319                    let res = match &mut inner.kind {
1320                        InodeSocketKind::Raw(socket) => socket.try_recv(self.data, peek),
1321                        InodeSocketKind::TcpStream { socket, .. } => {
1322                            socket.try_recv(self.data, peek)
1323                        }
1324                        InodeSocketKind::UdpSocket { socket, peer } => {
1325                            if let Some(peer) = peer {
1326                                match socket.try_recv_from(self.data, peek) {
1327                                    Ok((amt, addr)) if addr == *peer => Ok(amt),
1328                                    Ok(_) => Err(NetworkError::WouldBlock),
1329                                    Err(err) => Err(err),
1330                                }
1331                            } else {
1332                                match socket.try_recv_from(self.data, peek) {
1333                                    Ok((amt, _)) => Ok(amt),
1334                                    Err(err) => Err(err),
1335                                }
1336                            }
1337                        }
1338                        InodeSocketKind::RemoteSocket { is_dead, .. } => {
1339                            return match is_dead {
1340                                true => Poll::Ready(Ok(0)),
1341                                false => Poll::Pending,
1342                            };
1343                        }
1344                        InodeSocketKind::PreSocket { .. } => {
1345                            return Poll::Ready(Err(Errno::Notconn));
1346                        }
1347                        _ => return Poll::Ready(Err(Errno::Notsup)),
1348                    };
1349                    return match res {
1350                        Ok(amt) => Poll::Ready(Ok(amt)),
1351                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1352                            Poll::Ready(Err(Errno::Again))
1353                        }
1354                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1355                            inner
1356                                .set_handler(cx.waker().into())
1357                                .map_err(net_error_into_wasi_err)?;
1358                            self.handler_registered = true;
1359                            drop(inner);
1360                            continue;
1361                        }
1362
1363                        Err(NetworkError::WouldBlock) => Poll::Pending,
1364                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1365                    };
1366                }
1367            }
1368        }
1369
1370        let poller = SocketReceiver {
1371            inner: &self.inner,
1372            data: buf,
1373            nonblocking,
1374            peek,
1375            handler_registered: false,
1376        };
1377        if let Some(timeout) = timeout {
1378            tokio::select! {
1379                res = poller => res,
1380                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1381            }
1382        } else {
1383            poller.await
1384        }
1385    }
1386
1387    pub async fn recv_from(
1388        &self,
1389        tasks: &dyn VirtualTaskManager,
1390        buf: &mut [MaybeUninit<u8>],
1391        timeout: Option<Duration>,
1392        nonblocking: bool,
1393        peek: bool,
1394    ) -> Result<(usize, SocketAddr), Errno> {
1395        struct SocketReceiver<'a, 'b> {
1396            inner: &'a InodeSocketInner,
1397            data: &'b mut [MaybeUninit<u8>],
1398            nonblocking: bool,
1399            peek: bool,
1400            handler_registered: bool,
1401        }
1402        impl Drop for SocketReceiver<'_, '_> {
1403            fn drop(&mut self) {
1404                if self.handler_registered {
1405                    let mut inner = self.inner.protected.write().unwrap();
1406                    inner.remove_handler();
1407                }
1408            }
1409        }
1410        impl Future for SocketReceiver<'_, '_> {
1411            type Output = Result<(usize, SocketAddr), Errno>;
1412            fn poll(
1413                mut self: Pin<&mut Self>,
1414                cx: &mut std::task::Context<'_>,
1415            ) -> Poll<Self::Output> {
1416                let peek = self.peek;
1417                let mut inner = self.inner.protected.write().unwrap();
1418                loop {
1419                    let res = match &mut inner.kind {
1420                        InodeSocketKind::Icmp(socket) => socket.try_recv_from(self.data, peek),
1421                        InodeSocketKind::UdpSocket { socket, .. } => {
1422                            socket.try_recv_from(self.data, peek)
1423                        }
1424                        InodeSocketKind::RemoteSocket {
1425                            is_dead, peer_addr, ..
1426                        } => {
1427                            return match is_dead {
1428                                true => Poll::Ready(Ok((0, *peer_addr))),
1429                                false => Poll::Pending,
1430                            };
1431                        }
1432                        InodeSocketKind::PreSocket { .. } => {
1433                            return Poll::Ready(Err(Errno::Notconn));
1434                        }
1435                        _ => return Poll::Ready(Err(Errno::Notsup)),
1436                    };
1437                    return match res {
1438                        Ok((amt, addr)) => Poll::Ready(Ok((amt, addr))),
1439                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1440                            Poll::Ready(Err(Errno::Again))
1441                        }
1442                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1443                            inner
1444                                .set_handler(cx.waker().into())
1445                                .map_err(net_error_into_wasi_err)?;
1446                            self.handler_registered = true;
1447                            continue;
1448                        }
1449                        Err(NetworkError::WouldBlock) => Poll::Pending,
1450                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1451                    };
1452                }
1453            }
1454        }
1455
1456        let poller = SocketReceiver {
1457            inner: &self.inner,
1458            data: buf,
1459            nonblocking,
1460            peek,
1461            handler_registered: false,
1462        };
1463        if let Some(timeout) = timeout {
1464            tokio::select! {
1465                res = poller => res,
1466                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1467            }
1468        } else {
1469            poller.await
1470        }
1471    }
1472
1473    pub fn shutdown(&mut self, how: std::net::Shutdown) -> Result<(), Errno> {
1474        let mut inner = self.inner.protected.write().unwrap();
1475        match &mut inner.kind {
1476            InodeSocketKind::TcpStream { socket, .. } => {
1477                socket.shutdown(how).map_err(net_error_into_wasi_err)?;
1478            }
1479            InodeSocketKind::RemoteSocket { .. } => return Ok(()),
1480            InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
1481            _ => return Err(Errno::Notsup),
1482        }
1483        Ok(())
1484    }
1485
1486    pub async fn can_write(&self) -> bool {
1487        if let Ok(mut guard) = self.inner.protected.try_write() {
1488            #[allow(clippy::match_like_matches_macro)]
1489            match &mut guard.kind {
1490                InodeSocketKind::TcpStream { .. }
1491                | InodeSocketKind::UdpSocket { .. }
1492                | InodeSocketKind::Raw(..) => true,
1493                InodeSocketKind::RemoteSocket { is_dead, .. } => !(*is_dead),
1494                _ => false,
1495            }
1496        } else {
1497            false
1498        }
1499    }
1500}
1501
1502impl InodeSocketProtected {
1503    pub fn remove_handler(&mut self) {
1504        match &mut self.kind {
1505            InodeSocketKind::TcpListener { socket, .. } => socket.remove_handler(),
1506            InodeSocketKind::TcpStream { socket, .. } => socket.remove_handler(),
1507            InodeSocketKind::UdpSocket { socket, .. } => socket.remove_handler(),
1508            InodeSocketKind::Raw(socket) => socket.remove_handler(),
1509            InodeSocketKind::Icmp(socket) => socket.remove_handler(),
1510            InodeSocketKind::PreSocket { props, .. } => {
1511                props.handler.take();
1512            }
1513            InodeSocketKind::RemoteSocket { props, .. } => {
1514                props.handler.take();
1515            }
1516        }
1517    }
1518
1519    pub fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1520        match &mut self.kind {
1521            InodeSocketKind::TcpListener { socket, .. } => socket.poll_read_ready(cx),
1522            InodeSocketKind::TcpStream { socket, .. } => socket.poll_read_ready(cx),
1523            InodeSocketKind::UdpSocket { socket, .. } => socket.poll_read_ready(cx),
1524            InodeSocketKind::Raw(socket) => socket.poll_read_ready(cx),
1525            InodeSocketKind::Icmp(socket) => socket.poll_read_ready(cx),
1526            InodeSocketKind::PreSocket { .. } => Poll::Pending,
1527            InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1528                true => Poll::Ready(Ok(0)),
1529                false => Poll::Pending,
1530            },
1531        }
1532        .map_err(net_error_into_io_err)
1533    }
1534
1535    pub fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1536        match &mut self.kind {
1537            InodeSocketKind::TcpListener { socket, .. } => socket.poll_write_ready(cx),
1538            InodeSocketKind::TcpStream { socket, .. } => socket.poll_write_ready(cx),
1539            InodeSocketKind::UdpSocket { socket, .. } => socket.poll_write_ready(cx),
1540            InodeSocketKind::Raw(socket) => socket.poll_write_ready(cx),
1541            InodeSocketKind::Icmp(socket) => socket.poll_write_ready(cx),
1542            InodeSocketKind::PreSocket { .. } => Poll::Pending,
1543            InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1544                true => Poll::Ready(Ok(0)),
1545                false => Poll::Pending,
1546            },
1547        }
1548        .map_err(net_error_into_io_err)
1549    }
1550
1551    pub fn set_handler(
1552        &mut self,
1553        handler: Box<dyn InterestHandler + Send + Sync>,
1554    ) -> virtual_net::Result<()> {
1555        match &mut self.kind {
1556            InodeSocketKind::TcpListener { socket, .. } => socket.set_handler(handler),
1557            InodeSocketKind::TcpStream { socket, .. } => socket.set_handler(handler),
1558            InodeSocketKind::UdpSocket { socket, .. } => socket.set_handler(handler),
1559            InodeSocketKind::Raw(socket) => socket.set_handler(handler),
1560            InodeSocketKind::Icmp(socket) => socket.set_handler(handler),
1561            InodeSocketKind::PreSocket { props, .. }
1562            | InodeSocketKind::RemoteSocket { props, .. } => {
1563                props.handler.replace(handler);
1564                Ok(())
1565            }
1566        }
1567    }
1568}
1569
1570// TODO: review allow...
1571#[allow(dead_code)]
1572pub(crate) fn all_socket_rights() -> Rights {
1573    Rights::FD_FDSTAT_SET_FLAGS
1574        .union(Rights::FD_FILESTAT_GET)
1575        .union(Rights::FD_READ)
1576        .union(Rights::FD_WRITE)
1577        .union(Rights::POLL_FD_READWRITE)
1578        .union(Rights::SOCK_SHUTDOWN)
1579        .union(Rights::SOCK_CONNECT)
1580        .union(Rights::SOCK_LISTEN)
1581        .union(Rights::SOCK_BIND)
1582        .union(Rights::SOCK_ACCEPT)
1583        .union(Rights::SOCK_RECV)
1584        .union(Rights::SOCK_SEND)
1585        .union(Rights::SOCK_ADDR_LOCAL)
1586        .union(Rights::SOCK_ADDR_REMOTE)
1587        .union(Rights::SOCK_RECV_FROM)
1588        .union(Rights::SOCK_SEND_TO)
1589}
1590
#[cfg(test)]
mod tests {
    use super::{InodeSocket, InodeSocketKind, WasiSocketStatus};
    use std::{
        mem::MaybeUninit,
        net::{Ipv4Addr, Shutdown, SocketAddr},
        pin::Pin,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        task::{Context, Poll},
        time::Duration,
    };
    use virtual_mio::InterestHandler;
    use virtual_net::{
        NetworkError, Result as NetResult, SocketStatus, VirtualConnectedSocket, VirtualIoSource,
        VirtualSocket, VirtualTcpSocket,
    };

    /// Raw status value the mock reports as [`SocketStatus::Opening`].
    const MOCK_STATUS_OPENING: usize = 0;
    /// Raw status value the mock reports as [`SocketStatus::Opened`].
    const MOCK_STATUS_OPENED: usize = 1;

    /// TCP socket double that counts readiness polls and exposes a status the
    /// test can change through a shared atomic.
    #[derive(Debug)]
    struct MockTcpSocket {
        read_calls: Arc<AtomicUsize>,
        write_calls: Arc<AtomicUsize>,
        status: Arc<AtomicUsize>,
    }

    /// Maps a raw status value to a [`SocketStatus`]; anything other than
    /// `MOCK_STATUS_OPENED` counts as still opening.
    fn decode_mock_status(value: usize) -> SocketStatus {
        if value == MOCK_STATUS_OPENED {
            SocketStatus::Opened
        } else {
            SocketStatus::Opening
        }
    }

    /// Wraps `socket` in a TCP-stream inode with no timeouts configured.
    fn tcp_stream_inode(socket: MockTcpSocket) -> InodeSocket {
        InodeSocket::new(InodeSocketKind::TcpStream {
            socket: Box::new(socket),
            write_timeout: None,
            read_timeout: None,
        })
    }

    impl VirtualIoSource for MockTcpSocket {
        fn remove_handler(&mut self) {}

        /// Records the poll and reports three readable bytes.
        fn poll_read_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
            self.read_calls.fetch_add(1, Ordering::Relaxed);
            Poll::Ready(Ok(3))
        }

        /// Records the poll, flips the status to opened and reports seven
        /// writable bytes.
        fn poll_write_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
            self.write_calls.fetch_add(1, Ordering::Relaxed);
            self.status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
            Poll::Ready(Ok(7))
        }
    }

    impl VirtualSocket for MockTcpSocket {
        fn set_ttl(&mut self, _ttl: u32) -> NetResult<()> {
            Ok(())
        }

        fn ttl(&self) -> NetResult<u32> {
            Ok(64)
        }

        fn addr_local(&self) -> NetResult<SocketAddr> {
            Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))
        }

        /// Reports whatever the shared status cell currently holds.
        fn status(&self) -> NetResult<SocketStatus> {
            let raw = self.status.load(Ordering::Relaxed);
            Ok(decode_mock_status(raw))
        }

        fn set_handler(
            &mut self,
            _handler: Box<dyn InterestHandler + Send + Sync>,
        ) -> NetResult<()> {
            Ok(())
        }
    }

    // Data transfer is not exercised by these tests, so every send/receive
    // entry point reports `Unsupported`.
    impl VirtualConnectedSocket for MockTcpSocket {
        fn set_linger(&mut self, _linger: Option<Duration>) -> NetResult<()> {
            Ok(())
        }

        fn linger(&self) -> NetResult<Option<Duration>> {
            Ok(None)
        }

        fn try_send(&mut self, _data: &[u8]) -> NetResult<usize> {
            Err(NetworkError::Unsupported)
        }

        fn try_flush(&mut self) -> NetResult<()> {
            Err(NetworkError::Unsupported)
        }

        fn close(&mut self) -> NetResult<()> {
            Ok(())
        }

        fn try_recv(&mut self, _buf: &mut [MaybeUninit<u8>], _peek: bool) -> NetResult<usize> {
            Err(NetworkError::Unsupported)
        }
    }

    // Option setters accept anything; getters return fixed values.
    impl VirtualTcpSocket for MockTcpSocket {
        fn set_recv_buf_size(&mut self, _size: usize) -> NetResult<()> {
            Ok(())
        }

        fn recv_buf_size(&self) -> NetResult<usize> {
            Ok(0)
        }

        fn set_send_buf_size(&mut self, _size: usize) -> NetResult<()> {
            Ok(())
        }

        fn send_buf_size(&self) -> NetResult<usize> {
            Ok(0)
        }

        fn set_nodelay(&mut self, _nodelay: bool) -> NetResult<()> {
            Ok(())
        }

        fn nodelay(&self) -> NetResult<bool> {
            Ok(true)
        }

        fn set_keepalive(&mut self, _keepalive: bool) -> NetResult<()> {
            Ok(())
        }

        fn keepalive(&self) -> NetResult<bool> {
            Ok(false)
        }

        fn set_dontroute(&mut self, _dontroute: bool) -> NetResult<()> {
            Ok(())
        }

        fn dontroute(&self) -> NetResult<bool> {
            Ok(false)
        }

        fn addr_peer(&self) -> NetResult<SocketAddr> {
            Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 80)))
        }

        fn shutdown(&mut self, _how: Shutdown) -> NetResult<()> {
            Ok(())
        }

        fn is_closed(&self) -> bool {
            false
        }
    }

    /// Polling the inode for write readiness must reach the socket's write
    /// poll exactly once and never its read poll.
    #[test]
    fn inode_socket_poll_write_ready_uses_write_path() {
        let read_calls = Arc::new(AtomicUsize::new(0));
        let write_calls = Arc::new(AtomicUsize::new(0));
        let mut inode = tcp_stream_inode(MockTcpSocket {
            read_calls: Arc::clone(&read_calls),
            write_calls: Arc::clone(&write_calls),
            status: Arc::new(AtomicUsize::new(MOCK_STATUS_OPENED)),
        });

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        let ready = Pin::new(&mut inode).poll_write_ready(&mut cx);

        assert!(matches!(ready, Poll::Ready(Ok(7))));
        assert_eq!(read_calls.load(Ordering::Relaxed), 0);
        assert_eq!(write_calls.load(Ordering::Relaxed), 1);
    }

    /// The inode's reported status must follow the underlying socket's status
    /// as it moves from opening to opened.
    #[test]
    fn inode_socket_status_tracks_tcp_socket_status() {
        let status = Arc::new(AtomicUsize::new(MOCK_STATUS_OPENING));
        let inode = tcp_stream_inode(MockTcpSocket {
            read_calls: Arc::new(AtomicUsize::new(0)),
            write_calls: Arc::new(AtomicUsize::new(0)),
            status: Arc::clone(&status),
        });

        assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opening));
        status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
        assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opened));
    }
}