wasmer_wasix/net/
socket.rs

1use std::{
2    future::Future,
3    io,
4    mem::MaybeUninit,
5    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
6    pin::Pin,
7    sync::{Arc, RwLock, RwLockWriteGuard},
8    task::{Context, Poll},
9    time::Duration,
10};
11
12#[cfg(feature = "enable-serde")]
13use serde_derive::{Deserialize, Serialize};
14use virtual_mio::InterestHandler;
15use virtual_net::{
16    NetworkError, VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualTcpListener,
17    VirtualTcpSocket, VirtualUdpSocket, net_error_into_io_err,
18};
19use wasmer_types::MemorySize;
20use wasmer_wasix_types::wasi::{Addressfamily, Errno, Rights, SockProto, Sockoption, Socktype};
21
22use crate::{VirtualTaskManager, net::net_error_into_wasi_err};
23
24#[derive(Debug)]
25#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
26pub enum InodeHttpSocketType {
27    /// Used to feed the bytes into the request itself
28    Request,
29    /// Used to receive the bytes from the HTTP server
30    Response,
31    /// Used to read the headers from the HTTP server
32    Headers,
33}
34
35#[derive(Debug)]
36pub struct SocketProperties {
37    pub family: Addressfamily,
38    pub ty: Socktype,
39    pub pt: SockProto,
40    pub only_v6: bool,
41    pub reuse_port: bool,
42    pub reuse_addr: bool,
43    pub no_delay: Option<bool>,
44    pub keep_alive: Option<bool>,
45    pub dont_route: Option<bool>,
46    pub send_buf_size: Option<usize>,
47    pub recv_buf_size: Option<usize>,
48    pub write_timeout: Option<Duration>,
49    pub read_timeout: Option<Duration>,
50    pub accept_timeout: Option<Duration>,
51    pub connect_timeout: Option<Duration>,
52    pub handler: Option<Box<dyn InterestHandler + Send + Sync>>,
53}
54
55#[derive(Debug)]
56//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
57pub enum InodeSocketKind {
58    PreSocket {
59        props: SocketProperties,
60        addr: Option<SocketAddr>,
61    },
62    Icmp(Box<dyn VirtualIcmpSocket + Sync>),
63    Raw(Box<dyn VirtualRawSocket + Sync>),
64    TcpListener {
65        socket: Box<dyn VirtualTcpListener + Sync>,
66        accept_timeout: Option<Duration>,
67    },
68    TcpStream {
69        socket: Box<dyn VirtualTcpSocket + Sync>,
70        write_timeout: Option<Duration>,
71        read_timeout: Option<Duration>,
72    },
73    UdpSocket {
74        socket: Box<dyn VirtualUdpSocket + Sync>,
75        peer: Option<SocketAddr>,
76    },
77    RemoteSocket {
78        props: SocketProperties,
79        local_addr: SocketAddr,
80        peer_addr: SocketAddr,
81        ttl: u32,
82        multicast_ttl: u32,
83        is_dead: bool,
84    },
85}
86
87pub enum WasiSocketOption {
88    Noop,
89    ReusePort,
90    ReuseAddr,
91    NoDelay,
92    DontRoute,
93    OnlyV6,
94    Broadcast,
95    MulticastLoopV4,
96    MulticastLoopV6,
97    Promiscuous,
98    Listening,
99    LastError,
100    KeepAlive,
101    Linger,
102    OobInline,
103    RecvBufSize,
104    SendBufSize,
105    RecvLowat,
106    SendLowat,
107    RecvTimeout,
108    SendTimeout,
109    ConnectTimeout,
110    AcceptTimeout,
111    Ttl,
112    MulticastTtlV4,
113    Type,
114    Proto,
115}
116
117impl TryFrom<Sockoption> for WasiSocketOption {
118    type Error = Errno;
119
120    fn try_from(opt: Sockoption) -> Result<Self, Self::Error> {
121        use WasiSocketOption::*;
122        match opt {
123            Sockoption::Noop => Ok(Noop),
124            Sockoption::ReusePort => Ok(ReusePort),
125            Sockoption::ReuseAddr => Ok(ReuseAddr),
126            Sockoption::NoDelay => Ok(NoDelay),
127            Sockoption::DontRoute => Ok(DontRoute),
128            Sockoption::OnlyV6 => Ok(OnlyV6),
129            Sockoption::Broadcast => Ok(Broadcast),
130            Sockoption::MulticastLoopV4 => Ok(MulticastLoopV4),
131            Sockoption::MulticastLoopV6 => Ok(MulticastLoopV6),
132            Sockoption::Promiscuous => Ok(Promiscuous),
133            Sockoption::Listening => Ok(Listening),
134            Sockoption::LastError => Ok(LastError),
135            Sockoption::KeepAlive => Ok(KeepAlive),
136            Sockoption::Linger => Ok(Linger),
137            Sockoption::OobInline => Ok(OobInline),
138            Sockoption::RecvBufSize => Ok(RecvBufSize),
139            Sockoption::SendBufSize => Ok(SendBufSize),
140            Sockoption::RecvLowat => Ok(RecvLowat),
141            Sockoption::SendLowat => Ok(SendLowat),
142            Sockoption::RecvTimeout => Ok(RecvTimeout),
143            Sockoption::SendTimeout => Ok(SendTimeout),
144            Sockoption::ConnectTimeout => Ok(ConnectTimeout),
145            Sockoption::AcceptTimeout => Ok(AcceptTimeout),
146            Sockoption::Ttl => Ok(Ttl),
147            Sockoption::MulticastTtlV4 => Ok(MulticastTtlV4),
148            Sockoption::Type => Ok(Type),
149            Sockoption::Proto => Ok(Proto),
150            _ => Err(Errno::Inval),
151        }
152    }
153}
154
155#[derive(Debug)]
156pub enum WasiSocketStatus {
157    Opening,
158    Opened,
159    Closed,
160    Failed,
161}
162
163#[derive(Debug, Copy, Clone, PartialEq, Eq)]
164#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
165pub enum TimeType {
166    ReadTimeout,
167    WriteTimeout,
168    AcceptTimeout,
169    ConnectTimeout,
170    BindTimeout,
171    Linger,
172}
173
174impl From<TimeType> for wasmer_journal::SocketOptTimeType {
175    fn from(value: TimeType) -> Self {
176        match value {
177            TimeType::ReadTimeout => Self::ReadTimeout,
178            TimeType::WriteTimeout => Self::WriteTimeout,
179            TimeType::AcceptTimeout => Self::AcceptTimeout,
180            TimeType::ConnectTimeout => Self::ConnectTimeout,
181            TimeType::BindTimeout => Self::BindTimeout,
182            TimeType::Linger => Self::Linger,
183        }
184    }
185}
186
187impl From<wasmer_journal::SocketOptTimeType> for TimeType {
188    fn from(value: wasmer_journal::SocketOptTimeType) -> Self {
189        use wasmer_journal::SocketOptTimeType;
190        match value {
191            SocketOptTimeType::ReadTimeout => TimeType::ReadTimeout,
192            SocketOptTimeType::WriteTimeout => TimeType::WriteTimeout,
193            SocketOptTimeType::AcceptTimeout => TimeType::AcceptTimeout,
194            SocketOptTimeType::ConnectTimeout => TimeType::ConnectTimeout,
195            SocketOptTimeType::BindTimeout => TimeType::BindTimeout,
196            SocketOptTimeType::Linger => TimeType::Linger,
197        }
198    }
199}
200
201#[derive(Debug)]
202//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
203pub(crate) struct InodeSocketProtected {
204    pub kind: InodeSocketKind,
205}
206
207#[derive(Debug)]
208//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
209pub(crate) struct InodeSocketInner {
210    pub protected: RwLock<InodeSocketProtected>,
211}
212
213#[derive(Debug, Clone)]
214//#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
215pub struct InodeSocket {
216    pub(crate) inner: Arc<InodeSocketInner>,
217}
218
219impl InodeSocket {
220    pub fn new(kind: InodeSocketKind) -> Self {
221        let protected = InodeSocketProtected { kind };
222        Self {
223            inner: Arc::new(InodeSocketInner {
224                protected: RwLock::new(protected),
225            }),
226        }
227    }
228
229    pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
230        let mut inner = self.inner.protected.write().unwrap();
231        inner.poll_read_ready(cx)
232    }
233
234    pub fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
235        let mut inner = self.inner.protected.write().unwrap();
236        inner.poll_write_ready(cx)
237    }
238
239    // When a sendto or connect call comes in for a UDP "pre-socket", it must be bound to
240    // an ephemeral port automatically.
241    // Apparently, clippy fails to recognize the write-locked guard being passed into
242    // the other function, hence the `allow` attribute.
243    #[allow(clippy::await_holding_lock, clippy::readonly_write_lock)]
244    pub async fn auto_bind_udp(
245        &self,
246        tasks: &dyn VirtualTaskManager,
247        net: &dyn VirtualNetworking,
248    ) -> Result<Option<InodeSocket>, Errno> {
249        let timeout = self
250            .opt_time(TimeType::BindTimeout)
251            .ok()
252            .flatten()
253            .unwrap_or(Duration::from_secs(30));
254        let inner = self.inner.protected.write().unwrap();
255        match &inner.kind {
256            InodeSocketKind::PreSocket { props, .. } if props.ty == Socktype::Dgram => {
257                let addr = match props.family {
258                    Addressfamily::Inet4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
259                    Addressfamily::Inet6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
260                    _ => return Err(Errno::Notsup),
261                };
262                Self::bind_internal(tasks, net, addr, timeout, inner).await
263            }
264            _ => Ok(None),
265        }
266    }
267
268    pub async fn bind(
269        &self,
270        tasks: &dyn VirtualTaskManager,
271        net: &dyn VirtualNetworking,
272        set_addr: SocketAddr,
273    ) -> Result<Option<InodeSocket>, Errno> {
274        let timeout = self
275            .opt_time(TimeType::BindTimeout)
276            .ok()
277            .flatten()
278            .unwrap_or(Duration::from_secs(30));
279        let inner = self.inner.protected.write().unwrap();
280        Self::bind_internal(tasks, net, set_addr, timeout, inner).await
281    }
282
283    // The lock is dropped before awaiting, but clippy doesn't realize it
284    #[allow(clippy::await_holding_lock)]
285    async fn bind_internal(
286        tasks: &dyn VirtualTaskManager,
287        net: &dyn VirtualNetworking,
288        set_addr: SocketAddr,
289        timeout: Duration,
290        mut inner: RwLockWriteGuard<'_, InodeSocketProtected>,
291    ) -> Result<Option<InodeSocket>, Errno> {
292        let socket = {
293            match &mut inner.kind {
294                InodeSocketKind::PreSocket { props, addr, .. } => {
295                    match props.family {
296                        Addressfamily::Inet4 => {
297                            if !set_addr.is_ipv4() {
298                                tracing::debug!(
299                                    "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
300                                );
301                                return Err(Errno::Inval);
302                            }
303                        }
304                        Addressfamily::Inet6 => {
305                            if !set_addr.is_ipv6() {
306                                tracing::debug!(
307                                    "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
308                                );
309                                return Err(Errno::Inval);
310                            }
311                        }
312                        _ => {
313                            return Err(Errno::Notsup);
314                        }
315                    }
316
317                    addr.replace(set_addr);
318                    let addr = (*addr).unwrap();
319
320                    match props.ty {
321                        Socktype::Stream => {
322                            // we already set the socket address - next we need a listen or connect so nothing
323                            // more to do at this time
324                            return Ok(None);
325                        }
326                        Socktype::Dgram => {
327                            let reuse_port = props.reuse_port;
328                            let reuse_addr = props.reuse_addr;
329
330                            net.bind_udp(addr, reuse_port, reuse_addr)
331                        }
332                        _ => return Err(Errno::Inval),
333                    }
334                }
335                InodeSocketKind::RemoteSocket {
336                    props,
337                    local_addr: addr,
338                    ..
339                } => {
340                    match props.family {
341                        Addressfamily::Inet4 => {
342                            if !set_addr.is_ipv4() {
343                                tracing::debug!(
344                                    "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
345                                );
346                                return Err(Errno::Inval);
347                            }
348                        }
349                        Addressfamily::Inet6 => {
350                            if !set_addr.is_ipv6() {
351                                tracing::debug!(
352                                    "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
353                                );
354                                return Err(Errno::Inval);
355                            }
356                        }
357                        _ => {
358                            return Err(Errno::Notsup);
359                        }
360                    }
361
362                    *addr = set_addr;
363                    let addr = *addr;
364
365                    match props.ty {
366                        Socktype::Stream => {
367                            // we already set the socket address - next we need a listen or connect so nothing
368                            // more to do at this time
369                            return Ok(None);
370                        }
371                        Socktype::Dgram => {
372                            let reuse_port = props.reuse_port;
373                            let reuse_addr = props.reuse_addr;
374
375                            net.bind_udp(addr, reuse_port, reuse_addr)
376                        }
377                        _ => return Err(Errno::Inval),
378                    }
379                }
380                _ => return Err(Errno::Notsup),
381            }
382        };
383
384        drop(inner);
385
386        tokio::select! {
387            socket = socket => {
388                let socket = socket.map_err(net_error_into_wasi_err)?;
389                Ok(Some(InodeSocket::new(InodeSocketKind::UdpSocket { socket, peer: None })))
390            },
391            _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
392        }
393    }
394
395    pub async fn listen(
396        &self,
397        tasks: &dyn VirtualTaskManager,
398        net: &dyn VirtualNetworking,
399        _backlog: usize,
400    ) -> Result<Option<InodeSocket>, Errno> {
401        let timeout = self
402            .opt_time(TimeType::AcceptTimeout)
403            .ok()
404            .flatten()
405            .unwrap_or(Duration::from_secs(30));
406
407        let socket = {
408            let inner = self.inner.protected.read().unwrap();
409            match &inner.kind {
410                InodeSocketKind::PreSocket { props, addr, .. } => match props.ty {
411                    Socktype::Stream => {
412                        if addr.is_none() {
413                            tracing::warn!("wasi[?]::sock_listen - failed - address not set");
414                            return Err(Errno::Inval);
415                        }
416                        let addr = *addr.as_ref().unwrap();
417                        let only_v6 = props.only_v6;
418                        let reuse_port = props.reuse_port;
419                        let reuse_addr = props.reuse_addr;
420                        drop(inner);
421
422                        net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
423                    }
424                    ty => {
425                        tracing::warn!(
426                            "wasi[?]::sock_listen - failed - not supported(pre-socket:{:?})",
427                            ty
428                        );
429                        return Err(Errno::Notsup);
430                    }
431                },
432                InodeSocketKind::RemoteSocket {
433                    props,
434                    local_addr: addr,
435                    ..
436                } => match props.ty {
437                    Socktype::Stream => {
438                        let addr = *addr;
439                        let only_v6 = props.only_v6;
440                        let reuse_port = props.reuse_port;
441                        let reuse_addr = props.reuse_addr;
442                        drop(inner);
443
444                        net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
445                    }
446                    ty => {
447                        tracing::warn!(
448                            "wasi[?]::sock_listen - failed - not supported(remote-socket:{:?})",
449                            ty
450                        );
451                        return Err(Errno::Notsup);
452                    }
453                },
454                InodeSocketKind::Icmp(_) => {
455                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(icmp)");
456                    return Err(Errno::Notsup);
457                }
458                InodeSocketKind::Raw(_) => {
459                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(raw)");
460                    return Err(Errno::Notsup);
461                }
462                InodeSocketKind::TcpListener { .. } => {
463                    tracing::warn!(
464                        "wasi[?]::sock_listen - failed - already listening (tcp-listener)"
465                    );
466                    return Err(Errno::Notsup);
467                }
468                InodeSocketKind::TcpStream { .. } => {
469                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(tcp-stream)");
470                    return Err(Errno::Notsup);
471                }
472                InodeSocketKind::UdpSocket { .. } => {
473                    tracing::warn!("wasi[?]::sock_listen - failed - not supported(udp-socket)");
474                    return Err(Errno::Notsup);
475                }
476            }
477        };
478
479        tokio::select! {
480            socket = socket => {
481                let socket = socket.map_err(net_error_into_wasi_err)?;
482                Ok(Some(InodeSocket::new(InodeSocketKind::TcpListener {
483                    socket,
484                    accept_timeout: Some(timeout),
485                })))
486            },
487            _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
488        }
489    }
490
491    pub async fn accept(
492        &self,
493        tasks: &dyn VirtualTaskManager,
494        nonblocking: bool,
495        timeout: Option<Duration>,
496    ) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno> {
497        struct SocketAccepter<'a> {
498            sock: &'a InodeSocket,
499            nonblocking: bool,
500            handler_registered: bool,
501        }
502        impl Drop for SocketAccepter<'_> {
503            fn drop(&mut self) {
504                if self.handler_registered {
505                    let mut inner = self.sock.inner.protected.write().unwrap();
506                    inner.remove_handler();
507                }
508            }
509        }
510        impl Future for SocketAccepter<'_> {
511            type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno>;
512            fn poll(
513                mut self: Pin<&mut Self>,
514                cx: &mut std::task::Context<'_>,
515            ) -> std::task::Poll<Self::Output> {
516                loop {
517                    let mut inner = self.sock.inner.protected.write().unwrap();
518                    return match &mut inner.kind {
519                        InodeSocketKind::TcpListener { socket, .. } => match socket.try_accept() {
520                            Ok((child, addr)) => Poll::Ready(Ok((child, addr))),
521                            Err(NetworkError::WouldBlock) if self.nonblocking => {
522                                Poll::Ready(Err(Errno::Again))
523                            }
524                            Err(NetworkError::WouldBlock) if !self.handler_registered => {
525                                let res = socket.set_handler(cx.waker().into());
526                                if let Err(err) = res {
527                                    return Poll::Ready(Err(net_error_into_wasi_err(err)));
528                                }
529                                drop(inner);
530                                self.handler_registered = true;
531                                continue;
532                            }
533                            Err(NetworkError::WouldBlock) => Poll::Pending,
534                            Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
535                        },
536                        InodeSocketKind::PreSocket { .. } => Poll::Ready(Err(Errno::Notconn)),
537                        _ => Poll::Ready(Err(Errno::Notsup)),
538                    };
539                }
540            }
541        }
542
543        let acceptor = SocketAccepter {
544            sock: self,
545            nonblocking,
546            handler_registered: false,
547        };
548        if let Some(timeout) = timeout {
549            tokio::select! {
550                res = acceptor => res,
551                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
552            }
553        } else {
554            acceptor.await
555        }
556    }
557
558    pub fn close(&self) -> Result<(), Errno> {
559        let mut inner = self.inner.protected.write().unwrap();
560        match &mut inner.kind {
561            InodeSocketKind::TcpListener { .. } => {}
562            InodeSocketKind::TcpStream { socket, .. } => {
563                socket.close().map_err(net_error_into_wasi_err)?;
564            }
565            InodeSocketKind::Icmp(_) => {}
566            InodeSocketKind::UdpSocket { .. } => {}
567            InodeSocketKind::Raw(_) => {}
568            InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
569            InodeSocketKind::RemoteSocket { .. } => {}
570        };
571        Ok(())
572    }
573
574    pub async fn connect(
575        &mut self,
576        tasks: &dyn VirtualTaskManager,
577        net: &dyn VirtualNetworking,
578        peer: SocketAddr,
579        timeout: Option<std::time::Duration>,
580        nonblocking: bool,
581    ) -> Result<Option<InodeSocket>, Errno> {
582        let new_write_timeout;
583        let new_read_timeout;
584
585        let timeout = timeout
586            .or_else(|| self.opt_time(TimeType::ConnectTimeout).ok().flatten())
587            .unwrap_or(Duration::from_secs(30));
588
589        let handler;
590        let connect = {
591            let mut inner = self.inner.protected.write().unwrap();
592            match &mut inner.kind {
593                InodeSocketKind::PreSocket { props, addr, .. } => {
594                    handler = props.handler.take();
595                    new_write_timeout = props.write_timeout;
596                    new_read_timeout = props.read_timeout;
597                    match props.ty {
598                        Socktype::Stream => {
599                            let no_delay = props.no_delay;
600                            let keep_alive = props.keep_alive;
601                            let dont_route = props.dont_route;
602                            let addr = match addr {
603                                Some(a) => *a,
604                                None => {
605                                    let ip = match peer.is_ipv4() {
606                                        true => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
607                                        false => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
608                                    };
609                                    SocketAddr::new(ip, 0)
610                                }
611                            };
612                            Box::pin(async move {
613                                let mut ret = net.connect_tcp(addr, peer).await?;
614                                if let Some(no_delay) = no_delay {
615                                    ret.set_nodelay(no_delay).ok();
616                                }
617                                if let Some(keep_alive) = keep_alive {
618                                    ret.set_keepalive(keep_alive).ok();
619                                }
620                                if let Some(dont_route) = dont_route {
621                                    ret.set_dontroute(dont_route).ok();
622                                }
623                                if !nonblocking {
624                                    futures::future::poll_fn(|cx| ret.poll_write_ready(cx)).await?;
625                                }
626                                Ok(ret)
627                            })
628                        }
629                        Socktype::Dgram => return Err(Errno::Inval),
630                        _ => return Err(Errno::Notsup),
631                    }
632                }
633                InodeSocketKind::UdpSocket {
634                    peer: target_peer, ..
635                } => {
636                    target_peer.replace(peer);
637                    return Ok(None);
638                }
639                InodeSocketKind::RemoteSocket { peer_addr, .. } => {
640                    *peer_addr = peer;
641                    return Ok(None);
642                }
643                _ => return Err(Errno::Notsup),
644            }
645        };
646
647        let mut socket = tokio::select! {
648            res = connect => res.map_err(net_error_into_wasi_err)?,
649            _ = tasks.sleep_now(timeout) => return Err(Errno::Timedout)
650        };
651
652        if let Some(handler) = handler {
653            socket
654                .set_handler(handler)
655                .map_err(net_error_into_wasi_err)?;
656        }
657
658        let socket = InodeSocket::new(InodeSocketKind::TcpStream {
659            socket,
660            write_timeout: new_write_timeout,
661            read_timeout: new_read_timeout,
662        });
663
664        Ok(Some(socket))
665    }
666
667    pub fn status(&self) -> Result<WasiSocketStatus, Errno> {
668        let inner = self.inner.protected.read().unwrap();
669        Ok(match &inner.kind {
670            InodeSocketKind::PreSocket { .. } => WasiSocketStatus::Opening,
671            InodeSocketKind::TcpListener { .. } => WasiSocketStatus::Opened,
672            InodeSocketKind::TcpStream { socket, .. } => match socket.status() {
673                Ok(virtual_net::SocketStatus::Opening) => WasiSocketStatus::Opening,
674                Ok(virtual_net::SocketStatus::Opened) => WasiSocketStatus::Opened,
675                Ok(virtual_net::SocketStatus::Closed) => WasiSocketStatus::Closed,
676                Ok(virtual_net::SocketStatus::Failed) => WasiSocketStatus::Failed,
677                Err(_) => WasiSocketStatus::Failed,
678            },
679            InodeSocketKind::UdpSocket { .. } => WasiSocketStatus::Opened,
680            InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
681                true => WasiSocketStatus::Closed,
682                false => WasiSocketStatus::Opened,
683            },
684            _ => WasiSocketStatus::Failed,
685        })
686    }
687
688    pub fn addr_local(&self) -> Result<SocketAddr, Errno> {
689        let inner = self.inner.protected.read().unwrap();
690        Ok(match &inner.kind {
691            InodeSocketKind::PreSocket { props, addr, .. } => {
692                if let Some(addr) = addr {
693                    *addr
694                } else {
695                    SocketAddr::new(
696                        match props.family {
697                            Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
698                            Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
699                            _ => return Err(Errno::Inval),
700                        },
701                        0,
702                    )
703                }
704            }
705            InodeSocketKind::Icmp(sock) => sock.addr_local().map_err(net_error_into_wasi_err)?,
706            InodeSocketKind::TcpListener { socket, .. } => {
707                socket.addr_local().map_err(net_error_into_wasi_err)?
708            }
709            InodeSocketKind::TcpStream { socket, .. } => {
710                socket.addr_local().map_err(net_error_into_wasi_err)?
711            }
712            InodeSocketKind::UdpSocket { socket, .. } => {
713                socket.addr_local().map_err(net_error_into_wasi_err)?
714            }
715            InodeSocketKind::RemoteSocket {
716                local_addr: addr, ..
717            } => *addr,
718            _ => return Err(Errno::Notsup),
719        })
720    }
721
722    pub fn addr_peer(&self) -> Result<SocketAddr, Errno> {
723        let inner = self.inner.protected.read().unwrap();
724        Ok(match &inner.kind {
725            InodeSocketKind::PreSocket { props, .. } => SocketAddr::new(
726                match props.family {
727                    Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
728                    Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
729                    _ => return Err(Errno::Inval),
730                },
731                0,
732            ),
733            InodeSocketKind::TcpStream { socket, .. } => {
734                socket.addr_peer().map_err(net_error_into_wasi_err)?
735            }
736            InodeSocketKind::UdpSocket { socket, .. } => socket
737                .addr_peer()
738                .map_err(net_error_into_wasi_err)?
739                .map(Ok)
740                .unwrap_or_else(|| {
741                    socket
742                        .addr_local()
743                        .map_err(net_error_into_wasi_err)
744                        .map(|addr| {
745                            SocketAddr::new(
746                                match addr {
747                                    SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
748                                    SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
749                                },
750                                0,
751                            )
752                        })
753                })?,
754            InodeSocketKind::RemoteSocket { peer_addr, .. } => *peer_addr,
755            _ => return Err(Errno::Notsup),
756        })
757    }
758
759    pub fn set_opt_flag(&mut self, option: WasiSocketOption, val: bool) -> Result<(), Errno> {
760        let mut inner = self.inner.protected.write().unwrap();
761        match &mut inner.kind {
762            InodeSocketKind::PreSocket { props, .. }
763            | InodeSocketKind::RemoteSocket { props, .. } => {
764                match option {
765                    WasiSocketOption::OnlyV6 => props.only_v6 = val,
766                    WasiSocketOption::ReusePort => props.reuse_port = val,
767                    WasiSocketOption::ReuseAddr => props.reuse_addr = val,
768                    WasiSocketOption::NoDelay => props.no_delay = Some(val),
769                    WasiSocketOption::KeepAlive => props.keep_alive = Some(val),
770                    WasiSocketOption::DontRoute => props.dont_route = Some(val),
771                    _ => return Err(Errno::Inval),
772                };
773            }
774            InodeSocketKind::Raw(sock) => match option {
775                WasiSocketOption::Promiscuous => {
776                    sock.set_promiscuous(val).map_err(net_error_into_wasi_err)?
777                }
778                _ => return Err(Errno::Inval),
779            },
780            InodeSocketKind::TcpStream { socket, .. } => match option {
781                WasiSocketOption::NoDelay => {
782                    socket.set_nodelay(val).map_err(net_error_into_wasi_err)?
783                }
784                WasiSocketOption::KeepAlive => {
785                    socket.set_keepalive(val).map_err(net_error_into_wasi_err)?
786                }
787                WasiSocketOption::DontRoute => {
788                    socket.set_dontroute(val).map_err(net_error_into_wasi_err)?
789                }
790                _ => return Err(Errno::Inval),
791            },
792            InodeSocketKind::TcpListener { .. } => return Err(Errno::Inval),
793            InodeSocketKind::UdpSocket { socket, .. } => match option {
794                WasiSocketOption::Broadcast => {
795                    socket.set_broadcast(val).map_err(net_error_into_wasi_err)?
796                }
797                WasiSocketOption::MulticastLoopV4 => socket
798                    .set_multicast_loop_v4(val)
799                    .map_err(net_error_into_wasi_err)?,
800                WasiSocketOption::MulticastLoopV6 => socket
801                    .set_multicast_loop_v6(val)
802                    .map_err(net_error_into_wasi_err)?,
803                _ => return Err(Errno::Inval),
804            },
805            _ => return Err(Errno::Notsup),
806        }
807        Ok(())
808    }
809
810    pub fn get_opt_flag(&self, option: WasiSocketOption) -> Result<bool, Errno> {
811        let mut inner = self.inner.protected.write().unwrap();
812        Ok(match &mut inner.kind {
813            InodeSocketKind::PreSocket { props, .. }
814            | InodeSocketKind::RemoteSocket { props, .. } => match option {
815                WasiSocketOption::OnlyV6 => props.only_v6,
816                WasiSocketOption::ReusePort => props.reuse_port,
817                WasiSocketOption::ReuseAddr => props.reuse_addr,
818                WasiSocketOption::NoDelay => props.no_delay.unwrap_or_default(),
819                WasiSocketOption::KeepAlive => props.keep_alive.unwrap_or_default(),
820                _ => return Err(Errno::Inval),
821            },
822            InodeSocketKind::Raw(sock) => match option {
823                WasiSocketOption::Promiscuous => {
824                    sock.promiscuous().map_err(net_error_into_wasi_err)?
825                }
826                _ => return Err(Errno::Inval),
827            },
828            InodeSocketKind::TcpStream { socket, .. } => match option {
829                WasiSocketOption::NoDelay => socket.nodelay().map_err(net_error_into_wasi_err)?,
830                WasiSocketOption::KeepAlive => {
831                    socket.keepalive().map_err(net_error_into_wasi_err)?
832                }
833                WasiSocketOption::DontRoute => {
834                    socket.dontroute().map_err(net_error_into_wasi_err)?
835                }
836                _ => return Err(Errno::Inval),
837            },
838            InodeSocketKind::UdpSocket { socket, .. } => match option {
839                WasiSocketOption::Broadcast => {
840                    socket.broadcast().map_err(net_error_into_wasi_err)?
841                }
842                WasiSocketOption::MulticastLoopV4 => socket
843                    .multicast_loop_v4()
844                    .map_err(net_error_into_wasi_err)?,
845                WasiSocketOption::MulticastLoopV6 => socket
846                    .multicast_loop_v6()
847                    .map_err(net_error_into_wasi_err)?,
848                _ => return Err(Errno::Inval),
849            },
850            _ => return Err(Errno::Notsup),
851        })
852    }
853
854    pub fn set_send_buf_size(&mut self, size: usize) -> Result<(), Errno> {
855        let mut inner = self.inner.protected.write().unwrap();
856        match &mut inner.kind {
857            InodeSocketKind::PreSocket { props, .. }
858            | InodeSocketKind::RemoteSocket { props, .. } => {
859                props.send_buf_size = Some(size);
860            }
861            InodeSocketKind::TcpStream { socket, .. } => {
862                socket
863                    .set_send_buf_size(size)
864                    .map_err(net_error_into_wasi_err)?;
865            }
866            _ => return Err(Errno::Notsup),
867        }
868        Ok(())
869    }
870
871    pub fn send_buf_size(&self) -> Result<usize, Errno> {
872        let inner = self.inner.protected.read().unwrap();
873        match &inner.kind {
874            InodeSocketKind::PreSocket { props, .. }
875            | InodeSocketKind::RemoteSocket { props, .. } => {
876                Ok(props.send_buf_size.unwrap_or_default())
877            }
878            InodeSocketKind::TcpStream { socket, .. } => {
879                socket.send_buf_size().map_err(net_error_into_wasi_err)
880            }
881            _ => Err(Errno::Notsup),
882        }
883    }
884
885    pub fn set_recv_buf_size(&mut self, size: usize) -> Result<(), Errno> {
886        let mut inner = self.inner.protected.write().unwrap();
887        match &mut inner.kind {
888            InodeSocketKind::PreSocket { props, .. }
889            | InodeSocketKind::RemoteSocket { props, .. } => {
890                props.recv_buf_size = Some(size);
891            }
892            InodeSocketKind::TcpStream { socket, .. } => {
893                socket
894                    .set_recv_buf_size(size)
895                    .map_err(net_error_into_wasi_err)?;
896            }
897            _ => return Err(Errno::Notsup),
898        }
899        Ok(())
900    }
901
902    pub fn recv_buf_size(&self) -> Result<usize, Errno> {
903        let inner = self.inner.protected.read().unwrap();
904        match &inner.kind {
905            InodeSocketKind::PreSocket { props, .. }
906            | InodeSocketKind::RemoteSocket { props, .. } => {
907                Ok(props.recv_buf_size.unwrap_or_default())
908            }
909            InodeSocketKind::TcpStream { socket, .. } => {
910                socket.recv_buf_size().map_err(net_error_into_wasi_err)
911            }
912            _ => Err(Errno::Notsup),
913        }
914    }
915
916    pub fn set_linger(&mut self, linger: Option<std::time::Duration>) -> Result<(), Errno> {
917        let mut inner = self.inner.protected.write().unwrap();
918        match &mut inner.kind {
919            InodeSocketKind::TcpStream { socket, .. } => {
920                socket.set_linger(linger).map_err(net_error_into_wasi_err)
921            }
922            InodeSocketKind::RemoteSocket { .. } => Ok(()),
923            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
924            _ => Err(Errno::Notsup),
925        }
926    }
927
928    pub fn linger(&self) -> Result<Option<std::time::Duration>, Errno> {
929        let inner = self.inner.protected.read().unwrap();
930        match &inner.kind {
931            InodeSocketKind::TcpStream { socket, .. } => {
932                socket.linger().map_err(net_error_into_wasi_err)
933            }
934            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
935            _ => Err(Errno::Notsup),
936        }
937    }
938
939    pub fn set_opt_time(
940        &self,
941        ty: TimeType,
942        timeout: Option<std::time::Duration>,
943    ) -> Result<(), Errno> {
944        let mut inner = self.inner.protected.write().unwrap();
945        match &mut inner.kind {
946            InodeSocketKind::TcpStream {
947                write_timeout,
948                read_timeout,
949                ..
950            } => {
951                match ty {
952                    TimeType::WriteTimeout => *write_timeout = timeout,
953                    TimeType::ReadTimeout => *read_timeout = timeout,
954                    _ => return Err(Errno::Inval),
955                }
956                Ok(())
957            }
958            InodeSocketKind::TcpListener { accept_timeout, .. } => {
959                match ty {
960                    TimeType::AcceptTimeout => *accept_timeout = timeout,
961                    _ => return Err(Errno::Inval),
962                }
963                Ok(())
964            }
965            InodeSocketKind::PreSocket { props, .. }
966            | InodeSocketKind::RemoteSocket { props, .. } => {
967                match ty {
968                    TimeType::ConnectTimeout => props.connect_timeout = timeout,
969                    TimeType::AcceptTimeout => props.accept_timeout = timeout,
970                    TimeType::ReadTimeout => props.read_timeout = timeout,
971                    TimeType::WriteTimeout => props.write_timeout = timeout,
972                    _ => return Err(Errno::Io),
973                }
974                Ok(())
975            }
976            _ => Err(Errno::Notsup),
977        }
978    }
979
980    pub fn opt_time(&self, ty: TimeType) -> Result<Option<std::time::Duration>, Errno> {
981        let inner = self.inner.protected.read().unwrap();
982        match &inner.kind {
983            InodeSocketKind::TcpStream {
984                read_timeout,
985                write_timeout,
986                ..
987            } => Ok(match ty {
988                TimeType::ReadTimeout => *read_timeout,
989                TimeType::WriteTimeout => *write_timeout,
990                _ => return Err(Errno::Inval),
991            }),
992            InodeSocketKind::TcpListener { accept_timeout, .. } => Ok(match ty {
993                TimeType::AcceptTimeout => *accept_timeout,
994                _ => return Err(Errno::Inval),
995            }),
996            InodeSocketKind::PreSocket { props, .. }
997            | InodeSocketKind::RemoteSocket { props, .. } => match ty {
998                TimeType::ConnectTimeout => Ok(props.connect_timeout),
999                TimeType::AcceptTimeout => Ok(props.accept_timeout),
1000                TimeType::ReadTimeout => Ok(props.read_timeout),
1001                TimeType::WriteTimeout => Ok(props.write_timeout),
1002                _ => Err(Errno::Inval),
1003            },
1004            _ => Err(Errno::Notsup),
1005        }
1006    }
1007
1008    pub fn set_ttl(&self, ttl: u32) -> Result<(), Errno> {
1009        let mut inner = self.inner.protected.write().unwrap();
1010        match &mut inner.kind {
1011            InodeSocketKind::TcpStream { socket, .. } => {
1012                socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1013            }
1014            InodeSocketKind::UdpSocket { socket, .. } => {
1015                socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1016            }
1017            InodeSocketKind::RemoteSocket { ttl: set_ttl, .. } => {
1018                *set_ttl = ttl;
1019                Ok(())
1020            }
1021            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1022            _ => Err(Errno::Notsup),
1023        }
1024    }
1025
1026    pub fn ttl(&self) -> Result<u32, Errno> {
1027        let inner = self.inner.protected.read().unwrap();
1028        match &inner.kind {
1029            InodeSocketKind::TcpStream { socket, .. } => {
1030                socket.ttl().map_err(net_error_into_wasi_err)
1031            }
1032            InodeSocketKind::UdpSocket { socket, .. } => {
1033                socket.ttl().map_err(net_error_into_wasi_err)
1034            }
1035            InodeSocketKind::RemoteSocket { ttl, .. } => Ok(*ttl),
1036            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1037            _ => Err(Errno::Notsup),
1038        }
1039    }
1040
1041    pub fn set_multicast_ttl_v4(&self, ttl: u32) -> Result<(), Errno> {
1042        let mut inner = self.inner.protected.write().unwrap();
1043        match &mut inner.kind {
1044            InodeSocketKind::UdpSocket { socket, .. } => socket
1045                .set_multicast_ttl_v4(ttl)
1046                .map_err(net_error_into_wasi_err),
1047            InodeSocketKind::RemoteSocket {
1048                multicast_ttl: set_ttl,
1049                ..
1050            } => {
1051                *set_ttl = ttl;
1052                Ok(())
1053            }
1054            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1055            _ => Err(Errno::Notsup),
1056        }
1057    }
1058
1059    pub fn multicast_ttl_v4(&self) -> Result<u32, Errno> {
1060        let inner = self.inner.protected.read().unwrap();
1061        match &inner.kind {
1062            InodeSocketKind::UdpSocket { socket, .. } => {
1063                socket.multicast_ttl_v4().map_err(net_error_into_wasi_err)
1064            }
1065            InodeSocketKind::RemoteSocket { multicast_ttl, .. } => Ok(*multicast_ttl),
1066            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1067            _ => Err(Errno::Notsup),
1068        }
1069    }
1070
1071    pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1072        let mut inner = self.inner.protected.write().unwrap();
1073        match &mut inner.kind {
1074            InodeSocketKind::UdpSocket { socket, .. } => socket
1075                .join_multicast_v4(multiaddr, iface)
1076                .map_err(net_error_into_wasi_err),
1077            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1078            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1079            _ => Err(Errno::Notsup),
1080        }
1081    }
1082
1083    pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1084        let mut inner = self.inner.protected.write().unwrap();
1085        match &mut inner.kind {
1086            InodeSocketKind::UdpSocket { socket, .. } => socket
1087                .leave_multicast_v4(multiaddr, iface)
1088                .map_err(net_error_into_wasi_err),
1089            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1090            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1091            _ => Err(Errno::Notsup),
1092        }
1093    }
1094
1095    pub fn join_multicast_v6(&self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1096        let mut inner = self.inner.protected.write().unwrap();
1097        match &mut inner.kind {
1098            InodeSocketKind::UdpSocket { socket, .. } => socket
1099                .join_multicast_v6(multiaddr, iface)
1100                .map_err(net_error_into_wasi_err),
1101            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1102            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1103            _ => Err(Errno::Notsup),
1104        }
1105    }
1106
1107    pub fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1108        let mut inner = self.inner.protected.write().unwrap();
1109        match &mut inner.kind {
1110            InodeSocketKind::UdpSocket { socket, .. } => socket
1111                .leave_multicast_v6(multiaddr, iface)
1112                .map_err(net_error_into_wasi_err),
1113            InodeSocketKind::RemoteSocket { .. } => Ok(()),
1114            InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1115            _ => Err(Errno::Notsup),
1116        }
1117    }
1118
1119    pub async fn send(
1120        &self,
1121        tasks: &dyn VirtualTaskManager,
1122        buf: &[u8],
1123        timeout: Option<Duration>,
1124        nonblocking: bool,
1125    ) -> Result<usize, Errno> {
1126        struct SocketSender<'a, 'b> {
1127            inner: &'a InodeSocketInner,
1128            data: &'b [u8],
1129            nonblocking: bool,
1130            handler_registered: bool,
1131        }
1132        impl Drop for SocketSender<'_, '_> {
1133            fn drop(&mut self) {
1134                if self.handler_registered {
1135                    let mut inner = self.inner.protected.write().unwrap();
1136                    inner.remove_handler();
1137                }
1138            }
1139        }
1140        impl Future for SocketSender<'_, '_> {
1141            type Output = Result<usize, Errno>;
1142            fn poll(
1143                mut self: Pin<&mut Self>,
1144                cx: &mut std::task::Context<'_>,
1145            ) -> Poll<Self::Output> {
1146                loop {
1147                    let mut inner = self.inner.protected.write().unwrap();
1148                    let res = match &mut inner.kind {
1149                        InodeSocketKind::Raw(socket) => socket.try_send(self.data),
1150                        InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1151                        InodeSocketKind::UdpSocket { socket, peer } => {
1152                            if let Some(peer) = peer {
1153                                socket.try_send_to(self.data, *peer)
1154                            } else {
1155                                Err(NetworkError::NotConnected)
1156                            }
1157                        }
1158                        InodeSocketKind::PreSocket { .. } => {
1159                            return Poll::Ready(Err(Errno::Notconn));
1160                        }
1161                        InodeSocketKind::RemoteSocket { is_dead, .. } => {
1162                            return match is_dead {
1163                                true => Poll::Ready(Err(Errno::Connreset)),
1164                                false => Poll::Ready(Ok(self.data.len())),
1165                            };
1166                        }
1167                        _ => return Poll::Ready(Err(Errno::Notsup)),
1168                    };
1169                    return match res {
1170                        Ok(amt) => Poll::Ready(Ok(amt)),
1171                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1172                            Poll::Ready(Err(Errno::Again))
1173                        }
1174                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1175                            inner
1176                                .set_handler(cx.waker().into())
1177                                .map_err(net_error_into_wasi_err)?;
1178                            drop(inner);
1179                            self.handler_registered = true;
1180                            continue;
1181                        }
1182                        Err(NetworkError::WouldBlock) => Poll::Pending,
1183                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1184                    };
1185                }
1186            }
1187        }
1188
1189        let poller = SocketSender {
1190            inner: &self.inner,
1191            data: buf,
1192            nonblocking,
1193            handler_registered: false,
1194        };
1195        if let Some(timeout) = timeout {
1196            tokio::select! {
1197                res = poller => res,
1198                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1199            }
1200        } else {
1201            poller.await
1202        }
1203    }
1204
1205    pub async fn send_to<M: MemorySize>(
1206        &self,
1207        tasks: &dyn VirtualTaskManager,
1208        buf: &[u8],
1209        addr: SocketAddr,
1210        timeout: Option<Duration>,
1211        nonblocking: bool,
1212    ) -> Result<usize, Errno> {
1213        struct SocketSender<'a, 'b> {
1214            inner: &'a InodeSocketInner,
1215            data: &'b [u8],
1216            addr: SocketAddr,
1217            nonblocking: bool,
1218            handler_registered: bool,
1219        }
1220        impl Drop for SocketSender<'_, '_> {
1221            fn drop(&mut self) {
1222                if self.handler_registered {
1223                    let mut inner = self.inner.protected.write().unwrap();
1224                    inner.remove_handler();
1225                }
1226            }
1227        }
1228        impl Future for SocketSender<'_, '_> {
1229            type Output = Result<usize, Errno>;
1230            fn poll(
1231                mut self: Pin<&mut Self>,
1232                cx: &mut std::task::Context<'_>,
1233            ) -> Poll<Self::Output> {
1234                loop {
1235                    let mut inner = self.inner.protected.write().unwrap();
1236                    let res = match &mut inner.kind {
1237                        InodeSocketKind::Icmp(socket) => socket.try_send_to(self.data, self.addr),
1238                        InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1239                        InodeSocketKind::UdpSocket { socket, .. } => {
1240                            socket.try_send_to(self.data, self.addr)
1241                        }
1242                        InodeSocketKind::PreSocket { .. } => {
1243                            return Poll::Ready(Err(Errno::Notconn));
1244                        }
1245                        InodeSocketKind::RemoteSocket { is_dead, .. } => {
1246                            return match is_dead {
1247                                true => Poll::Ready(Err(Errno::Connreset)),
1248                                false => Poll::Ready(Ok(self.data.len())),
1249                            };
1250                        }
1251                        _ => return Poll::Ready(Err(Errno::Notsup)),
1252                    };
1253                    return match res {
1254                        Ok(amt) => Poll::Ready(Ok(amt)),
1255                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1256                            Poll::Ready(Err(Errno::Again))
1257                        }
1258                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1259                            inner
1260                                .set_handler(cx.waker().into())
1261                                .map_err(net_error_into_wasi_err)?;
1262                            self.handler_registered = true;
1263                            drop(inner);
1264                            continue;
1265                        }
1266                        Err(NetworkError::WouldBlock) => Poll::Pending,
1267                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1268                    };
1269                }
1270            }
1271        }
1272
1273        let poller = SocketSender {
1274            inner: &self.inner,
1275            data: buf,
1276            addr,
1277            nonblocking,
1278            handler_registered: false,
1279        };
1280        if let Some(timeout) = timeout {
1281            tokio::select! {
1282                res = poller => res,
1283                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1284            }
1285        } else {
1286            poller.await
1287        }
1288    }
1289
1290    pub async fn recv(
1291        &self,
1292        tasks: &dyn VirtualTaskManager,
1293        buf: &mut [MaybeUninit<u8>],
1294        timeout: Option<Duration>,
1295        nonblocking: bool,
1296        peek: bool,
1297    ) -> Result<usize, Errno> {
1298        struct SocketReceiver<'a, 'b> {
1299            inner: &'a InodeSocketInner,
1300            data: &'b mut [MaybeUninit<u8>],
1301            nonblocking: bool,
1302            peek: bool,
1303            handler_registered: bool,
1304        }
1305        impl Drop for SocketReceiver<'_, '_> {
1306            fn drop(&mut self) {
1307                if self.handler_registered {
1308                    let mut inner = self.inner.protected.write().unwrap();
1309                    inner.remove_handler();
1310                }
1311            }
1312        }
1313        impl Future for SocketReceiver<'_, '_> {
1314            type Output = Result<usize, Errno>;
1315            fn poll(
1316                mut self: Pin<&mut Self>,
1317                cx: &mut std::task::Context<'_>,
1318            ) -> Poll<Self::Output> {
1319                loop {
1320                    let peek = self.peek;
1321                    let mut inner = self.inner.protected.write().unwrap();
1322                    let res = match &mut inner.kind {
1323                        InodeSocketKind::Raw(socket) => socket.try_recv(self.data, peek),
1324                        InodeSocketKind::TcpStream { socket, .. } => {
1325                            socket.try_recv(self.data, peek)
1326                        }
1327                        InodeSocketKind::UdpSocket { socket, peer } => {
1328                            if let Some(peer) = peer {
1329                                match socket.try_recv_from(self.data, peek) {
1330                                    Ok((amt, addr)) if addr == *peer => Ok(amt),
1331                                    Ok(_) => Err(NetworkError::WouldBlock),
1332                                    Err(err) => Err(err),
1333                                }
1334                            } else {
1335                                match socket.try_recv_from(self.data, peek) {
1336                                    Ok((amt, _)) => Ok(amt),
1337                                    Err(err) => Err(err),
1338                                }
1339                            }
1340                        }
1341                        InodeSocketKind::RemoteSocket { is_dead, .. } => {
1342                            return match is_dead {
1343                                true => Poll::Ready(Ok(0)),
1344                                false => Poll::Pending,
1345                            };
1346                        }
1347                        InodeSocketKind::PreSocket { .. } => {
1348                            return Poll::Ready(Err(Errno::Notconn));
1349                        }
1350                        _ => return Poll::Ready(Err(Errno::Notsup)),
1351                    };
1352                    return match res {
1353                        Ok(amt) => Poll::Ready(Ok(amt)),
1354                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1355                            Poll::Ready(Err(Errno::Again))
1356                        }
1357                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1358                            inner
1359                                .set_handler(cx.waker().into())
1360                                .map_err(net_error_into_wasi_err)?;
1361                            self.handler_registered = true;
1362                            drop(inner);
1363                            continue;
1364                        }
1365
1366                        Err(NetworkError::WouldBlock) => Poll::Pending,
1367                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1368                    };
1369                }
1370            }
1371        }
1372
1373        let poller = SocketReceiver {
1374            inner: &self.inner,
1375            data: buf,
1376            nonblocking,
1377            peek,
1378            handler_registered: false,
1379        };
1380        if let Some(timeout) = timeout {
1381            tokio::select! {
1382                res = poller => res,
1383                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1384            }
1385        } else {
1386            poller.await
1387        }
1388    }
1389
1390    pub async fn recv_from(
1391        &self,
1392        tasks: &dyn VirtualTaskManager,
1393        buf: &mut [MaybeUninit<u8>],
1394        timeout: Option<Duration>,
1395        nonblocking: bool,
1396        peek: bool,
1397    ) -> Result<(usize, SocketAddr), Errno> {
1398        struct SocketReceiver<'a, 'b> {
1399            inner: &'a InodeSocketInner,
1400            data: &'b mut [MaybeUninit<u8>],
1401            nonblocking: bool,
1402            peek: bool,
1403            handler_registered: bool,
1404        }
1405        impl Drop for SocketReceiver<'_, '_> {
1406            fn drop(&mut self) {
1407                if self.handler_registered {
1408                    let mut inner = self.inner.protected.write().unwrap();
1409                    inner.remove_handler();
1410                }
1411            }
1412        }
1413        impl Future for SocketReceiver<'_, '_> {
1414            type Output = Result<(usize, SocketAddr), Errno>;
1415            fn poll(
1416                mut self: Pin<&mut Self>,
1417                cx: &mut std::task::Context<'_>,
1418            ) -> Poll<Self::Output> {
1419                let peek = self.peek;
1420                let mut inner = self.inner.protected.write().unwrap();
1421                loop {
1422                    let res = match &mut inner.kind {
1423                        InodeSocketKind::Icmp(socket) => socket.try_recv_from(self.data, peek),
1424                        InodeSocketKind::UdpSocket { socket, .. } => {
1425                            socket.try_recv_from(self.data, peek)
1426                        }
1427                        InodeSocketKind::RemoteSocket {
1428                            is_dead, peer_addr, ..
1429                        } => {
1430                            return match is_dead {
1431                                true => Poll::Ready(Ok((0, *peer_addr))),
1432                                false => Poll::Pending,
1433                            };
1434                        }
1435                        InodeSocketKind::PreSocket { .. } => {
1436                            return Poll::Ready(Err(Errno::Notconn));
1437                        }
1438                        _ => return Poll::Ready(Err(Errno::Notsup)),
1439                    };
1440                    return match res {
1441                        Ok((amt, addr)) => Poll::Ready(Ok((amt, addr))),
1442                        Err(NetworkError::WouldBlock) if self.nonblocking => {
1443                            Poll::Ready(Err(Errno::Again))
1444                        }
1445                        Err(NetworkError::WouldBlock) if !self.handler_registered => {
1446                            inner
1447                                .set_handler(cx.waker().into())
1448                                .map_err(net_error_into_wasi_err)?;
1449                            self.handler_registered = true;
1450                            continue;
1451                        }
1452                        Err(NetworkError::WouldBlock) => Poll::Pending,
1453                        Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1454                    };
1455                }
1456            }
1457        }
1458
1459        let poller = SocketReceiver {
1460            inner: &self.inner,
1461            data: buf,
1462            nonblocking,
1463            peek,
1464            handler_registered: false,
1465        };
1466        if let Some(timeout) = timeout {
1467            tokio::select! {
1468                res = poller => res,
1469                _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1470            }
1471        } else {
1472            poller.await
1473        }
1474    }
1475
1476    pub fn shutdown(&mut self, how: std::net::Shutdown) -> Result<(), Errno> {
1477        let mut inner = self.inner.protected.write().unwrap();
1478        match &mut inner.kind {
1479            InodeSocketKind::TcpStream { socket, .. } => {
1480                socket.shutdown(how).map_err(net_error_into_wasi_err)?;
1481            }
1482            InodeSocketKind::RemoteSocket { .. } => return Ok(()),
1483            InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
1484            _ => return Err(Errno::Notsup),
1485        }
1486        Ok(())
1487    }
1488
1489    pub async fn can_write(&self) -> bool {
1490        if let Ok(mut guard) = self.inner.protected.try_write() {
1491            #[allow(clippy::match_like_matches_macro)]
1492            match &mut guard.kind {
1493                InodeSocketKind::TcpStream { .. }
1494                | InodeSocketKind::UdpSocket { .. }
1495                | InodeSocketKind::Raw(..) => true,
1496                InodeSocketKind::RemoteSocket { is_dead, .. } => !(*is_dead),
1497                _ => false,
1498            }
1499        } else {
1500            false
1501        }
1502    }
1503}
1504
1505impl InodeSocketProtected {
1506    pub fn remove_handler(&mut self) {
1507        match &mut self.kind {
1508            InodeSocketKind::TcpListener { socket, .. } => socket.remove_handler(),
1509            InodeSocketKind::TcpStream { socket, .. } => socket.remove_handler(),
1510            InodeSocketKind::UdpSocket { socket, .. } => socket.remove_handler(),
1511            InodeSocketKind::Raw(socket) => socket.remove_handler(),
1512            InodeSocketKind::Icmp(socket) => socket.remove_handler(),
1513            InodeSocketKind::PreSocket { props, .. } => {
1514                props.handler.take();
1515            }
1516            InodeSocketKind::RemoteSocket { props, .. } => {
1517                props.handler.take();
1518            }
1519        }
1520    }
1521
1522    pub fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1523        match &mut self.kind {
1524            InodeSocketKind::TcpListener { socket, .. } => socket.poll_read_ready(cx),
1525            InodeSocketKind::TcpStream { socket, .. } => socket.poll_read_ready(cx),
1526            InodeSocketKind::UdpSocket { socket, .. } => socket.poll_read_ready(cx),
1527            InodeSocketKind::Raw(socket) => socket.poll_read_ready(cx),
1528            InodeSocketKind::Icmp(socket) => socket.poll_read_ready(cx),
1529            InodeSocketKind::PreSocket { .. } => Poll::Pending,
1530            InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1531                true => Poll::Ready(Ok(0)),
1532                false => Poll::Pending,
1533            },
1534        }
1535        .map_err(net_error_into_io_err)
1536    }
1537
1538    pub fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1539        match &mut self.kind {
1540            InodeSocketKind::TcpListener { socket, .. } => socket.poll_write_ready(cx),
1541            InodeSocketKind::TcpStream { socket, .. } => socket.poll_write_ready(cx),
1542            InodeSocketKind::UdpSocket { socket, .. } => socket.poll_write_ready(cx),
1543            InodeSocketKind::Raw(socket) => socket.poll_write_ready(cx),
1544            InodeSocketKind::Icmp(socket) => socket.poll_write_ready(cx),
1545            InodeSocketKind::PreSocket { .. } => Poll::Pending,
1546            InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1547                true => Poll::Ready(Ok(0)),
1548                false => Poll::Pending,
1549            },
1550        }
1551        .map_err(net_error_into_io_err)
1552    }
1553
1554    pub fn set_handler(
1555        &mut self,
1556        handler: Box<dyn InterestHandler + Send + Sync>,
1557    ) -> virtual_net::Result<()> {
1558        match &mut self.kind {
1559            InodeSocketKind::TcpListener { socket, .. } => socket.set_handler(handler),
1560            InodeSocketKind::TcpStream { socket, .. } => socket.set_handler(handler),
1561            InodeSocketKind::UdpSocket { socket, .. } => socket.set_handler(handler),
1562            InodeSocketKind::Raw(socket) => socket.set_handler(handler),
1563            InodeSocketKind::Icmp(socket) => socket.set_handler(handler),
1564            InodeSocketKind::PreSocket { props, .. }
1565            | InodeSocketKind::RemoteSocket { props, .. } => {
1566                props.handler.replace(handler);
1567                Ok(())
1568            }
1569        }
1570    }
1571}
1572
1573// TODO: review allow...
1574#[allow(dead_code)]
1575pub(crate) fn all_socket_rights() -> Rights {
1576    Rights::FD_FDSTAT_SET_FLAGS
1577        .union(Rights::FD_FILESTAT_GET)
1578        .union(Rights::FD_READ)
1579        .union(Rights::FD_WRITE)
1580        .union(Rights::POLL_FD_READWRITE)
1581        .union(Rights::SOCK_SHUTDOWN)
1582        .union(Rights::SOCK_CONNECT)
1583        .union(Rights::SOCK_LISTEN)
1584        .union(Rights::SOCK_BIND)
1585        .union(Rights::SOCK_ACCEPT)
1586        .union(Rights::SOCK_RECV)
1587        .union(Rights::SOCK_SEND)
1588        .union(Rights::SOCK_ADDR_LOCAL)
1589        .union(Rights::SOCK_ADDR_REMOTE)
1590        .union(Rights::SOCK_RECV_FROM)
1591        .union(Rights::SOCK_SEND_TO)
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596    use super::{InodeSocket, InodeSocketKind, WasiSocketStatus};
1597    use std::{
1598        mem::MaybeUninit,
1599        net::{Ipv4Addr, Shutdown, SocketAddr},
1600        pin::Pin,
1601        sync::{
1602            Arc,
1603            atomic::{AtomicUsize, Ordering},
1604        },
1605        task::{Context, Poll},
1606        time::Duration,
1607    };
1608    use virtual_mio::InterestHandler;
1609    use virtual_net::{
1610        NetworkError, Result as NetResult, SocketStatus, VirtualConnectedSocket, VirtualIoSource,
1611        VirtualSocket, VirtualTcpSocket,
1612    };
1613
1614    #[derive(Debug)]
1615    struct MockTcpSocket {
1616        read_calls: Arc<AtomicUsize>,
1617        write_calls: Arc<AtomicUsize>,
1618        status: Arc<AtomicUsize>,
1619    }
1620
1621    const MOCK_STATUS_OPENING: usize = 0;
1622    const MOCK_STATUS_OPENED: usize = 1;
1623
1624    fn decode_mock_status(value: usize) -> SocketStatus {
1625        match value {
1626            MOCK_STATUS_OPENED => SocketStatus::Opened,
1627            _ => SocketStatus::Opening,
1628        }
1629    }
1630
1631    impl VirtualIoSource for MockTcpSocket {
1632        fn remove_handler(&mut self) {}
1633
1634        fn poll_read_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
1635            self.read_calls.fetch_add(1, Ordering::Relaxed);
1636            Poll::Ready(Ok(3))
1637        }
1638
1639        fn poll_write_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
1640            self.write_calls.fetch_add(1, Ordering::Relaxed);
1641            self.status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
1642            Poll::Ready(Ok(7))
1643        }
1644    }
1645
1646    impl VirtualSocket for MockTcpSocket {
1647        fn set_ttl(&mut self, _ttl: u32) -> NetResult<()> {
1648            Ok(())
1649        }
1650
1651        fn ttl(&self) -> NetResult<u32> {
1652            Ok(64)
1653        }
1654
1655        fn addr_local(&self) -> NetResult<SocketAddr> {
1656            Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))
1657        }
1658
1659        fn status(&self) -> NetResult<SocketStatus> {
1660            Ok(decode_mock_status(self.status.load(Ordering::Relaxed)))
1661        }
1662
1663        fn set_handler(
1664            &mut self,
1665            _handler: Box<dyn InterestHandler + Send + Sync>,
1666        ) -> NetResult<()> {
1667            Ok(())
1668        }
1669    }
1670
1671    impl VirtualConnectedSocket for MockTcpSocket {
1672        fn set_linger(&mut self, _linger: Option<Duration>) -> NetResult<()> {
1673            Ok(())
1674        }
1675
1676        fn linger(&self) -> NetResult<Option<Duration>> {
1677            Ok(None)
1678        }
1679
1680        fn try_send(&mut self, _data: &[u8]) -> NetResult<usize> {
1681            Err(NetworkError::Unsupported)
1682        }
1683
1684        fn try_flush(&mut self) -> NetResult<()> {
1685            Err(NetworkError::Unsupported)
1686        }
1687
1688        fn close(&mut self) -> NetResult<()> {
1689            Ok(())
1690        }
1691
1692        fn try_recv(&mut self, _buf: &mut [MaybeUninit<u8>], _peek: bool) -> NetResult<usize> {
1693            Err(NetworkError::Unsupported)
1694        }
1695    }
1696
1697    impl VirtualTcpSocket for MockTcpSocket {
1698        fn set_recv_buf_size(&mut self, _size: usize) -> NetResult<()> {
1699            Ok(())
1700        }
1701
1702        fn recv_buf_size(&self) -> NetResult<usize> {
1703            Ok(0)
1704        }
1705
1706        fn set_send_buf_size(&mut self, _size: usize) -> NetResult<()> {
1707            Ok(())
1708        }
1709
1710        fn send_buf_size(&self) -> NetResult<usize> {
1711            Ok(0)
1712        }
1713
1714        fn set_nodelay(&mut self, _reuse: bool) -> NetResult<()> {
1715            Ok(())
1716        }
1717
1718        fn nodelay(&self) -> NetResult<bool> {
1719            Ok(true)
1720        }
1721
1722        fn set_keepalive(&mut self, _keepalive: bool) -> NetResult<()> {
1723            Ok(())
1724        }
1725
1726        fn keepalive(&self) -> NetResult<bool> {
1727            Ok(false)
1728        }
1729
1730        fn set_dontroute(&mut self, _keepalive: bool) -> NetResult<()> {
1731            Ok(())
1732        }
1733
1734        fn dontroute(&self) -> NetResult<bool> {
1735            Ok(false)
1736        }
1737
1738        fn addr_peer(&self) -> NetResult<SocketAddr> {
1739            Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 80)))
1740        }
1741
1742        fn shutdown(&mut self, _how: Shutdown) -> NetResult<()> {
1743            Ok(())
1744        }
1745
1746        fn is_closed(&self) -> bool {
1747            false
1748        }
1749    }
1750
1751    #[test]
1752    fn inode_socket_poll_write_ready_uses_write_path() {
1753        let read_calls = Arc::new(AtomicUsize::new(0));
1754        let write_calls = Arc::new(AtomicUsize::new(0));
1755        let status = Arc::new(AtomicUsize::new(MOCK_STATUS_OPENED));
1756        let mut inode = InodeSocket::new(InodeSocketKind::TcpStream {
1757            socket: Box::new(MockTcpSocket {
1758                read_calls: read_calls.clone(),
1759                write_calls: write_calls.clone(),
1760                status,
1761            }),
1762            write_timeout: None,
1763            read_timeout: None,
1764        });
1765
1766        let waker = futures::task::noop_waker();
1767        let mut cx = Context::from_waker(&waker);
1768        let ready = Pin::new(&mut inode).poll_write_ready(&mut cx);
1769
1770        assert!(matches!(ready, Poll::Ready(Ok(7))));
1771        assert_eq!(read_calls.load(Ordering::Relaxed), 0);
1772        assert_eq!(write_calls.load(Ordering::Relaxed), 1);
1773    }
1774
1775    #[test]
1776    fn inode_socket_status_tracks_tcp_socket_status() {
1777        let status = Arc::new(AtomicUsize::new(MOCK_STATUS_OPENING));
1778        let inode = InodeSocket::new(InodeSocketKind::TcpStream {
1779            socket: Box::new(MockTcpSocket {
1780                read_calls: Arc::new(AtomicUsize::new(0)),
1781                write_calls: Arc::new(AtomicUsize::new(0)),
1782                status: status.clone(),
1783            }),
1784            write_timeout: None,
1785            read_timeout: None,
1786        });
1787
1788        assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opening));
1789        status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
1790        assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opened));
1791    }
1792}