1use std::{
2 future::Future,
3 io,
4 mem::MaybeUninit,
5 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
6 pin::Pin,
7 sync::{Arc, RwLock, RwLockWriteGuard},
8 task::{Context, Poll},
9 time::Duration,
10};
11
12#[cfg(feature = "enable-serde")]
13use serde_derive::{Deserialize, Serialize};
14use virtual_mio::InterestHandler;
15use virtual_net::{
16 NetworkError, VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualTcpListener,
17 VirtualTcpSocket, VirtualUdpSocket, net_error_into_io_err,
18};
19use wasmer_types::MemorySize;
20use wasmer_wasix_types::wasi::{Addressfamily, Errno, Rights, SockProto, Sockoption, Socktype};
21
22use crate::{VirtualTaskManager, net::net_error_into_wasi_err};
23
24#[derive(Debug)]
25#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
26pub enum InodeHttpSocketType {
27 Request,
29 Response,
31 Headers,
33}
34
35#[derive(Debug)]
36pub struct SocketProperties {
37 pub family: Addressfamily,
38 pub ty: Socktype,
39 pub pt: SockProto,
40 pub only_v6: bool,
41 pub reuse_port: bool,
42 pub reuse_addr: bool,
43 pub no_delay: Option<bool>,
44 pub keep_alive: Option<bool>,
45 pub dont_route: Option<bool>,
46 pub send_buf_size: Option<usize>,
47 pub recv_buf_size: Option<usize>,
48 pub write_timeout: Option<Duration>,
49 pub read_timeout: Option<Duration>,
50 pub accept_timeout: Option<Duration>,
51 pub connect_timeout: Option<Duration>,
52 pub handler: Option<Box<dyn InterestHandler + Send + Sync>>,
53}
54
55#[derive(Debug)]
56pub enum InodeSocketKind {
58 PreSocket {
59 props: SocketProperties,
60 addr: Option<SocketAddr>,
61 },
62 Icmp(Box<dyn VirtualIcmpSocket + Sync>),
63 Raw(Box<dyn VirtualRawSocket + Sync>),
64 TcpListener {
65 socket: Box<dyn VirtualTcpListener + Sync>,
66 accept_timeout: Option<Duration>,
67 },
68 TcpStream {
69 socket: Box<dyn VirtualTcpSocket + Sync>,
70 write_timeout: Option<Duration>,
71 read_timeout: Option<Duration>,
72 },
73 UdpSocket {
74 socket: Box<dyn VirtualUdpSocket + Sync>,
75 peer: Option<SocketAddr>,
76 },
77 RemoteSocket {
78 props: SocketProperties,
79 local_addr: SocketAddr,
80 peer_addr: SocketAddr,
81 ttl: u32,
82 multicast_ttl: u32,
83 is_dead: bool,
84 },
85}
86
87pub enum WasiSocketOption {
88 Noop,
89 ReusePort,
90 ReuseAddr,
91 NoDelay,
92 DontRoute,
93 OnlyV6,
94 Broadcast,
95 MulticastLoopV4,
96 MulticastLoopV6,
97 Promiscuous,
98 Listening,
99 LastError,
100 KeepAlive,
101 Linger,
102 OobInline,
103 RecvBufSize,
104 SendBufSize,
105 RecvLowat,
106 SendLowat,
107 RecvTimeout,
108 SendTimeout,
109 ConnectTimeout,
110 AcceptTimeout,
111 Ttl,
112 MulticastTtlV4,
113 Type,
114 Proto,
115}
116
117impl TryFrom<Sockoption> for WasiSocketOption {
118 type Error = Errno;
119
120 fn try_from(opt: Sockoption) -> Result<Self, Self::Error> {
121 use WasiSocketOption::*;
122 match opt {
123 Sockoption::Noop => Ok(Noop),
124 Sockoption::ReusePort => Ok(ReusePort),
125 Sockoption::ReuseAddr => Ok(ReuseAddr),
126 Sockoption::NoDelay => Ok(NoDelay),
127 Sockoption::DontRoute => Ok(DontRoute),
128 Sockoption::OnlyV6 => Ok(OnlyV6),
129 Sockoption::Broadcast => Ok(Broadcast),
130 Sockoption::MulticastLoopV4 => Ok(MulticastLoopV4),
131 Sockoption::MulticastLoopV6 => Ok(MulticastLoopV6),
132 Sockoption::Promiscuous => Ok(Promiscuous),
133 Sockoption::Listening => Ok(Listening),
134 Sockoption::LastError => Ok(LastError),
135 Sockoption::KeepAlive => Ok(KeepAlive),
136 Sockoption::Linger => Ok(Linger),
137 Sockoption::OobInline => Ok(OobInline),
138 Sockoption::RecvBufSize => Ok(RecvBufSize),
139 Sockoption::SendBufSize => Ok(SendBufSize),
140 Sockoption::RecvLowat => Ok(RecvLowat),
141 Sockoption::SendLowat => Ok(SendLowat),
142 Sockoption::RecvTimeout => Ok(RecvTimeout),
143 Sockoption::SendTimeout => Ok(SendTimeout),
144 Sockoption::ConnectTimeout => Ok(ConnectTimeout),
145 Sockoption::AcceptTimeout => Ok(AcceptTimeout),
146 Sockoption::Ttl => Ok(Ttl),
147 Sockoption::MulticastTtlV4 => Ok(MulticastTtlV4),
148 Sockoption::Type => Ok(Type),
149 Sockoption::Proto => Ok(Proto),
150 _ => Err(Errno::Inval),
151 }
152 }
153}
154
155#[derive(Debug)]
156pub enum WasiSocketStatus {
157 Opening,
158 Opened,
159 Closed,
160 Failed,
161}
162
163#[derive(Debug, Copy, Clone, PartialEq, Eq)]
164#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
165pub enum TimeType {
166 ReadTimeout,
167 WriteTimeout,
168 AcceptTimeout,
169 ConnectTimeout,
170 BindTimeout,
171 Linger,
172}
173
174impl From<TimeType> for wasmer_journal::SocketOptTimeType {
175 fn from(value: TimeType) -> Self {
176 match value {
177 TimeType::ReadTimeout => Self::ReadTimeout,
178 TimeType::WriteTimeout => Self::WriteTimeout,
179 TimeType::AcceptTimeout => Self::AcceptTimeout,
180 TimeType::ConnectTimeout => Self::ConnectTimeout,
181 TimeType::BindTimeout => Self::BindTimeout,
182 TimeType::Linger => Self::Linger,
183 }
184 }
185}
186
187impl From<wasmer_journal::SocketOptTimeType> for TimeType {
188 fn from(value: wasmer_journal::SocketOptTimeType) -> Self {
189 use wasmer_journal::SocketOptTimeType;
190 match value {
191 SocketOptTimeType::ReadTimeout => TimeType::ReadTimeout,
192 SocketOptTimeType::WriteTimeout => TimeType::WriteTimeout,
193 SocketOptTimeType::AcceptTimeout => TimeType::AcceptTimeout,
194 SocketOptTimeType::ConnectTimeout => TimeType::ConnectTimeout,
195 SocketOptTimeType::BindTimeout => TimeType::BindTimeout,
196 SocketOptTimeType::Linger => TimeType::Linger,
197 }
198 }
199}
200
201#[derive(Debug)]
202pub(crate) struct InodeSocketProtected {
204 pub kind: InodeSocketKind,
205}
206
207#[derive(Debug)]
208pub(crate) struct InodeSocketInner {
210 pub protected: RwLock<InodeSocketProtected>,
211}
212
213#[derive(Debug, Clone)]
214pub struct InodeSocket {
216 pub(crate) inner: Arc<InodeSocketInner>,
217}
218
219impl InodeSocket {
220 pub fn new(kind: InodeSocketKind) -> Self {
221 let protected = InodeSocketProtected { kind };
222 Self {
223 inner: Arc::new(InodeSocketInner {
224 protected: RwLock::new(protected),
225 }),
226 }
227 }
228
229 pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
230 let mut inner = self.inner.protected.write().unwrap();
231 inner.poll_read_ready(cx)
232 }
233
234 pub fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
235 let mut inner = self.inner.protected.write().unwrap();
236 inner.poll_write_ready(cx)
237 }
238
239 #[allow(clippy::await_holding_lock, clippy::readonly_write_lock)]
244 pub async fn auto_bind_udp(
245 &self,
246 tasks: &dyn VirtualTaskManager,
247 net: &dyn VirtualNetworking,
248 ) -> Result<Option<InodeSocket>, Errno> {
249 let timeout = self
250 .opt_time(TimeType::BindTimeout)
251 .ok()
252 .flatten()
253 .unwrap_or(Duration::from_secs(30));
254 let inner = self.inner.protected.write().unwrap();
255 match &inner.kind {
256 InodeSocketKind::PreSocket { props, .. } if props.ty == Socktype::Dgram => {
257 let addr = match props.family {
258 Addressfamily::Inet4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
259 Addressfamily::Inet6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
260 _ => return Err(Errno::Notsup),
261 };
262 Self::bind_internal(tasks, net, addr, timeout, inner).await
263 }
264 _ => Ok(None),
265 }
266 }
267
268 pub async fn bind(
269 &self,
270 tasks: &dyn VirtualTaskManager,
271 net: &dyn VirtualNetworking,
272 set_addr: SocketAddr,
273 ) -> Result<Option<InodeSocket>, Errno> {
274 let timeout = self
275 .opt_time(TimeType::BindTimeout)
276 .ok()
277 .flatten()
278 .unwrap_or(Duration::from_secs(30));
279 let inner = self.inner.protected.write().unwrap();
280 Self::bind_internal(tasks, net, set_addr, timeout, inner).await
281 }
282
283 #[allow(clippy::await_holding_lock)]
285 async fn bind_internal(
286 tasks: &dyn VirtualTaskManager,
287 net: &dyn VirtualNetworking,
288 set_addr: SocketAddr,
289 timeout: Duration,
290 mut inner: RwLockWriteGuard<'_, InodeSocketProtected>,
291 ) -> Result<Option<InodeSocket>, Errno> {
292 let socket = {
293 match &mut inner.kind {
294 InodeSocketKind::PreSocket { props, addr, .. } => {
295 match props.family {
296 Addressfamily::Inet4 => {
297 if !set_addr.is_ipv4() {
298 tracing::debug!(
299 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
300 );
301 return Err(Errno::Inval);
302 }
303 }
304 Addressfamily::Inet6 => {
305 if !set_addr.is_ipv6() {
306 tracing::debug!(
307 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
308 );
309 return Err(Errno::Inval);
310 }
311 }
312 _ => {
313 return Err(Errno::Notsup);
314 }
315 }
316
317 addr.replace(set_addr);
318 let addr = (*addr).unwrap();
319
320 match props.ty {
321 Socktype::Stream => {
322 return Ok(None);
325 }
326 Socktype::Dgram => {
327 let reuse_port = props.reuse_port;
328 let reuse_addr = props.reuse_addr;
329
330 net.bind_udp(addr, reuse_port, reuse_addr)
331 }
332 _ => return Err(Errno::Inval),
333 }
334 }
335 InodeSocketKind::RemoteSocket {
336 props,
337 local_addr: addr,
338 ..
339 } => {
340 match props.family {
341 Addressfamily::Inet4 => {
342 if !set_addr.is_ipv4() {
343 tracing::debug!(
344 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
345 );
346 return Err(Errno::Inval);
347 }
348 }
349 Addressfamily::Inet6 => {
350 if !set_addr.is_ipv6() {
351 tracing::debug!(
352 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
353 );
354 return Err(Errno::Inval);
355 }
356 }
357 _ => {
358 return Err(Errno::Notsup);
359 }
360 }
361
362 *addr = set_addr;
363 let addr = *addr;
364
365 match props.ty {
366 Socktype::Stream => {
367 return Ok(None);
370 }
371 Socktype::Dgram => {
372 let reuse_port = props.reuse_port;
373 let reuse_addr = props.reuse_addr;
374
375 net.bind_udp(addr, reuse_port, reuse_addr)
376 }
377 _ => return Err(Errno::Inval),
378 }
379 }
380 _ => return Err(Errno::Notsup),
381 }
382 };
383
384 drop(inner);
385
386 tokio::select! {
387 socket = socket => {
388 let socket = socket.map_err(net_error_into_wasi_err)?;
389 Ok(Some(InodeSocket::new(InodeSocketKind::UdpSocket { socket, peer: None })))
390 },
391 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
392 }
393 }
394
395 pub async fn listen(
396 &self,
397 tasks: &dyn VirtualTaskManager,
398 net: &dyn VirtualNetworking,
399 _backlog: usize,
400 ) -> Result<Option<InodeSocket>, Errno> {
401 let timeout = self
402 .opt_time(TimeType::AcceptTimeout)
403 .ok()
404 .flatten()
405 .unwrap_or(Duration::from_secs(30));
406
407 let socket = {
408 let inner = self.inner.protected.read().unwrap();
409 match &inner.kind {
410 InodeSocketKind::PreSocket { props, addr, .. } => match props.ty {
411 Socktype::Stream => {
412 if addr.is_none() {
413 tracing::warn!("wasi[?]::sock_listen - failed - address not set");
414 return Err(Errno::Inval);
415 }
416 let addr = *addr.as_ref().unwrap();
417 let only_v6 = props.only_v6;
418 let reuse_port = props.reuse_port;
419 let reuse_addr = props.reuse_addr;
420 drop(inner);
421
422 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
423 }
424 ty => {
425 tracing::warn!(
426 "wasi[?]::sock_listen - failed - not supported(pre-socket:{:?})",
427 ty
428 );
429 return Err(Errno::Notsup);
430 }
431 },
432 InodeSocketKind::RemoteSocket {
433 props,
434 local_addr: addr,
435 ..
436 } => match props.ty {
437 Socktype::Stream => {
438 let addr = *addr;
439 let only_v6 = props.only_v6;
440 let reuse_port = props.reuse_port;
441 let reuse_addr = props.reuse_addr;
442 drop(inner);
443
444 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
445 }
446 ty => {
447 tracing::warn!(
448 "wasi[?]::sock_listen - failed - not supported(remote-socket:{:?})",
449 ty
450 );
451 return Err(Errno::Notsup);
452 }
453 },
454 InodeSocketKind::Icmp(_) => {
455 tracing::warn!("wasi[?]::sock_listen - failed - not supported(icmp)");
456 return Err(Errno::Notsup);
457 }
458 InodeSocketKind::Raw(_) => {
459 tracing::warn!("wasi[?]::sock_listen - failed - not supported(raw)");
460 return Err(Errno::Notsup);
461 }
462 InodeSocketKind::TcpListener { .. } => {
463 tracing::warn!(
464 "wasi[?]::sock_listen - failed - already listening (tcp-listener)"
465 );
466 return Err(Errno::Notsup);
467 }
468 InodeSocketKind::TcpStream { .. } => {
469 tracing::warn!("wasi[?]::sock_listen - failed - not supported(tcp-stream)");
470 return Err(Errno::Notsup);
471 }
472 InodeSocketKind::UdpSocket { .. } => {
473 tracing::warn!("wasi[?]::sock_listen - failed - not supported(udp-socket)");
474 return Err(Errno::Notsup);
475 }
476 }
477 };
478
479 tokio::select! {
480 socket = socket => {
481 let socket = socket.map_err(net_error_into_wasi_err)?;
482 Ok(Some(InodeSocket::new(InodeSocketKind::TcpListener {
483 socket,
484 accept_timeout: Some(timeout),
485 })))
486 },
487 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
488 }
489 }
490
491 pub async fn accept(
492 &self,
493 tasks: &dyn VirtualTaskManager,
494 nonblocking: bool,
495 timeout: Option<Duration>,
496 ) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno> {
497 struct SocketAccepter<'a> {
498 sock: &'a InodeSocket,
499 nonblocking: bool,
500 handler_registered: bool,
501 }
502 impl Drop for SocketAccepter<'_> {
503 fn drop(&mut self) {
504 if self.handler_registered {
505 let mut inner = self.sock.inner.protected.write().unwrap();
506 inner.remove_handler();
507 }
508 }
509 }
510 impl Future for SocketAccepter<'_> {
511 type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno>;
512 fn poll(
513 mut self: Pin<&mut Self>,
514 cx: &mut std::task::Context<'_>,
515 ) -> std::task::Poll<Self::Output> {
516 loop {
517 let mut inner = self.sock.inner.protected.write().unwrap();
518 return match &mut inner.kind {
519 InodeSocketKind::TcpListener { socket, .. } => match socket.try_accept() {
520 Ok((child, addr)) => Poll::Ready(Ok((child, addr))),
521 Err(NetworkError::WouldBlock) if self.nonblocking => {
522 Poll::Ready(Err(Errno::Again))
523 }
524 Err(NetworkError::WouldBlock) if !self.handler_registered => {
525 let res = socket.set_handler(cx.waker().into());
526 if let Err(err) = res {
527 return Poll::Ready(Err(net_error_into_wasi_err(err)));
528 }
529 drop(inner);
530 self.handler_registered = true;
531 continue;
532 }
533 Err(NetworkError::WouldBlock) => Poll::Pending,
534 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
535 },
536 InodeSocketKind::PreSocket { .. } => Poll::Ready(Err(Errno::Notconn)),
537 _ => Poll::Ready(Err(Errno::Notsup)),
538 };
539 }
540 }
541 }
542
543 let acceptor = SocketAccepter {
544 sock: self,
545 nonblocking,
546 handler_registered: false,
547 };
548 if let Some(timeout) = timeout {
549 tokio::select! {
550 res = acceptor => res,
551 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
552 }
553 } else {
554 acceptor.await
555 }
556 }
557
558 pub fn close(&self) -> Result<(), Errno> {
559 let mut inner = self.inner.protected.write().unwrap();
560 match &mut inner.kind {
561 InodeSocketKind::TcpListener { .. } => {}
562 InodeSocketKind::TcpStream { socket, .. } => {
563 socket.close().map_err(net_error_into_wasi_err)?;
564 }
565 InodeSocketKind::Icmp(_) => {}
566 InodeSocketKind::UdpSocket { .. } => {}
567 InodeSocketKind::Raw(_) => {}
568 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
569 InodeSocketKind::RemoteSocket { .. } => {}
570 };
571 Ok(())
572 }
573
574 pub async fn connect(
575 &mut self,
576 tasks: &dyn VirtualTaskManager,
577 net: &dyn VirtualNetworking,
578 peer: SocketAddr,
579 timeout: Option<std::time::Duration>,
580 nonblocking: bool,
581 ) -> Result<Option<InodeSocket>, Errno> {
582 let new_write_timeout;
583 let new_read_timeout;
584
585 let timeout = timeout.unwrap_or(Duration::from_secs(30));
586
587 let handler;
588 let connect = {
589 let mut inner = self.inner.protected.write().unwrap();
590 match &mut inner.kind {
591 InodeSocketKind::PreSocket { props, addr, .. } => {
592 handler = props.handler.take();
593 new_write_timeout = props.write_timeout;
594 new_read_timeout = props.read_timeout;
595 match props.ty {
596 Socktype::Stream => {
597 let no_delay = props.no_delay;
598 let keep_alive = props.keep_alive;
599 let dont_route = props.dont_route;
600 let addr = match addr {
601 Some(a) => *a,
602 None => {
603 let ip = match peer.is_ipv4() {
604 true => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
605 false => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
606 };
607 SocketAddr::new(ip, 0)
608 }
609 };
610 Box::pin(async move {
611 let mut ret = net.connect_tcp(addr, peer).await?;
612 if let Some(no_delay) = no_delay {
613 ret.set_nodelay(no_delay).ok();
614 }
615 if let Some(keep_alive) = keep_alive {
616 ret.set_keepalive(keep_alive).ok();
617 }
618 if let Some(dont_route) = dont_route {
619 ret.set_dontroute(dont_route).ok();
620 }
621 if !nonblocking {
622 futures::future::poll_fn(|cx| ret.poll_write_ready(cx)).await?;
623 }
624 Ok(ret)
625 })
626 }
627 Socktype::Dgram => return Err(Errno::Inval),
628 _ => return Err(Errno::Notsup),
629 }
630 }
631 InodeSocketKind::UdpSocket {
632 peer: target_peer, ..
633 } => {
634 target_peer.replace(peer);
635 return Ok(None);
636 }
637 InodeSocketKind::RemoteSocket { peer_addr, .. } => {
638 *peer_addr = peer;
639 return Ok(None);
640 }
641 _ => return Err(Errno::Notsup),
642 }
643 };
644
645 let mut socket = tokio::select! {
646 res = connect => res.map_err(net_error_into_wasi_err)?,
647 _ = tasks.sleep_now(timeout) => return Err(Errno::Timedout)
648 };
649
650 if let Some(handler) = handler {
651 socket
652 .set_handler(handler)
653 .map_err(net_error_into_wasi_err)?;
654 }
655
656 let socket = InodeSocket::new(InodeSocketKind::TcpStream {
657 socket,
658 write_timeout: new_write_timeout,
659 read_timeout: new_read_timeout,
660 });
661
662 Ok(Some(socket))
663 }
664
665 pub fn status(&self) -> Result<WasiSocketStatus, Errno> {
666 let inner = self.inner.protected.read().unwrap();
667 Ok(match &inner.kind {
668 InodeSocketKind::PreSocket { .. } => WasiSocketStatus::Opening,
669 InodeSocketKind::TcpListener { .. } => WasiSocketStatus::Opened,
670 InodeSocketKind::TcpStream { socket, .. } => match socket.status() {
671 Ok(virtual_net::SocketStatus::Opening) => WasiSocketStatus::Opening,
672 Ok(virtual_net::SocketStatus::Opened) => WasiSocketStatus::Opened,
673 Ok(virtual_net::SocketStatus::Closed) => WasiSocketStatus::Closed,
674 Ok(virtual_net::SocketStatus::Failed) => WasiSocketStatus::Failed,
675 Err(_) => WasiSocketStatus::Failed,
676 },
677 InodeSocketKind::UdpSocket { .. } => WasiSocketStatus::Opened,
678 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
679 true => WasiSocketStatus::Closed,
680 false => WasiSocketStatus::Opened,
681 },
682 _ => WasiSocketStatus::Failed,
683 })
684 }
685
686 pub fn addr_local(&self) -> Result<SocketAddr, Errno> {
687 let inner = self.inner.protected.read().unwrap();
688 Ok(match &inner.kind {
689 InodeSocketKind::PreSocket { props, addr, .. } => {
690 if let Some(addr) = addr {
691 *addr
692 } else {
693 SocketAddr::new(
694 match props.family {
695 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
696 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
697 _ => return Err(Errno::Inval),
698 },
699 0,
700 )
701 }
702 }
703 InodeSocketKind::Icmp(sock) => sock.addr_local().map_err(net_error_into_wasi_err)?,
704 InodeSocketKind::TcpListener { socket, .. } => {
705 socket.addr_local().map_err(net_error_into_wasi_err)?
706 }
707 InodeSocketKind::TcpStream { socket, .. } => {
708 socket.addr_local().map_err(net_error_into_wasi_err)?
709 }
710 InodeSocketKind::UdpSocket { socket, .. } => {
711 socket.addr_local().map_err(net_error_into_wasi_err)?
712 }
713 InodeSocketKind::RemoteSocket {
714 local_addr: addr, ..
715 } => *addr,
716 _ => return Err(Errno::Notsup),
717 })
718 }
719
720 pub fn addr_peer(&self) -> Result<SocketAddr, Errno> {
721 let inner = self.inner.protected.read().unwrap();
722 Ok(match &inner.kind {
723 InodeSocketKind::PreSocket { props, .. } => SocketAddr::new(
724 match props.family {
725 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
726 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
727 _ => return Err(Errno::Inval),
728 },
729 0,
730 ),
731 InodeSocketKind::TcpStream { socket, .. } => {
732 socket.addr_peer().map_err(net_error_into_wasi_err)?
733 }
734 InodeSocketKind::UdpSocket { socket, .. } => socket
735 .addr_peer()
736 .map_err(net_error_into_wasi_err)?
737 .map(Ok)
738 .unwrap_or_else(|| {
739 socket
740 .addr_local()
741 .map_err(net_error_into_wasi_err)
742 .map(|addr| {
743 SocketAddr::new(
744 match addr {
745 SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
746 SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
747 },
748 0,
749 )
750 })
751 })?,
752 InodeSocketKind::RemoteSocket { peer_addr, .. } => *peer_addr,
753 _ => return Err(Errno::Notsup),
754 })
755 }
756
757 pub fn set_opt_flag(&mut self, option: WasiSocketOption, val: bool) -> Result<(), Errno> {
758 let mut inner = self.inner.protected.write().unwrap();
759 match &mut inner.kind {
760 InodeSocketKind::PreSocket { props, .. }
761 | InodeSocketKind::RemoteSocket { props, .. } => {
762 match option {
763 WasiSocketOption::OnlyV6 => props.only_v6 = val,
764 WasiSocketOption::ReusePort => props.reuse_port = val,
765 WasiSocketOption::ReuseAddr => props.reuse_addr = val,
766 WasiSocketOption::NoDelay => props.no_delay = Some(val),
767 WasiSocketOption::KeepAlive => props.keep_alive = Some(val),
768 WasiSocketOption::DontRoute => props.dont_route = Some(val),
769 _ => return Err(Errno::Inval),
770 };
771 }
772 InodeSocketKind::Raw(sock) => match option {
773 WasiSocketOption::Promiscuous => {
774 sock.set_promiscuous(val).map_err(net_error_into_wasi_err)?
775 }
776 _ => return Err(Errno::Inval),
777 },
778 InodeSocketKind::TcpStream { socket, .. } => match option {
779 WasiSocketOption::NoDelay => {
780 socket.set_nodelay(val).map_err(net_error_into_wasi_err)?
781 }
782 WasiSocketOption::KeepAlive => {
783 socket.set_keepalive(val).map_err(net_error_into_wasi_err)?
784 }
785 WasiSocketOption::DontRoute => {
786 socket.set_dontroute(val).map_err(net_error_into_wasi_err)?
787 }
788 _ => return Err(Errno::Inval),
789 },
790 InodeSocketKind::TcpListener { .. } => return Err(Errno::Inval),
791 InodeSocketKind::UdpSocket { socket, .. } => match option {
792 WasiSocketOption::Broadcast => {
793 socket.set_broadcast(val).map_err(net_error_into_wasi_err)?
794 }
795 WasiSocketOption::MulticastLoopV4 => socket
796 .set_multicast_loop_v4(val)
797 .map_err(net_error_into_wasi_err)?,
798 WasiSocketOption::MulticastLoopV6 => socket
799 .set_multicast_loop_v6(val)
800 .map_err(net_error_into_wasi_err)?,
801 _ => return Err(Errno::Inval),
802 },
803 _ => return Err(Errno::Notsup),
804 }
805 Ok(())
806 }
807
808 pub fn get_opt_flag(&self, option: WasiSocketOption) -> Result<bool, Errno> {
809 let mut inner = self.inner.protected.write().unwrap();
810 Ok(match &mut inner.kind {
811 InodeSocketKind::PreSocket { props, .. }
812 | InodeSocketKind::RemoteSocket { props, .. } => match option {
813 WasiSocketOption::OnlyV6 => props.only_v6,
814 WasiSocketOption::ReusePort => props.reuse_port,
815 WasiSocketOption::ReuseAddr => props.reuse_addr,
816 WasiSocketOption::NoDelay => props.no_delay.unwrap_or_default(),
817 WasiSocketOption::KeepAlive => props.keep_alive.unwrap_or_default(),
818 _ => return Err(Errno::Inval),
819 },
820 InodeSocketKind::Raw(sock) => match option {
821 WasiSocketOption::Promiscuous => {
822 sock.promiscuous().map_err(net_error_into_wasi_err)?
823 }
824 _ => return Err(Errno::Inval),
825 },
826 InodeSocketKind::TcpStream { socket, .. } => match option {
827 WasiSocketOption::NoDelay => socket.nodelay().map_err(net_error_into_wasi_err)?,
828 WasiSocketOption::KeepAlive => {
829 socket.keepalive().map_err(net_error_into_wasi_err)?
830 }
831 WasiSocketOption::DontRoute => {
832 socket.dontroute().map_err(net_error_into_wasi_err)?
833 }
834 _ => return Err(Errno::Inval),
835 },
836 InodeSocketKind::UdpSocket { socket, .. } => match option {
837 WasiSocketOption::Broadcast => {
838 socket.broadcast().map_err(net_error_into_wasi_err)?
839 }
840 WasiSocketOption::MulticastLoopV4 => socket
841 .multicast_loop_v4()
842 .map_err(net_error_into_wasi_err)?,
843 WasiSocketOption::MulticastLoopV6 => socket
844 .multicast_loop_v6()
845 .map_err(net_error_into_wasi_err)?,
846 _ => return Err(Errno::Inval),
847 },
848 _ => return Err(Errno::Notsup),
849 })
850 }
851
852 pub fn set_send_buf_size(&mut self, size: usize) -> Result<(), Errno> {
853 let mut inner = self.inner.protected.write().unwrap();
854 match &mut inner.kind {
855 InodeSocketKind::PreSocket { props, .. }
856 | InodeSocketKind::RemoteSocket { props, .. } => {
857 props.send_buf_size = Some(size);
858 }
859 InodeSocketKind::TcpStream { socket, .. } => {
860 socket
861 .set_send_buf_size(size)
862 .map_err(net_error_into_wasi_err)?;
863 }
864 _ => return Err(Errno::Notsup),
865 }
866 Ok(())
867 }
868
869 pub fn send_buf_size(&self) -> Result<usize, Errno> {
870 let inner = self.inner.protected.read().unwrap();
871 match &inner.kind {
872 InodeSocketKind::PreSocket { props, .. }
873 | InodeSocketKind::RemoteSocket { props, .. } => {
874 Ok(props.send_buf_size.unwrap_or_default())
875 }
876 InodeSocketKind::TcpStream { socket, .. } => {
877 socket.send_buf_size().map_err(net_error_into_wasi_err)
878 }
879 _ => Err(Errno::Notsup),
880 }
881 }
882
883 pub fn set_recv_buf_size(&mut self, size: usize) -> Result<(), Errno> {
884 let mut inner = self.inner.protected.write().unwrap();
885 match &mut inner.kind {
886 InodeSocketKind::PreSocket { props, .. }
887 | InodeSocketKind::RemoteSocket { props, .. } => {
888 props.recv_buf_size = Some(size);
889 }
890 InodeSocketKind::TcpStream { socket, .. } => {
891 socket
892 .set_recv_buf_size(size)
893 .map_err(net_error_into_wasi_err)?;
894 }
895 _ => return Err(Errno::Notsup),
896 }
897 Ok(())
898 }
899
900 pub fn recv_buf_size(&self) -> Result<usize, Errno> {
901 let inner = self.inner.protected.read().unwrap();
902 match &inner.kind {
903 InodeSocketKind::PreSocket { props, .. }
904 | InodeSocketKind::RemoteSocket { props, .. } => {
905 Ok(props.recv_buf_size.unwrap_or_default())
906 }
907 InodeSocketKind::TcpStream { socket, .. } => {
908 socket.recv_buf_size().map_err(net_error_into_wasi_err)
909 }
910 _ => Err(Errno::Notsup),
911 }
912 }
913
914 pub fn set_linger(&mut self, linger: Option<std::time::Duration>) -> Result<(), Errno> {
915 let mut inner = self.inner.protected.write().unwrap();
916 match &mut inner.kind {
917 InodeSocketKind::TcpStream { socket, .. } => {
918 socket.set_linger(linger).map_err(net_error_into_wasi_err)
919 }
920 InodeSocketKind::RemoteSocket { .. } => Ok(()),
921 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
922 _ => Err(Errno::Notsup),
923 }
924 }
925
926 pub fn linger(&self) -> Result<Option<std::time::Duration>, Errno> {
927 let inner = self.inner.protected.read().unwrap();
928 match &inner.kind {
929 InodeSocketKind::TcpStream { socket, .. } => {
930 socket.linger().map_err(net_error_into_wasi_err)
931 }
932 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
933 _ => Err(Errno::Notsup),
934 }
935 }
936
937 pub fn set_opt_time(
938 &self,
939 ty: TimeType,
940 timeout: Option<std::time::Duration>,
941 ) -> Result<(), Errno> {
942 let mut inner = self.inner.protected.write().unwrap();
943 match &mut inner.kind {
944 InodeSocketKind::TcpStream {
945 write_timeout,
946 read_timeout,
947 ..
948 } => {
949 match ty {
950 TimeType::WriteTimeout => *write_timeout = timeout,
951 TimeType::ReadTimeout => *read_timeout = timeout,
952 _ => return Err(Errno::Inval),
953 }
954 Ok(())
955 }
956 InodeSocketKind::TcpListener { accept_timeout, .. } => {
957 match ty {
958 TimeType::AcceptTimeout => *accept_timeout = timeout,
959 _ => return Err(Errno::Inval),
960 }
961 Ok(())
962 }
963 InodeSocketKind::PreSocket { props, .. }
964 | InodeSocketKind::RemoteSocket { props, .. } => {
965 match ty {
966 TimeType::ConnectTimeout => props.connect_timeout = timeout,
967 TimeType::AcceptTimeout => props.accept_timeout = timeout,
968 TimeType::ReadTimeout => props.read_timeout = timeout,
969 TimeType::WriteTimeout => props.write_timeout = timeout,
970 _ => return Err(Errno::Io),
971 }
972 Ok(())
973 }
974 _ => Err(Errno::Notsup),
975 }
976 }
977
978 pub fn opt_time(&self, ty: TimeType) -> Result<Option<std::time::Duration>, Errno> {
979 let inner = self.inner.protected.read().unwrap();
980 match &inner.kind {
981 InodeSocketKind::TcpStream {
982 read_timeout,
983 write_timeout,
984 ..
985 } => Ok(match ty {
986 TimeType::ReadTimeout => *read_timeout,
987 TimeType::WriteTimeout => *write_timeout,
988 _ => return Err(Errno::Inval),
989 }),
990 InodeSocketKind::TcpListener { accept_timeout, .. } => Ok(match ty {
991 TimeType::AcceptTimeout => *accept_timeout,
992 _ => return Err(Errno::Inval),
993 }),
994 InodeSocketKind::PreSocket { props, .. }
995 | InodeSocketKind::RemoteSocket { props, .. } => match ty {
996 TimeType::ConnectTimeout => Ok(props.connect_timeout),
997 TimeType::AcceptTimeout => Ok(props.accept_timeout),
998 TimeType::ReadTimeout => Ok(props.read_timeout),
999 TimeType::WriteTimeout => Ok(props.write_timeout),
1000 _ => Err(Errno::Inval),
1001 },
1002 _ => Err(Errno::Notsup),
1003 }
1004 }
1005
1006 pub fn set_ttl(&self, ttl: u32) -> Result<(), Errno> {
1007 let mut inner = self.inner.protected.write().unwrap();
1008 match &mut inner.kind {
1009 InodeSocketKind::TcpStream { socket, .. } => {
1010 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1011 }
1012 InodeSocketKind::UdpSocket { socket, .. } => {
1013 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1014 }
1015 InodeSocketKind::RemoteSocket { ttl: set_ttl, .. } => {
1016 *set_ttl = ttl;
1017 Ok(())
1018 }
1019 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1020 _ => Err(Errno::Notsup),
1021 }
1022 }
1023
1024 pub fn ttl(&self) -> Result<u32, Errno> {
1025 let inner = self.inner.protected.read().unwrap();
1026 match &inner.kind {
1027 InodeSocketKind::TcpStream { socket, .. } => {
1028 socket.ttl().map_err(net_error_into_wasi_err)
1029 }
1030 InodeSocketKind::UdpSocket { socket, .. } => {
1031 socket.ttl().map_err(net_error_into_wasi_err)
1032 }
1033 InodeSocketKind::RemoteSocket { ttl, .. } => Ok(*ttl),
1034 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1035 _ => Err(Errno::Notsup),
1036 }
1037 }
1038
1039 pub fn set_multicast_ttl_v4(&self, ttl: u32) -> Result<(), Errno> {
1040 let mut inner = self.inner.protected.write().unwrap();
1041 match &mut inner.kind {
1042 InodeSocketKind::UdpSocket { socket, .. } => socket
1043 .set_multicast_ttl_v4(ttl)
1044 .map_err(net_error_into_wasi_err),
1045 InodeSocketKind::RemoteSocket {
1046 multicast_ttl: set_ttl,
1047 ..
1048 } => {
1049 *set_ttl = ttl;
1050 Ok(())
1051 }
1052 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1053 _ => Err(Errno::Notsup),
1054 }
1055 }
1056
1057 pub fn multicast_ttl_v4(&self) -> Result<u32, Errno> {
1058 let inner = self.inner.protected.read().unwrap();
1059 match &inner.kind {
1060 InodeSocketKind::UdpSocket { socket, .. } => {
1061 socket.multicast_ttl_v4().map_err(net_error_into_wasi_err)
1062 }
1063 InodeSocketKind::RemoteSocket { multicast_ttl, .. } => Ok(*multicast_ttl),
1064 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1065 _ => Err(Errno::Notsup),
1066 }
1067 }
1068
1069 pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1070 let mut inner = self.inner.protected.write().unwrap();
1071 match &mut inner.kind {
1072 InodeSocketKind::UdpSocket { socket, .. } => socket
1073 .join_multicast_v4(multiaddr, iface)
1074 .map_err(net_error_into_wasi_err),
1075 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1076 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1077 _ => Err(Errno::Notsup),
1078 }
1079 }
1080
1081 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1082 let mut inner = self.inner.protected.write().unwrap();
1083 match &mut inner.kind {
1084 InodeSocketKind::UdpSocket { socket, .. } => socket
1085 .leave_multicast_v4(multiaddr, iface)
1086 .map_err(net_error_into_wasi_err),
1087 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1088 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1089 _ => Err(Errno::Notsup),
1090 }
1091 }
1092
1093 pub fn join_multicast_v6(&self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1094 let mut inner = self.inner.protected.write().unwrap();
1095 match &mut inner.kind {
1096 InodeSocketKind::UdpSocket { socket, .. } => socket
1097 .join_multicast_v6(multiaddr, iface)
1098 .map_err(net_error_into_wasi_err),
1099 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1100 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1101 _ => Err(Errno::Notsup),
1102 }
1103 }
1104
1105 pub fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1106 let mut inner = self.inner.protected.write().unwrap();
1107 match &mut inner.kind {
1108 InodeSocketKind::UdpSocket { socket, .. } => socket
1109 .leave_multicast_v6(multiaddr, iface)
1110 .map_err(net_error_into_wasi_err),
1111 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1112 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1113 _ => Err(Errno::Notsup),
1114 }
1115 }
1116
1117 pub async fn send(
1118 &self,
1119 tasks: &dyn VirtualTaskManager,
1120 buf: &[u8],
1121 timeout: Option<Duration>,
1122 nonblocking: bool,
1123 ) -> Result<usize, Errno> {
1124 struct SocketSender<'a, 'b> {
1125 inner: &'a InodeSocketInner,
1126 data: &'b [u8],
1127 nonblocking: bool,
1128 handler_registered: bool,
1129 }
1130 impl Drop for SocketSender<'_, '_> {
1131 fn drop(&mut self) {
1132 if self.handler_registered {
1133 let mut inner = self.inner.protected.write().unwrap();
1134 inner.remove_handler();
1135 }
1136 }
1137 }
1138 impl Future for SocketSender<'_, '_> {
1139 type Output = Result<usize, Errno>;
1140 fn poll(
1141 mut self: Pin<&mut Self>,
1142 cx: &mut std::task::Context<'_>,
1143 ) -> Poll<Self::Output> {
1144 loop {
1145 let mut inner = self.inner.protected.write().unwrap();
1146 let res = match &mut inner.kind {
1147 InodeSocketKind::Raw(socket) => socket.try_send(self.data),
1148 InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1149 InodeSocketKind::UdpSocket { socket, peer } => {
1150 if let Some(peer) = peer {
1151 socket.try_send_to(self.data, *peer)
1152 } else {
1153 Err(NetworkError::NotConnected)
1154 }
1155 }
1156 InodeSocketKind::PreSocket { .. } => {
1157 return Poll::Ready(Err(Errno::Notconn));
1158 }
1159 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1160 return match is_dead {
1161 true => Poll::Ready(Err(Errno::Connreset)),
1162 false => Poll::Ready(Ok(self.data.len())),
1163 };
1164 }
1165 _ => return Poll::Ready(Err(Errno::Notsup)),
1166 };
1167 return match res {
1168 Ok(amt) => Poll::Ready(Ok(amt)),
1169 Err(NetworkError::WouldBlock) if self.nonblocking => {
1170 Poll::Ready(Err(Errno::Again))
1171 }
1172 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1173 inner
1174 .set_handler(cx.waker().into())
1175 .map_err(net_error_into_wasi_err)?;
1176 drop(inner);
1177 self.handler_registered = true;
1178 continue;
1179 }
1180 Err(NetworkError::WouldBlock) => Poll::Pending,
1181 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1182 };
1183 }
1184 }
1185 }
1186
1187 let poller = SocketSender {
1188 inner: &self.inner,
1189 data: buf,
1190 nonblocking,
1191 handler_registered: false,
1192 };
1193 if let Some(timeout) = timeout {
1194 tokio::select! {
1195 res = poller => res,
1196 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1197 }
1198 } else {
1199 poller.await
1200 }
1201 }
1202
1203 pub async fn send_to<M: MemorySize>(
1204 &self,
1205 tasks: &dyn VirtualTaskManager,
1206 buf: &[u8],
1207 addr: SocketAddr,
1208 timeout: Option<Duration>,
1209 nonblocking: bool,
1210 ) -> Result<usize, Errno> {
1211 struct SocketSender<'a, 'b> {
1212 inner: &'a InodeSocketInner,
1213 data: &'b [u8],
1214 addr: SocketAddr,
1215 nonblocking: bool,
1216 handler_registered: bool,
1217 }
1218 impl Drop for SocketSender<'_, '_> {
1219 fn drop(&mut self) {
1220 if self.handler_registered {
1221 let mut inner = self.inner.protected.write().unwrap();
1222 inner.remove_handler();
1223 }
1224 }
1225 }
1226 impl Future for SocketSender<'_, '_> {
1227 type Output = Result<usize, Errno>;
1228 fn poll(
1229 mut self: Pin<&mut Self>,
1230 cx: &mut std::task::Context<'_>,
1231 ) -> Poll<Self::Output> {
1232 loop {
1233 let mut inner = self.inner.protected.write().unwrap();
1234 let res = match &mut inner.kind {
1235 InodeSocketKind::Icmp(socket) => socket.try_send_to(self.data, self.addr),
1236 InodeSocketKind::UdpSocket { socket, .. } => {
1237 socket.try_send_to(self.data, self.addr)
1238 }
1239 InodeSocketKind::PreSocket { .. } => {
1240 return Poll::Ready(Err(Errno::Notconn));
1241 }
1242 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1243 return match is_dead {
1244 true => Poll::Ready(Err(Errno::Connreset)),
1245 false => Poll::Ready(Ok(self.data.len())),
1246 };
1247 }
1248 _ => return Poll::Ready(Err(Errno::Notsup)),
1249 };
1250 return match res {
1251 Ok(amt) => Poll::Ready(Ok(amt)),
1252 Err(NetworkError::WouldBlock) if self.nonblocking => {
1253 Poll::Ready(Err(Errno::Again))
1254 }
1255 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1256 inner
1257 .set_handler(cx.waker().into())
1258 .map_err(net_error_into_wasi_err)?;
1259 self.handler_registered = true;
1260 drop(inner);
1261 continue;
1262 }
1263 Err(NetworkError::WouldBlock) => Poll::Pending,
1264 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1265 };
1266 }
1267 }
1268 }
1269
1270 let poller = SocketSender {
1271 inner: &self.inner,
1272 data: buf,
1273 addr,
1274 nonblocking,
1275 handler_registered: false,
1276 };
1277 if let Some(timeout) = timeout {
1278 tokio::select! {
1279 res = poller => res,
1280 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1281 }
1282 } else {
1283 poller.await
1284 }
1285 }
1286
1287 pub async fn recv(
1288 &self,
1289 tasks: &dyn VirtualTaskManager,
1290 buf: &mut [MaybeUninit<u8>],
1291 timeout: Option<Duration>,
1292 nonblocking: bool,
1293 peek: bool,
1294 ) -> Result<usize, Errno> {
1295 struct SocketReceiver<'a, 'b> {
1296 inner: &'a InodeSocketInner,
1297 data: &'b mut [MaybeUninit<u8>],
1298 nonblocking: bool,
1299 peek: bool,
1300 handler_registered: bool,
1301 }
1302 impl Drop for SocketReceiver<'_, '_> {
1303 fn drop(&mut self) {
1304 if self.handler_registered {
1305 let mut inner = self.inner.protected.write().unwrap();
1306 inner.remove_handler();
1307 }
1308 }
1309 }
1310 impl Future for SocketReceiver<'_, '_> {
1311 type Output = Result<usize, Errno>;
1312 fn poll(
1313 mut self: Pin<&mut Self>,
1314 cx: &mut std::task::Context<'_>,
1315 ) -> Poll<Self::Output> {
1316 loop {
1317 let peek = self.peek;
1318 let mut inner = self.inner.protected.write().unwrap();
1319 let res = match &mut inner.kind {
1320 InodeSocketKind::Raw(socket) => socket.try_recv(self.data, peek),
1321 InodeSocketKind::TcpStream { socket, .. } => {
1322 socket.try_recv(self.data, peek)
1323 }
1324 InodeSocketKind::UdpSocket { socket, peer } => {
1325 if let Some(peer) = peer {
1326 match socket.try_recv_from(self.data, peek) {
1327 Ok((amt, addr)) if addr == *peer => Ok(amt),
1328 Ok(_) => Err(NetworkError::WouldBlock),
1329 Err(err) => Err(err),
1330 }
1331 } else {
1332 match socket.try_recv_from(self.data, peek) {
1333 Ok((amt, _)) => Ok(amt),
1334 Err(err) => Err(err),
1335 }
1336 }
1337 }
1338 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1339 return match is_dead {
1340 true => Poll::Ready(Ok(0)),
1341 false => Poll::Pending,
1342 };
1343 }
1344 InodeSocketKind::PreSocket { .. } => {
1345 return Poll::Ready(Err(Errno::Notconn));
1346 }
1347 _ => return Poll::Ready(Err(Errno::Notsup)),
1348 };
1349 return match res {
1350 Ok(amt) => Poll::Ready(Ok(amt)),
1351 Err(NetworkError::WouldBlock) if self.nonblocking => {
1352 Poll::Ready(Err(Errno::Again))
1353 }
1354 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1355 inner
1356 .set_handler(cx.waker().into())
1357 .map_err(net_error_into_wasi_err)?;
1358 self.handler_registered = true;
1359 drop(inner);
1360 continue;
1361 }
1362
1363 Err(NetworkError::WouldBlock) => Poll::Pending,
1364 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1365 };
1366 }
1367 }
1368 }
1369
1370 let poller = SocketReceiver {
1371 inner: &self.inner,
1372 data: buf,
1373 nonblocking,
1374 peek,
1375 handler_registered: false,
1376 };
1377 if let Some(timeout) = timeout {
1378 tokio::select! {
1379 res = poller => res,
1380 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1381 }
1382 } else {
1383 poller.await
1384 }
1385 }
1386
1387 pub async fn recv_from(
1388 &self,
1389 tasks: &dyn VirtualTaskManager,
1390 buf: &mut [MaybeUninit<u8>],
1391 timeout: Option<Duration>,
1392 nonblocking: bool,
1393 peek: bool,
1394 ) -> Result<(usize, SocketAddr), Errno> {
1395 struct SocketReceiver<'a, 'b> {
1396 inner: &'a InodeSocketInner,
1397 data: &'b mut [MaybeUninit<u8>],
1398 nonblocking: bool,
1399 peek: bool,
1400 handler_registered: bool,
1401 }
1402 impl Drop for SocketReceiver<'_, '_> {
1403 fn drop(&mut self) {
1404 if self.handler_registered {
1405 let mut inner = self.inner.protected.write().unwrap();
1406 inner.remove_handler();
1407 }
1408 }
1409 }
1410 impl Future for SocketReceiver<'_, '_> {
1411 type Output = Result<(usize, SocketAddr), Errno>;
1412 fn poll(
1413 mut self: Pin<&mut Self>,
1414 cx: &mut std::task::Context<'_>,
1415 ) -> Poll<Self::Output> {
1416 let peek = self.peek;
1417 let mut inner = self.inner.protected.write().unwrap();
1418 loop {
1419 let res = match &mut inner.kind {
1420 InodeSocketKind::Icmp(socket) => socket.try_recv_from(self.data, peek),
1421 InodeSocketKind::UdpSocket { socket, .. } => {
1422 socket.try_recv_from(self.data, peek)
1423 }
1424 InodeSocketKind::RemoteSocket {
1425 is_dead, peer_addr, ..
1426 } => {
1427 return match is_dead {
1428 true => Poll::Ready(Ok((0, *peer_addr))),
1429 false => Poll::Pending,
1430 };
1431 }
1432 InodeSocketKind::PreSocket { .. } => {
1433 return Poll::Ready(Err(Errno::Notconn));
1434 }
1435 _ => return Poll::Ready(Err(Errno::Notsup)),
1436 };
1437 return match res {
1438 Ok((amt, addr)) => Poll::Ready(Ok((amt, addr))),
1439 Err(NetworkError::WouldBlock) if self.nonblocking => {
1440 Poll::Ready(Err(Errno::Again))
1441 }
1442 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1443 inner
1444 .set_handler(cx.waker().into())
1445 .map_err(net_error_into_wasi_err)?;
1446 self.handler_registered = true;
1447 continue;
1448 }
1449 Err(NetworkError::WouldBlock) => Poll::Pending,
1450 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1451 };
1452 }
1453 }
1454 }
1455
1456 let poller = SocketReceiver {
1457 inner: &self.inner,
1458 data: buf,
1459 nonblocking,
1460 peek,
1461 handler_registered: false,
1462 };
1463 if let Some(timeout) = timeout {
1464 tokio::select! {
1465 res = poller => res,
1466 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1467 }
1468 } else {
1469 poller.await
1470 }
1471 }
1472
1473 pub fn shutdown(&mut self, how: std::net::Shutdown) -> Result<(), Errno> {
1474 let mut inner = self.inner.protected.write().unwrap();
1475 match &mut inner.kind {
1476 InodeSocketKind::TcpStream { socket, .. } => {
1477 socket.shutdown(how).map_err(net_error_into_wasi_err)?;
1478 }
1479 InodeSocketKind::RemoteSocket { .. } => return Ok(()),
1480 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
1481 _ => return Err(Errno::Notsup),
1482 }
1483 Ok(())
1484 }
1485
1486 pub async fn can_write(&self) -> bool {
1487 if let Ok(mut guard) = self.inner.protected.try_write() {
1488 #[allow(clippy::match_like_matches_macro)]
1489 match &mut guard.kind {
1490 InodeSocketKind::TcpStream { .. }
1491 | InodeSocketKind::UdpSocket { .. }
1492 | InodeSocketKind::Raw(..) => true,
1493 InodeSocketKind::RemoteSocket { is_dead, .. } => !(*is_dead),
1494 _ => false,
1495 }
1496 } else {
1497 false
1498 }
1499 }
1500}
1501
1502impl InodeSocketProtected {
1503 pub fn remove_handler(&mut self) {
1504 match &mut self.kind {
1505 InodeSocketKind::TcpListener { socket, .. } => socket.remove_handler(),
1506 InodeSocketKind::TcpStream { socket, .. } => socket.remove_handler(),
1507 InodeSocketKind::UdpSocket { socket, .. } => socket.remove_handler(),
1508 InodeSocketKind::Raw(socket) => socket.remove_handler(),
1509 InodeSocketKind::Icmp(socket) => socket.remove_handler(),
1510 InodeSocketKind::PreSocket { props, .. } => {
1511 props.handler.take();
1512 }
1513 InodeSocketKind::RemoteSocket { props, .. } => {
1514 props.handler.take();
1515 }
1516 }
1517 }
1518
1519 pub fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1520 match &mut self.kind {
1521 InodeSocketKind::TcpListener { socket, .. } => socket.poll_read_ready(cx),
1522 InodeSocketKind::TcpStream { socket, .. } => socket.poll_read_ready(cx),
1523 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_read_ready(cx),
1524 InodeSocketKind::Raw(socket) => socket.poll_read_ready(cx),
1525 InodeSocketKind::Icmp(socket) => socket.poll_read_ready(cx),
1526 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1527 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1528 true => Poll::Ready(Ok(0)),
1529 false => Poll::Pending,
1530 },
1531 }
1532 .map_err(net_error_into_io_err)
1533 }
1534
1535 pub fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1536 match &mut self.kind {
1537 InodeSocketKind::TcpListener { socket, .. } => socket.poll_write_ready(cx),
1538 InodeSocketKind::TcpStream { socket, .. } => socket.poll_write_ready(cx),
1539 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_write_ready(cx),
1540 InodeSocketKind::Raw(socket) => socket.poll_write_ready(cx),
1541 InodeSocketKind::Icmp(socket) => socket.poll_write_ready(cx),
1542 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1543 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1544 true => Poll::Ready(Ok(0)),
1545 false => Poll::Pending,
1546 },
1547 }
1548 .map_err(net_error_into_io_err)
1549 }
1550
1551 pub fn set_handler(
1552 &mut self,
1553 handler: Box<dyn InterestHandler + Send + Sync>,
1554 ) -> virtual_net::Result<()> {
1555 match &mut self.kind {
1556 InodeSocketKind::TcpListener { socket, .. } => socket.set_handler(handler),
1557 InodeSocketKind::TcpStream { socket, .. } => socket.set_handler(handler),
1558 InodeSocketKind::UdpSocket { socket, .. } => socket.set_handler(handler),
1559 InodeSocketKind::Raw(socket) => socket.set_handler(handler),
1560 InodeSocketKind::Icmp(socket) => socket.set_handler(handler),
1561 InodeSocketKind::PreSocket { props, .. }
1562 | InodeSocketKind::RemoteSocket { props, .. } => {
1563 props.handler.replace(handler);
1564 Ok(())
1565 }
1566 }
1567 }
1568}
1569
1570#[allow(dead_code)]
1572pub(crate) fn all_socket_rights() -> Rights {
1573 Rights::FD_FDSTAT_SET_FLAGS
1574 .union(Rights::FD_FILESTAT_GET)
1575 .union(Rights::FD_READ)
1576 .union(Rights::FD_WRITE)
1577 .union(Rights::POLL_FD_READWRITE)
1578 .union(Rights::SOCK_SHUTDOWN)
1579 .union(Rights::SOCK_CONNECT)
1580 .union(Rights::SOCK_LISTEN)
1581 .union(Rights::SOCK_BIND)
1582 .union(Rights::SOCK_ACCEPT)
1583 .union(Rights::SOCK_RECV)
1584 .union(Rights::SOCK_SEND)
1585 .union(Rights::SOCK_ADDR_LOCAL)
1586 .union(Rights::SOCK_ADDR_REMOTE)
1587 .union(Rights::SOCK_RECV_FROM)
1588 .union(Rights::SOCK_SEND_TO)
1589}
1590
1591#[cfg(test)]
1592mod tests {
1593 use super::{InodeSocket, InodeSocketKind, WasiSocketStatus};
1594 use std::{
1595 mem::MaybeUninit,
1596 net::{Ipv4Addr, Shutdown, SocketAddr},
1597 pin::Pin,
1598 sync::{
1599 Arc,
1600 atomic::{AtomicUsize, Ordering},
1601 },
1602 task::{Context, Poll},
1603 time::Duration,
1604 };
1605 use virtual_mio::InterestHandler;
1606 use virtual_net::{
1607 NetworkError, Result as NetResult, SocketStatus, VirtualConnectedSocket, VirtualIoSource,
1608 VirtualSocket, VirtualTcpSocket,
1609 };
1610
1611 #[derive(Debug)]
1612 struct MockTcpSocket {
1613 read_calls: Arc<AtomicUsize>,
1614 write_calls: Arc<AtomicUsize>,
1615 status: Arc<AtomicUsize>,
1616 }
1617
1618 const MOCK_STATUS_OPENING: usize = 0;
1619 const MOCK_STATUS_OPENED: usize = 1;
1620
1621 fn decode_mock_status(value: usize) -> SocketStatus {
1622 match value {
1623 MOCK_STATUS_OPENED => SocketStatus::Opened,
1624 _ => SocketStatus::Opening,
1625 }
1626 }
1627
1628 impl VirtualIoSource for MockTcpSocket {
1629 fn remove_handler(&mut self) {}
1630
1631 fn poll_read_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
1632 self.read_calls.fetch_add(1, Ordering::Relaxed);
1633 Poll::Ready(Ok(3))
1634 }
1635
1636 fn poll_write_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
1637 self.write_calls.fetch_add(1, Ordering::Relaxed);
1638 self.status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
1639 Poll::Ready(Ok(7))
1640 }
1641 }
1642
1643 impl VirtualSocket for MockTcpSocket {
1644 fn set_ttl(&mut self, _ttl: u32) -> NetResult<()> {
1645 Ok(())
1646 }
1647
1648 fn ttl(&self) -> NetResult<u32> {
1649 Ok(64)
1650 }
1651
1652 fn addr_local(&self) -> NetResult<SocketAddr> {
1653 Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))
1654 }
1655
1656 fn status(&self) -> NetResult<SocketStatus> {
1657 Ok(decode_mock_status(self.status.load(Ordering::Relaxed)))
1658 }
1659
1660 fn set_handler(
1661 &mut self,
1662 _handler: Box<dyn InterestHandler + Send + Sync>,
1663 ) -> NetResult<()> {
1664 Ok(())
1665 }
1666 }
1667
1668 impl VirtualConnectedSocket for MockTcpSocket {
1669 fn set_linger(&mut self, _linger: Option<Duration>) -> NetResult<()> {
1670 Ok(())
1671 }
1672
1673 fn linger(&self) -> NetResult<Option<Duration>> {
1674 Ok(None)
1675 }
1676
1677 fn try_send(&mut self, _data: &[u8]) -> NetResult<usize> {
1678 Err(NetworkError::Unsupported)
1679 }
1680
1681 fn try_flush(&mut self) -> NetResult<()> {
1682 Err(NetworkError::Unsupported)
1683 }
1684
1685 fn close(&mut self) -> NetResult<()> {
1686 Ok(())
1687 }
1688
1689 fn try_recv(&mut self, _buf: &mut [MaybeUninit<u8>], _peek: bool) -> NetResult<usize> {
1690 Err(NetworkError::Unsupported)
1691 }
1692 }
1693
1694 impl VirtualTcpSocket for MockTcpSocket {
1695 fn set_recv_buf_size(&mut self, _size: usize) -> NetResult<()> {
1696 Ok(())
1697 }
1698
1699 fn recv_buf_size(&self) -> NetResult<usize> {
1700 Ok(0)
1701 }
1702
1703 fn set_send_buf_size(&mut self, _size: usize) -> NetResult<()> {
1704 Ok(())
1705 }
1706
1707 fn send_buf_size(&self) -> NetResult<usize> {
1708 Ok(0)
1709 }
1710
1711 fn set_nodelay(&mut self, _reuse: bool) -> NetResult<()> {
1712 Ok(())
1713 }
1714
1715 fn nodelay(&self) -> NetResult<bool> {
1716 Ok(true)
1717 }
1718
1719 fn set_keepalive(&mut self, _keepalive: bool) -> NetResult<()> {
1720 Ok(())
1721 }
1722
1723 fn keepalive(&self) -> NetResult<bool> {
1724 Ok(false)
1725 }
1726
1727 fn set_dontroute(&mut self, _keepalive: bool) -> NetResult<()> {
1728 Ok(())
1729 }
1730
1731 fn dontroute(&self) -> NetResult<bool> {
1732 Ok(false)
1733 }
1734
1735 fn addr_peer(&self) -> NetResult<SocketAddr> {
1736 Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 80)))
1737 }
1738
1739 fn shutdown(&mut self, _how: Shutdown) -> NetResult<()> {
1740 Ok(())
1741 }
1742
1743 fn is_closed(&self) -> bool {
1744 false
1745 }
1746 }
1747
1748 #[test]
1749 fn inode_socket_poll_write_ready_uses_write_path() {
1750 let read_calls = Arc::new(AtomicUsize::new(0));
1751 let write_calls = Arc::new(AtomicUsize::new(0));
1752 let status = Arc::new(AtomicUsize::new(MOCK_STATUS_OPENED));
1753 let mut inode = InodeSocket::new(InodeSocketKind::TcpStream {
1754 socket: Box::new(MockTcpSocket {
1755 read_calls: read_calls.clone(),
1756 write_calls: write_calls.clone(),
1757 status,
1758 }),
1759 write_timeout: None,
1760 read_timeout: None,
1761 });
1762
1763 let waker = futures::task::noop_waker();
1764 let mut cx = Context::from_waker(&waker);
1765 let ready = Pin::new(&mut inode).poll_write_ready(&mut cx);
1766
1767 assert!(matches!(ready, Poll::Ready(Ok(7))));
1768 assert_eq!(read_calls.load(Ordering::Relaxed), 0);
1769 assert_eq!(write_calls.load(Ordering::Relaxed), 1);
1770 }
1771
1772 #[test]
1773 fn inode_socket_status_tracks_tcp_socket_status() {
1774 let status = Arc::new(AtomicUsize::new(MOCK_STATUS_OPENING));
1775 let inode = InodeSocket::new(InodeSocketKind::TcpStream {
1776 socket: Box::new(MockTcpSocket {
1777 read_calls: Arc::new(AtomicUsize::new(0)),
1778 write_calls: Arc::new(AtomicUsize::new(0)),
1779 status: status.clone(),
1780 }),
1781 write_timeout: None,
1782 read_timeout: None,
1783 });
1784
1785 assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opening));
1786 status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
1787 assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opened));
1788 }
1789}