1use std::{
2 future::Future,
3 io,
4 mem::MaybeUninit,
5 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
6 pin::Pin,
7 sync::{Arc, RwLock, RwLockWriteGuard},
8 task::{Context, Poll},
9 time::Duration,
10};
11
12#[cfg(feature = "enable-serde")]
13use serde_derive::{Deserialize, Serialize};
14use virtual_mio::InterestHandler;
15use virtual_net::{
16 NetworkError, VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualTcpListener,
17 VirtualTcpSocket, VirtualUdpSocket, net_error_into_io_err,
18};
19use wasmer_types::MemorySize;
20use wasmer_wasix_types::wasi::{Addressfamily, Errno, Rights, SockProto, Sockoption, Socktype};
21
22use crate::{VirtualTaskManager, net::net_error_into_wasi_err};
23
24#[derive(Debug)]
25#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
26pub enum InodeHttpSocketType {
27 Request,
29 Response,
31 Headers,
33}
34
35#[derive(Debug)]
36pub struct SocketProperties {
37 pub family: Addressfamily,
38 pub ty: Socktype,
39 pub pt: SockProto,
40 pub only_v6: bool,
41 pub reuse_port: bool,
42 pub reuse_addr: bool,
43 pub no_delay: Option<bool>,
44 pub keep_alive: Option<bool>,
45 pub dont_route: Option<bool>,
46 pub send_buf_size: Option<usize>,
47 pub recv_buf_size: Option<usize>,
48 pub write_timeout: Option<Duration>,
49 pub read_timeout: Option<Duration>,
50 pub accept_timeout: Option<Duration>,
51 pub connect_timeout: Option<Duration>,
52 pub handler: Option<Box<dyn InterestHandler + Send + Sync>>,
53}
54
55#[derive(Debug)]
56pub enum InodeSocketKind {
58 PreSocket {
59 props: SocketProperties,
60 addr: Option<SocketAddr>,
61 },
62 Icmp(Box<dyn VirtualIcmpSocket + Sync>),
63 Raw(Box<dyn VirtualRawSocket + Sync>),
64 TcpListener {
65 socket: Box<dyn VirtualTcpListener + Sync>,
66 accept_timeout: Option<Duration>,
67 },
68 TcpStream {
69 socket: Box<dyn VirtualTcpSocket + Sync>,
70 write_timeout: Option<Duration>,
71 read_timeout: Option<Duration>,
72 },
73 UdpSocket {
74 socket: Box<dyn VirtualUdpSocket + Sync>,
75 peer: Option<SocketAddr>,
76 },
77 RemoteSocket {
78 props: SocketProperties,
79 local_addr: SocketAddr,
80 peer_addr: SocketAddr,
81 ttl: u32,
82 multicast_ttl: u32,
83 is_dead: bool,
84 },
85}
86
87pub enum WasiSocketOption {
88 Noop,
89 ReusePort,
90 ReuseAddr,
91 NoDelay,
92 DontRoute,
93 OnlyV6,
94 Broadcast,
95 MulticastLoopV4,
96 MulticastLoopV6,
97 Promiscuous,
98 Listening,
99 LastError,
100 KeepAlive,
101 Linger,
102 OobInline,
103 RecvBufSize,
104 SendBufSize,
105 RecvLowat,
106 SendLowat,
107 RecvTimeout,
108 SendTimeout,
109 ConnectTimeout,
110 AcceptTimeout,
111 Ttl,
112 MulticastTtlV4,
113 Type,
114 Proto,
115}
116
117impl TryFrom<Sockoption> for WasiSocketOption {
118 type Error = Errno;
119
120 fn try_from(opt: Sockoption) -> Result<Self, Self::Error> {
121 use WasiSocketOption::*;
122 match opt {
123 Sockoption::Noop => Ok(Noop),
124 Sockoption::ReusePort => Ok(ReusePort),
125 Sockoption::ReuseAddr => Ok(ReuseAddr),
126 Sockoption::NoDelay => Ok(NoDelay),
127 Sockoption::DontRoute => Ok(DontRoute),
128 Sockoption::OnlyV6 => Ok(OnlyV6),
129 Sockoption::Broadcast => Ok(Broadcast),
130 Sockoption::MulticastLoopV4 => Ok(MulticastLoopV4),
131 Sockoption::MulticastLoopV6 => Ok(MulticastLoopV6),
132 Sockoption::Promiscuous => Ok(Promiscuous),
133 Sockoption::Listening => Ok(Listening),
134 Sockoption::LastError => Ok(LastError),
135 Sockoption::KeepAlive => Ok(KeepAlive),
136 Sockoption::Linger => Ok(Linger),
137 Sockoption::OobInline => Ok(OobInline),
138 Sockoption::RecvBufSize => Ok(RecvBufSize),
139 Sockoption::SendBufSize => Ok(SendBufSize),
140 Sockoption::RecvLowat => Ok(RecvLowat),
141 Sockoption::SendLowat => Ok(SendLowat),
142 Sockoption::RecvTimeout => Ok(RecvTimeout),
143 Sockoption::SendTimeout => Ok(SendTimeout),
144 Sockoption::ConnectTimeout => Ok(ConnectTimeout),
145 Sockoption::AcceptTimeout => Ok(AcceptTimeout),
146 Sockoption::Ttl => Ok(Ttl),
147 Sockoption::MulticastTtlV4 => Ok(MulticastTtlV4),
148 Sockoption::Type => Ok(Type),
149 Sockoption::Proto => Ok(Proto),
150 _ => Err(Errno::Inval),
151 }
152 }
153}
154
155#[derive(Debug)]
156pub enum WasiSocketStatus {
157 Opening,
158 Opened,
159 Closed,
160 Failed,
161}
162
163#[derive(Debug, Copy, Clone, PartialEq, Eq)]
164#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
165pub enum TimeType {
166 ReadTimeout,
167 WriteTimeout,
168 AcceptTimeout,
169 ConnectTimeout,
170 BindTimeout,
171 Linger,
172}
173
174impl From<TimeType> for wasmer_journal::SocketOptTimeType {
175 fn from(value: TimeType) -> Self {
176 match value {
177 TimeType::ReadTimeout => Self::ReadTimeout,
178 TimeType::WriteTimeout => Self::WriteTimeout,
179 TimeType::AcceptTimeout => Self::AcceptTimeout,
180 TimeType::ConnectTimeout => Self::ConnectTimeout,
181 TimeType::BindTimeout => Self::BindTimeout,
182 TimeType::Linger => Self::Linger,
183 }
184 }
185}
186
187impl From<wasmer_journal::SocketOptTimeType> for TimeType {
188 fn from(value: wasmer_journal::SocketOptTimeType) -> Self {
189 use wasmer_journal::SocketOptTimeType;
190 match value {
191 SocketOptTimeType::ReadTimeout => TimeType::ReadTimeout,
192 SocketOptTimeType::WriteTimeout => TimeType::WriteTimeout,
193 SocketOptTimeType::AcceptTimeout => TimeType::AcceptTimeout,
194 SocketOptTimeType::ConnectTimeout => TimeType::ConnectTimeout,
195 SocketOptTimeType::BindTimeout => TimeType::BindTimeout,
196 SocketOptTimeType::Linger => TimeType::Linger,
197 }
198 }
199}
200
201#[derive(Debug)]
202pub(crate) struct InodeSocketProtected {
204 pub kind: InodeSocketKind,
205}
206
207#[derive(Debug)]
208pub(crate) struct InodeSocketInner {
210 pub protected: RwLock<InodeSocketProtected>,
211}
212
213#[derive(Debug, Clone)]
214pub struct InodeSocket {
216 pub(crate) inner: Arc<InodeSocketInner>,
217}
218
219impl InodeSocket {
220 pub fn new(kind: InodeSocketKind) -> Self {
221 let protected = InodeSocketProtected { kind };
222 Self {
223 inner: Arc::new(InodeSocketInner {
224 protected: RwLock::new(protected),
225 }),
226 }
227 }
228
229 pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
230 let mut inner = self.inner.protected.write().unwrap();
231 inner.poll_read_ready(cx)
232 }
233
234 pub fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
235 let mut inner = self.inner.protected.write().unwrap();
236 inner.poll_write_ready(cx)
237 }
238
239 #[allow(clippy::await_holding_lock, clippy::readonly_write_lock)]
244 pub async fn auto_bind_udp(
245 &self,
246 tasks: &dyn VirtualTaskManager,
247 net: &dyn VirtualNetworking,
248 ) -> Result<Option<InodeSocket>, Errno> {
249 let timeout = self
250 .opt_time(TimeType::BindTimeout)
251 .ok()
252 .flatten()
253 .unwrap_or(Duration::from_secs(30));
254 let inner = self.inner.protected.write().unwrap();
255 match &inner.kind {
256 InodeSocketKind::PreSocket { props, .. } if props.ty == Socktype::Dgram => {
257 let addr = match props.family {
258 Addressfamily::Inet4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
259 Addressfamily::Inet6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
260 _ => return Err(Errno::Notsup),
261 };
262 Self::bind_internal(tasks, net, addr, timeout, inner).await
263 }
264 _ => Ok(None),
265 }
266 }
267
268 pub async fn bind(
269 &self,
270 tasks: &dyn VirtualTaskManager,
271 net: &dyn VirtualNetworking,
272 set_addr: SocketAddr,
273 ) -> Result<Option<InodeSocket>, Errno> {
274 let timeout = self
275 .opt_time(TimeType::BindTimeout)
276 .ok()
277 .flatten()
278 .unwrap_or(Duration::from_secs(30));
279 let inner = self.inner.protected.write().unwrap();
280 Self::bind_internal(tasks, net, set_addr, timeout, inner).await
281 }
282
283 #[allow(clippy::await_holding_lock)]
285 async fn bind_internal(
286 tasks: &dyn VirtualTaskManager,
287 net: &dyn VirtualNetworking,
288 set_addr: SocketAddr,
289 timeout: Duration,
290 mut inner: RwLockWriteGuard<'_, InodeSocketProtected>,
291 ) -> Result<Option<InodeSocket>, Errno> {
292 let socket = {
293 match &mut inner.kind {
294 InodeSocketKind::PreSocket { props, addr, .. } => {
295 match props.family {
296 Addressfamily::Inet4 => {
297 if !set_addr.is_ipv4() {
298 tracing::debug!(
299 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
300 );
301 return Err(Errno::Inval);
302 }
303 }
304 Addressfamily::Inet6 => {
305 if !set_addr.is_ipv6() {
306 tracing::debug!(
307 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
308 );
309 return Err(Errno::Inval);
310 }
311 }
312 _ => {
313 return Err(Errno::Notsup);
314 }
315 }
316
317 addr.replace(set_addr);
318 let addr = (*addr).unwrap();
319
320 match props.ty {
321 Socktype::Stream => {
322 return Ok(None);
325 }
326 Socktype::Dgram => {
327 let reuse_port = props.reuse_port;
328 let reuse_addr = props.reuse_addr;
329
330 net.bind_udp(addr, reuse_port, reuse_addr)
331 }
332 _ => return Err(Errno::Inval),
333 }
334 }
335 InodeSocketKind::RemoteSocket {
336 props,
337 local_addr: addr,
338 ..
339 } => {
340 match props.family {
341 Addressfamily::Inet4 => {
342 if !set_addr.is_ipv4() {
343 tracing::debug!(
344 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
345 );
346 return Err(Errno::Inval);
347 }
348 }
349 Addressfamily::Inet6 => {
350 if !set_addr.is_ipv6() {
351 tracing::debug!(
352 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
353 );
354 return Err(Errno::Inval);
355 }
356 }
357 _ => {
358 return Err(Errno::Notsup);
359 }
360 }
361
362 *addr = set_addr;
363 let addr = *addr;
364
365 match props.ty {
366 Socktype::Stream => {
367 return Ok(None);
370 }
371 Socktype::Dgram => {
372 let reuse_port = props.reuse_port;
373 let reuse_addr = props.reuse_addr;
374
375 net.bind_udp(addr, reuse_port, reuse_addr)
376 }
377 _ => return Err(Errno::Inval),
378 }
379 }
380 _ => return Err(Errno::Notsup),
381 }
382 };
383
384 drop(inner);
385
386 tokio::select! {
387 socket = socket => {
388 let socket = socket.map_err(net_error_into_wasi_err)?;
389 Ok(Some(InodeSocket::new(InodeSocketKind::UdpSocket { socket, peer: None })))
390 },
391 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
392 }
393 }
394
395 pub async fn listen(
396 &self,
397 tasks: &dyn VirtualTaskManager,
398 net: &dyn VirtualNetworking,
399 _backlog: usize,
400 ) -> Result<Option<InodeSocket>, Errno> {
401 let timeout = self
402 .opt_time(TimeType::AcceptTimeout)
403 .ok()
404 .flatten()
405 .unwrap_or(Duration::from_secs(30));
406
407 let socket = {
408 let inner = self.inner.protected.read().unwrap();
409 match &inner.kind {
410 InodeSocketKind::PreSocket { props, addr, .. } => match props.ty {
411 Socktype::Stream => {
412 if addr.is_none() {
413 tracing::warn!("wasi[?]::sock_listen - failed - address not set");
414 return Err(Errno::Inval);
415 }
416 let addr = *addr.as_ref().unwrap();
417 let only_v6 = props.only_v6;
418 let reuse_port = props.reuse_port;
419 let reuse_addr = props.reuse_addr;
420 drop(inner);
421
422 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
423 }
424 ty => {
425 tracing::warn!(
426 "wasi[?]::sock_listen - failed - not supported(pre-socket:{:?})",
427 ty
428 );
429 return Err(Errno::Notsup);
430 }
431 },
432 InodeSocketKind::RemoteSocket {
433 props,
434 local_addr: addr,
435 ..
436 } => match props.ty {
437 Socktype::Stream => {
438 let addr = *addr;
439 let only_v6 = props.only_v6;
440 let reuse_port = props.reuse_port;
441 let reuse_addr = props.reuse_addr;
442 drop(inner);
443
444 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
445 }
446 ty => {
447 tracing::warn!(
448 "wasi[?]::sock_listen - failed - not supported(remote-socket:{:?})",
449 ty
450 );
451 return Err(Errno::Notsup);
452 }
453 },
454 InodeSocketKind::Icmp(_) => {
455 tracing::warn!("wasi[?]::sock_listen - failed - not supported(icmp)");
456 return Err(Errno::Notsup);
457 }
458 InodeSocketKind::Raw(_) => {
459 tracing::warn!("wasi[?]::sock_listen - failed - not supported(raw)");
460 return Err(Errno::Notsup);
461 }
462 InodeSocketKind::TcpListener { .. } => {
463 tracing::warn!(
464 "wasi[?]::sock_listen - failed - already listening (tcp-listener)"
465 );
466 return Err(Errno::Notsup);
467 }
468 InodeSocketKind::TcpStream { .. } => {
469 tracing::warn!("wasi[?]::sock_listen - failed - not supported(tcp-stream)");
470 return Err(Errno::Notsup);
471 }
472 InodeSocketKind::UdpSocket { .. } => {
473 tracing::warn!("wasi[?]::sock_listen - failed - not supported(udp-socket)");
474 return Err(Errno::Notsup);
475 }
476 }
477 };
478
479 tokio::select! {
480 socket = socket => {
481 let socket = socket.map_err(net_error_into_wasi_err)?;
482 Ok(Some(InodeSocket::new(InodeSocketKind::TcpListener {
483 socket,
484 accept_timeout: Some(timeout),
485 })))
486 },
487 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
488 }
489 }
490
491 pub async fn accept(
492 &self,
493 tasks: &dyn VirtualTaskManager,
494 nonblocking: bool,
495 timeout: Option<Duration>,
496 ) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno> {
497 struct SocketAccepter<'a> {
498 sock: &'a InodeSocket,
499 nonblocking: bool,
500 handler_registered: bool,
501 }
502 impl Drop for SocketAccepter<'_> {
503 fn drop(&mut self) {
504 if self.handler_registered {
505 let mut inner = self.sock.inner.protected.write().unwrap();
506 inner.remove_handler();
507 }
508 }
509 }
510 impl Future for SocketAccepter<'_> {
511 type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno>;
512 fn poll(
513 mut self: Pin<&mut Self>,
514 cx: &mut std::task::Context<'_>,
515 ) -> std::task::Poll<Self::Output> {
516 loop {
517 let mut inner = self.sock.inner.protected.write().unwrap();
518 return match &mut inner.kind {
519 InodeSocketKind::TcpListener { socket, .. } => match socket.try_accept() {
520 Ok((child, addr)) => Poll::Ready(Ok((child, addr))),
521 Err(NetworkError::WouldBlock) if self.nonblocking => {
522 Poll::Ready(Err(Errno::Again))
523 }
524 Err(NetworkError::WouldBlock) if !self.handler_registered => {
525 let res = socket.set_handler(cx.waker().into());
526 if let Err(err) = res {
527 return Poll::Ready(Err(net_error_into_wasi_err(err)));
528 }
529 drop(inner);
530 self.handler_registered = true;
531 continue;
532 }
533 Err(NetworkError::WouldBlock) => Poll::Pending,
534 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
535 },
536 InodeSocketKind::PreSocket { .. } => Poll::Ready(Err(Errno::Notconn)),
537 _ => Poll::Ready(Err(Errno::Notsup)),
538 };
539 }
540 }
541 }
542
543 let acceptor = SocketAccepter {
544 sock: self,
545 nonblocking,
546 handler_registered: false,
547 };
548 if let Some(timeout) = timeout {
549 tokio::select! {
550 res = acceptor => res,
551 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
552 }
553 } else {
554 acceptor.await
555 }
556 }
557
558 pub fn close(&self) -> Result<(), Errno> {
559 let mut inner = self.inner.protected.write().unwrap();
560 match &mut inner.kind {
561 InodeSocketKind::TcpListener { .. } => {}
562 InodeSocketKind::TcpStream { socket, .. } => {
563 socket.close().map_err(net_error_into_wasi_err)?;
564 }
565 InodeSocketKind::Icmp(_) => {}
566 InodeSocketKind::UdpSocket { .. } => {}
567 InodeSocketKind::Raw(_) => {}
568 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
569 InodeSocketKind::RemoteSocket { .. } => {}
570 };
571 Ok(())
572 }
573
574 pub async fn connect(
575 &mut self,
576 tasks: &dyn VirtualTaskManager,
577 net: &dyn VirtualNetworking,
578 peer: SocketAddr,
579 timeout: Option<std::time::Duration>,
580 nonblocking: bool,
581 ) -> Result<Option<InodeSocket>, Errno> {
582 let new_write_timeout;
583 let new_read_timeout;
584
585 let timeout = timeout
586 .or_else(|| self.opt_time(TimeType::ConnectTimeout).ok().flatten())
587 .unwrap_or(Duration::from_secs(30));
588
589 let handler;
590 let connect = {
591 let mut inner = self.inner.protected.write().unwrap();
592 match &mut inner.kind {
593 InodeSocketKind::PreSocket { props, addr, .. } => {
594 handler = props.handler.take();
595 new_write_timeout = props.write_timeout;
596 new_read_timeout = props.read_timeout;
597 match props.ty {
598 Socktype::Stream => {
599 let no_delay = props.no_delay;
600 let keep_alive = props.keep_alive;
601 let dont_route = props.dont_route;
602 let addr = match addr {
603 Some(a) => *a,
604 None => {
605 let ip = match peer.is_ipv4() {
606 true => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
607 false => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
608 };
609 SocketAddr::new(ip, 0)
610 }
611 };
612 Box::pin(async move {
613 let mut ret = net.connect_tcp(addr, peer).await?;
614 if let Some(no_delay) = no_delay {
615 ret.set_nodelay(no_delay).ok();
616 }
617 if let Some(keep_alive) = keep_alive {
618 ret.set_keepalive(keep_alive).ok();
619 }
620 if let Some(dont_route) = dont_route {
621 ret.set_dontroute(dont_route).ok();
622 }
623 if !nonblocking {
624 futures::future::poll_fn(|cx| ret.poll_write_ready(cx)).await?;
625 }
626 Ok(ret)
627 })
628 }
629 Socktype::Dgram => return Err(Errno::Inval),
630 _ => return Err(Errno::Notsup),
631 }
632 }
633 InodeSocketKind::UdpSocket {
634 peer: target_peer, ..
635 } => {
636 target_peer.replace(peer);
637 return Ok(None);
638 }
639 InodeSocketKind::RemoteSocket { peer_addr, .. } => {
640 *peer_addr = peer;
641 return Ok(None);
642 }
643 _ => return Err(Errno::Notsup),
644 }
645 };
646
647 let mut socket = tokio::select! {
648 res = connect => res.map_err(net_error_into_wasi_err)?,
649 _ = tasks.sleep_now(timeout) => return Err(Errno::Timedout)
650 };
651
652 if let Some(handler) = handler {
653 socket
654 .set_handler(handler)
655 .map_err(net_error_into_wasi_err)?;
656 }
657
658 let socket = InodeSocket::new(InodeSocketKind::TcpStream {
659 socket,
660 write_timeout: new_write_timeout,
661 read_timeout: new_read_timeout,
662 });
663
664 Ok(Some(socket))
665 }
666
667 pub fn status(&self) -> Result<WasiSocketStatus, Errno> {
668 let inner = self.inner.protected.read().unwrap();
669 Ok(match &inner.kind {
670 InodeSocketKind::PreSocket { .. } => WasiSocketStatus::Opening,
671 InodeSocketKind::TcpListener { .. } => WasiSocketStatus::Opened,
672 InodeSocketKind::TcpStream { socket, .. } => match socket.status() {
673 Ok(virtual_net::SocketStatus::Opening) => WasiSocketStatus::Opening,
674 Ok(virtual_net::SocketStatus::Opened) => WasiSocketStatus::Opened,
675 Ok(virtual_net::SocketStatus::Closed) => WasiSocketStatus::Closed,
676 Ok(virtual_net::SocketStatus::Failed) => WasiSocketStatus::Failed,
677 Err(_) => WasiSocketStatus::Failed,
678 },
679 InodeSocketKind::UdpSocket { .. } => WasiSocketStatus::Opened,
680 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
681 true => WasiSocketStatus::Closed,
682 false => WasiSocketStatus::Opened,
683 },
684 _ => WasiSocketStatus::Failed,
685 })
686 }
687
688 pub fn addr_local(&self) -> Result<SocketAddr, Errno> {
689 let inner = self.inner.protected.read().unwrap();
690 Ok(match &inner.kind {
691 InodeSocketKind::PreSocket { props, addr, .. } => {
692 if let Some(addr) = addr {
693 *addr
694 } else {
695 SocketAddr::new(
696 match props.family {
697 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
698 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
699 _ => return Err(Errno::Inval),
700 },
701 0,
702 )
703 }
704 }
705 InodeSocketKind::Icmp(sock) => sock.addr_local().map_err(net_error_into_wasi_err)?,
706 InodeSocketKind::TcpListener { socket, .. } => {
707 socket.addr_local().map_err(net_error_into_wasi_err)?
708 }
709 InodeSocketKind::TcpStream { socket, .. } => {
710 socket.addr_local().map_err(net_error_into_wasi_err)?
711 }
712 InodeSocketKind::UdpSocket { socket, .. } => {
713 socket.addr_local().map_err(net_error_into_wasi_err)?
714 }
715 InodeSocketKind::RemoteSocket {
716 local_addr: addr, ..
717 } => *addr,
718 _ => return Err(Errno::Notsup),
719 })
720 }
721
722 pub fn addr_peer(&self) -> Result<SocketAddr, Errno> {
723 let inner = self.inner.protected.read().unwrap();
724 Ok(match &inner.kind {
725 InodeSocketKind::PreSocket { props, .. } => SocketAddr::new(
726 match props.family {
727 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
728 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
729 _ => return Err(Errno::Inval),
730 },
731 0,
732 ),
733 InodeSocketKind::TcpStream { socket, .. } => {
734 socket.addr_peer().map_err(net_error_into_wasi_err)?
735 }
736 InodeSocketKind::UdpSocket { socket, .. } => socket
737 .addr_peer()
738 .map_err(net_error_into_wasi_err)?
739 .map(Ok)
740 .unwrap_or_else(|| {
741 socket
742 .addr_local()
743 .map_err(net_error_into_wasi_err)
744 .map(|addr| {
745 SocketAddr::new(
746 match addr {
747 SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
748 SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
749 },
750 0,
751 )
752 })
753 })?,
754 InodeSocketKind::RemoteSocket { peer_addr, .. } => *peer_addr,
755 _ => return Err(Errno::Notsup),
756 })
757 }
758
759 pub fn set_opt_flag(&mut self, option: WasiSocketOption, val: bool) -> Result<(), Errno> {
760 let mut inner = self.inner.protected.write().unwrap();
761 match &mut inner.kind {
762 InodeSocketKind::PreSocket { props, .. }
763 | InodeSocketKind::RemoteSocket { props, .. } => {
764 match option {
765 WasiSocketOption::OnlyV6 => props.only_v6 = val,
766 WasiSocketOption::ReusePort => props.reuse_port = val,
767 WasiSocketOption::ReuseAddr => props.reuse_addr = val,
768 WasiSocketOption::NoDelay => props.no_delay = Some(val),
769 WasiSocketOption::KeepAlive => props.keep_alive = Some(val),
770 WasiSocketOption::DontRoute => props.dont_route = Some(val),
771 _ => return Err(Errno::Inval),
772 };
773 }
774 InodeSocketKind::Raw(sock) => match option {
775 WasiSocketOption::Promiscuous => {
776 sock.set_promiscuous(val).map_err(net_error_into_wasi_err)?
777 }
778 _ => return Err(Errno::Inval),
779 },
780 InodeSocketKind::TcpStream { socket, .. } => match option {
781 WasiSocketOption::NoDelay => {
782 socket.set_nodelay(val).map_err(net_error_into_wasi_err)?
783 }
784 WasiSocketOption::KeepAlive => {
785 socket.set_keepalive(val).map_err(net_error_into_wasi_err)?
786 }
787 WasiSocketOption::DontRoute => {
788 socket.set_dontroute(val).map_err(net_error_into_wasi_err)?
789 }
790 _ => return Err(Errno::Inval),
791 },
792 InodeSocketKind::TcpListener { .. } => return Err(Errno::Inval),
793 InodeSocketKind::UdpSocket { socket, .. } => match option {
794 WasiSocketOption::Broadcast => {
795 socket.set_broadcast(val).map_err(net_error_into_wasi_err)?
796 }
797 WasiSocketOption::MulticastLoopV4 => socket
798 .set_multicast_loop_v4(val)
799 .map_err(net_error_into_wasi_err)?,
800 WasiSocketOption::MulticastLoopV6 => socket
801 .set_multicast_loop_v6(val)
802 .map_err(net_error_into_wasi_err)?,
803 _ => return Err(Errno::Inval),
804 },
805 _ => return Err(Errno::Notsup),
806 }
807 Ok(())
808 }
809
810 pub fn get_opt_flag(&self, option: WasiSocketOption) -> Result<bool, Errno> {
811 let mut inner = self.inner.protected.write().unwrap();
812 Ok(match &mut inner.kind {
813 InodeSocketKind::PreSocket { props, .. }
814 | InodeSocketKind::RemoteSocket { props, .. } => match option {
815 WasiSocketOption::OnlyV6 => props.only_v6,
816 WasiSocketOption::ReusePort => props.reuse_port,
817 WasiSocketOption::ReuseAddr => props.reuse_addr,
818 WasiSocketOption::NoDelay => props.no_delay.unwrap_or_default(),
819 WasiSocketOption::KeepAlive => props.keep_alive.unwrap_or_default(),
820 _ => return Err(Errno::Inval),
821 },
822 InodeSocketKind::Raw(sock) => match option {
823 WasiSocketOption::Promiscuous => {
824 sock.promiscuous().map_err(net_error_into_wasi_err)?
825 }
826 _ => return Err(Errno::Inval),
827 },
828 InodeSocketKind::TcpStream { socket, .. } => match option {
829 WasiSocketOption::NoDelay => socket.nodelay().map_err(net_error_into_wasi_err)?,
830 WasiSocketOption::KeepAlive => {
831 socket.keepalive().map_err(net_error_into_wasi_err)?
832 }
833 WasiSocketOption::DontRoute => {
834 socket.dontroute().map_err(net_error_into_wasi_err)?
835 }
836 _ => return Err(Errno::Inval),
837 },
838 InodeSocketKind::UdpSocket { socket, .. } => match option {
839 WasiSocketOption::Broadcast => {
840 socket.broadcast().map_err(net_error_into_wasi_err)?
841 }
842 WasiSocketOption::MulticastLoopV4 => socket
843 .multicast_loop_v4()
844 .map_err(net_error_into_wasi_err)?,
845 WasiSocketOption::MulticastLoopV6 => socket
846 .multicast_loop_v6()
847 .map_err(net_error_into_wasi_err)?,
848 _ => return Err(Errno::Inval),
849 },
850 _ => return Err(Errno::Notsup),
851 })
852 }
853
854 pub fn set_send_buf_size(&mut self, size: usize) -> Result<(), Errno> {
855 let mut inner = self.inner.protected.write().unwrap();
856 match &mut inner.kind {
857 InodeSocketKind::PreSocket { props, .. }
858 | InodeSocketKind::RemoteSocket { props, .. } => {
859 props.send_buf_size = Some(size);
860 }
861 InodeSocketKind::TcpStream { socket, .. } => {
862 socket
863 .set_send_buf_size(size)
864 .map_err(net_error_into_wasi_err)?;
865 }
866 _ => return Err(Errno::Notsup),
867 }
868 Ok(())
869 }
870
871 pub fn send_buf_size(&self) -> Result<usize, Errno> {
872 let inner = self.inner.protected.read().unwrap();
873 match &inner.kind {
874 InodeSocketKind::PreSocket { props, .. }
875 | InodeSocketKind::RemoteSocket { props, .. } => {
876 Ok(props.send_buf_size.unwrap_or_default())
877 }
878 InodeSocketKind::TcpStream { socket, .. } => {
879 socket.send_buf_size().map_err(net_error_into_wasi_err)
880 }
881 _ => Err(Errno::Notsup),
882 }
883 }
884
885 pub fn set_recv_buf_size(&mut self, size: usize) -> Result<(), Errno> {
886 let mut inner = self.inner.protected.write().unwrap();
887 match &mut inner.kind {
888 InodeSocketKind::PreSocket { props, .. }
889 | InodeSocketKind::RemoteSocket { props, .. } => {
890 props.recv_buf_size = Some(size);
891 }
892 InodeSocketKind::TcpStream { socket, .. } => {
893 socket
894 .set_recv_buf_size(size)
895 .map_err(net_error_into_wasi_err)?;
896 }
897 _ => return Err(Errno::Notsup),
898 }
899 Ok(())
900 }
901
902 pub fn recv_buf_size(&self) -> Result<usize, Errno> {
903 let inner = self.inner.protected.read().unwrap();
904 match &inner.kind {
905 InodeSocketKind::PreSocket { props, .. }
906 | InodeSocketKind::RemoteSocket { props, .. } => {
907 Ok(props.recv_buf_size.unwrap_or_default())
908 }
909 InodeSocketKind::TcpStream { socket, .. } => {
910 socket.recv_buf_size().map_err(net_error_into_wasi_err)
911 }
912 _ => Err(Errno::Notsup),
913 }
914 }
915
916 pub fn set_linger(&mut self, linger: Option<std::time::Duration>) -> Result<(), Errno> {
917 let mut inner = self.inner.protected.write().unwrap();
918 match &mut inner.kind {
919 InodeSocketKind::TcpStream { socket, .. } => {
920 socket.set_linger(linger).map_err(net_error_into_wasi_err)
921 }
922 InodeSocketKind::RemoteSocket { .. } => Ok(()),
923 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
924 _ => Err(Errno::Notsup),
925 }
926 }
927
928 pub fn linger(&self) -> Result<Option<std::time::Duration>, Errno> {
929 let inner = self.inner.protected.read().unwrap();
930 match &inner.kind {
931 InodeSocketKind::TcpStream { socket, .. } => {
932 socket.linger().map_err(net_error_into_wasi_err)
933 }
934 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
935 _ => Err(Errno::Notsup),
936 }
937 }
938
939 pub fn set_opt_time(
940 &self,
941 ty: TimeType,
942 timeout: Option<std::time::Duration>,
943 ) -> Result<(), Errno> {
944 let mut inner = self.inner.protected.write().unwrap();
945 match &mut inner.kind {
946 InodeSocketKind::TcpStream {
947 write_timeout,
948 read_timeout,
949 ..
950 } => {
951 match ty {
952 TimeType::WriteTimeout => *write_timeout = timeout,
953 TimeType::ReadTimeout => *read_timeout = timeout,
954 _ => return Err(Errno::Inval),
955 }
956 Ok(())
957 }
958 InodeSocketKind::TcpListener { accept_timeout, .. } => {
959 match ty {
960 TimeType::AcceptTimeout => *accept_timeout = timeout,
961 _ => return Err(Errno::Inval),
962 }
963 Ok(())
964 }
965 InodeSocketKind::PreSocket { props, .. }
966 | InodeSocketKind::RemoteSocket { props, .. } => {
967 match ty {
968 TimeType::ConnectTimeout => props.connect_timeout = timeout,
969 TimeType::AcceptTimeout => props.accept_timeout = timeout,
970 TimeType::ReadTimeout => props.read_timeout = timeout,
971 TimeType::WriteTimeout => props.write_timeout = timeout,
972 _ => return Err(Errno::Io),
973 }
974 Ok(())
975 }
976 _ => Err(Errno::Notsup),
977 }
978 }
979
980 pub fn opt_time(&self, ty: TimeType) -> Result<Option<std::time::Duration>, Errno> {
981 let inner = self.inner.protected.read().unwrap();
982 match &inner.kind {
983 InodeSocketKind::TcpStream {
984 read_timeout,
985 write_timeout,
986 ..
987 } => Ok(match ty {
988 TimeType::ReadTimeout => *read_timeout,
989 TimeType::WriteTimeout => *write_timeout,
990 _ => return Err(Errno::Inval),
991 }),
992 InodeSocketKind::TcpListener { accept_timeout, .. } => Ok(match ty {
993 TimeType::AcceptTimeout => *accept_timeout,
994 _ => return Err(Errno::Inval),
995 }),
996 InodeSocketKind::PreSocket { props, .. }
997 | InodeSocketKind::RemoteSocket { props, .. } => match ty {
998 TimeType::ConnectTimeout => Ok(props.connect_timeout),
999 TimeType::AcceptTimeout => Ok(props.accept_timeout),
1000 TimeType::ReadTimeout => Ok(props.read_timeout),
1001 TimeType::WriteTimeout => Ok(props.write_timeout),
1002 _ => Err(Errno::Inval),
1003 },
1004 _ => Err(Errno::Notsup),
1005 }
1006 }
1007
1008 pub fn set_ttl(&self, ttl: u32) -> Result<(), Errno> {
1009 let mut inner = self.inner.protected.write().unwrap();
1010 match &mut inner.kind {
1011 InodeSocketKind::TcpStream { socket, .. } => {
1012 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1013 }
1014 InodeSocketKind::UdpSocket { socket, .. } => {
1015 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1016 }
1017 InodeSocketKind::RemoteSocket { ttl: set_ttl, .. } => {
1018 *set_ttl = ttl;
1019 Ok(())
1020 }
1021 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1022 _ => Err(Errno::Notsup),
1023 }
1024 }
1025
1026 pub fn ttl(&self) -> Result<u32, Errno> {
1027 let inner = self.inner.protected.read().unwrap();
1028 match &inner.kind {
1029 InodeSocketKind::TcpStream { socket, .. } => {
1030 socket.ttl().map_err(net_error_into_wasi_err)
1031 }
1032 InodeSocketKind::UdpSocket { socket, .. } => {
1033 socket.ttl().map_err(net_error_into_wasi_err)
1034 }
1035 InodeSocketKind::RemoteSocket { ttl, .. } => Ok(*ttl),
1036 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1037 _ => Err(Errno::Notsup),
1038 }
1039 }
1040
1041 pub fn set_multicast_ttl_v4(&self, ttl: u32) -> Result<(), Errno> {
1042 let mut inner = self.inner.protected.write().unwrap();
1043 match &mut inner.kind {
1044 InodeSocketKind::UdpSocket { socket, .. } => socket
1045 .set_multicast_ttl_v4(ttl)
1046 .map_err(net_error_into_wasi_err),
1047 InodeSocketKind::RemoteSocket {
1048 multicast_ttl: set_ttl,
1049 ..
1050 } => {
1051 *set_ttl = ttl;
1052 Ok(())
1053 }
1054 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1055 _ => Err(Errno::Notsup),
1056 }
1057 }
1058
1059 pub fn multicast_ttl_v4(&self) -> Result<u32, Errno> {
1060 let inner = self.inner.protected.read().unwrap();
1061 match &inner.kind {
1062 InodeSocketKind::UdpSocket { socket, .. } => {
1063 socket.multicast_ttl_v4().map_err(net_error_into_wasi_err)
1064 }
1065 InodeSocketKind::RemoteSocket { multicast_ttl, .. } => Ok(*multicast_ttl),
1066 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1067 _ => Err(Errno::Notsup),
1068 }
1069 }
1070
1071 pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1072 let mut inner = self.inner.protected.write().unwrap();
1073 match &mut inner.kind {
1074 InodeSocketKind::UdpSocket { socket, .. } => socket
1075 .join_multicast_v4(multiaddr, iface)
1076 .map_err(net_error_into_wasi_err),
1077 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1078 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1079 _ => Err(Errno::Notsup),
1080 }
1081 }
1082
1083 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1084 let mut inner = self.inner.protected.write().unwrap();
1085 match &mut inner.kind {
1086 InodeSocketKind::UdpSocket { socket, .. } => socket
1087 .leave_multicast_v4(multiaddr, iface)
1088 .map_err(net_error_into_wasi_err),
1089 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1090 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1091 _ => Err(Errno::Notsup),
1092 }
1093 }
1094
1095 pub fn join_multicast_v6(&self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1096 let mut inner = self.inner.protected.write().unwrap();
1097 match &mut inner.kind {
1098 InodeSocketKind::UdpSocket { socket, .. } => socket
1099 .join_multicast_v6(multiaddr, iface)
1100 .map_err(net_error_into_wasi_err),
1101 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1102 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1103 _ => Err(Errno::Notsup),
1104 }
1105 }
1106
1107 pub fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1108 let mut inner = self.inner.protected.write().unwrap();
1109 match &mut inner.kind {
1110 InodeSocketKind::UdpSocket { socket, .. } => socket
1111 .leave_multicast_v6(multiaddr, iface)
1112 .map_err(net_error_into_wasi_err),
1113 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1114 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1115 _ => Err(Errno::Notsup),
1116 }
1117 }
1118
1119 pub async fn send(
1120 &self,
1121 tasks: &dyn VirtualTaskManager,
1122 buf: &[u8],
1123 timeout: Option<Duration>,
1124 nonblocking: bool,
1125 ) -> Result<usize, Errno> {
1126 struct SocketSender<'a, 'b> {
1127 inner: &'a InodeSocketInner,
1128 data: &'b [u8],
1129 nonblocking: bool,
1130 handler_registered: bool,
1131 }
1132 impl Drop for SocketSender<'_, '_> {
1133 fn drop(&mut self) {
1134 if self.handler_registered {
1135 let mut inner = self.inner.protected.write().unwrap();
1136 inner.remove_handler();
1137 }
1138 }
1139 }
1140 impl Future for SocketSender<'_, '_> {
1141 type Output = Result<usize, Errno>;
1142 fn poll(
1143 mut self: Pin<&mut Self>,
1144 cx: &mut std::task::Context<'_>,
1145 ) -> Poll<Self::Output> {
1146 loop {
1147 let mut inner = self.inner.protected.write().unwrap();
1148 let res = match &mut inner.kind {
1149 InodeSocketKind::Raw(socket) => socket.try_send(self.data),
1150 InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1151 InodeSocketKind::UdpSocket { socket, peer } => {
1152 if let Some(peer) = peer {
1153 socket.try_send_to(self.data, *peer)
1154 } else {
1155 Err(NetworkError::NotConnected)
1156 }
1157 }
1158 InodeSocketKind::PreSocket { .. } => {
1159 return Poll::Ready(Err(Errno::Notconn));
1160 }
1161 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1162 return match is_dead {
1163 true => Poll::Ready(Err(Errno::Connreset)),
1164 false => Poll::Ready(Ok(self.data.len())),
1165 };
1166 }
1167 _ => return Poll::Ready(Err(Errno::Notsup)),
1168 };
1169 return match res {
1170 Ok(amt) => Poll::Ready(Ok(amt)),
1171 Err(NetworkError::WouldBlock) if self.nonblocking => {
1172 Poll::Ready(Err(Errno::Again))
1173 }
1174 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1175 inner
1176 .set_handler(cx.waker().into())
1177 .map_err(net_error_into_wasi_err)?;
1178 drop(inner);
1179 self.handler_registered = true;
1180 continue;
1181 }
1182 Err(NetworkError::WouldBlock) => Poll::Pending,
1183 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1184 };
1185 }
1186 }
1187 }
1188
1189 let poller = SocketSender {
1190 inner: &self.inner,
1191 data: buf,
1192 nonblocking,
1193 handler_registered: false,
1194 };
1195 if let Some(timeout) = timeout {
1196 tokio::select! {
1197 res = poller => res,
1198 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1199 }
1200 } else {
1201 poller.await
1202 }
1203 }
1204
1205 pub async fn send_to<M: MemorySize>(
1206 &self,
1207 tasks: &dyn VirtualTaskManager,
1208 buf: &[u8],
1209 addr: SocketAddr,
1210 timeout: Option<Duration>,
1211 nonblocking: bool,
1212 ) -> Result<usize, Errno> {
1213 struct SocketSender<'a, 'b> {
1214 inner: &'a InodeSocketInner,
1215 data: &'b [u8],
1216 addr: SocketAddr,
1217 nonblocking: bool,
1218 handler_registered: bool,
1219 }
1220 impl Drop for SocketSender<'_, '_> {
1221 fn drop(&mut self) {
1222 if self.handler_registered {
1223 let mut inner = self.inner.protected.write().unwrap();
1224 inner.remove_handler();
1225 }
1226 }
1227 }
1228 impl Future for SocketSender<'_, '_> {
1229 type Output = Result<usize, Errno>;
1230 fn poll(
1231 mut self: Pin<&mut Self>,
1232 cx: &mut std::task::Context<'_>,
1233 ) -> Poll<Self::Output> {
1234 loop {
1235 let mut inner = self.inner.protected.write().unwrap();
1236 let res = match &mut inner.kind {
1237 InodeSocketKind::Icmp(socket) => socket.try_send_to(self.data, self.addr),
1238 InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1239 InodeSocketKind::UdpSocket { socket, .. } => {
1240 socket.try_send_to(self.data, self.addr)
1241 }
1242 InodeSocketKind::PreSocket { .. } => {
1243 return Poll::Ready(Err(Errno::Notconn));
1244 }
1245 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1246 return match is_dead {
1247 true => Poll::Ready(Err(Errno::Connreset)),
1248 false => Poll::Ready(Ok(self.data.len())),
1249 };
1250 }
1251 _ => return Poll::Ready(Err(Errno::Notsup)),
1252 };
1253 return match res {
1254 Ok(amt) => Poll::Ready(Ok(amt)),
1255 Err(NetworkError::WouldBlock) if self.nonblocking => {
1256 Poll::Ready(Err(Errno::Again))
1257 }
1258 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1259 inner
1260 .set_handler(cx.waker().into())
1261 .map_err(net_error_into_wasi_err)?;
1262 self.handler_registered = true;
1263 drop(inner);
1264 continue;
1265 }
1266 Err(NetworkError::WouldBlock) => Poll::Pending,
1267 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1268 };
1269 }
1270 }
1271 }
1272
1273 let poller = SocketSender {
1274 inner: &self.inner,
1275 data: buf,
1276 addr,
1277 nonblocking,
1278 handler_registered: false,
1279 };
1280 if let Some(timeout) = timeout {
1281 tokio::select! {
1282 res = poller => res,
1283 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1284 }
1285 } else {
1286 poller.await
1287 }
1288 }
1289
1290 pub async fn recv(
1291 &self,
1292 tasks: &dyn VirtualTaskManager,
1293 buf: &mut [MaybeUninit<u8>],
1294 timeout: Option<Duration>,
1295 nonblocking: bool,
1296 peek: bool,
1297 ) -> Result<usize, Errno> {
1298 struct SocketReceiver<'a, 'b> {
1299 inner: &'a InodeSocketInner,
1300 data: &'b mut [MaybeUninit<u8>],
1301 nonblocking: bool,
1302 peek: bool,
1303 handler_registered: bool,
1304 }
1305 impl Drop for SocketReceiver<'_, '_> {
1306 fn drop(&mut self) {
1307 if self.handler_registered {
1308 let mut inner = self.inner.protected.write().unwrap();
1309 inner.remove_handler();
1310 }
1311 }
1312 }
1313 impl Future for SocketReceiver<'_, '_> {
1314 type Output = Result<usize, Errno>;
1315 fn poll(
1316 mut self: Pin<&mut Self>,
1317 cx: &mut std::task::Context<'_>,
1318 ) -> Poll<Self::Output> {
1319 loop {
1320 let peek = self.peek;
1321 let mut inner = self.inner.protected.write().unwrap();
1322 let res = match &mut inner.kind {
1323 InodeSocketKind::Raw(socket) => socket.try_recv(self.data, peek),
1324 InodeSocketKind::TcpStream { socket, .. } => {
1325 socket.try_recv(self.data, peek)
1326 }
1327 InodeSocketKind::UdpSocket { socket, peer } => {
1328 if let Some(peer) = peer {
1329 match socket.try_recv_from(self.data, peek) {
1330 Ok((amt, addr)) if addr == *peer => Ok(amt),
1331 Ok(_) => Err(NetworkError::WouldBlock),
1332 Err(err) => Err(err),
1333 }
1334 } else {
1335 match socket.try_recv_from(self.data, peek) {
1336 Ok((amt, _)) => Ok(amt),
1337 Err(err) => Err(err),
1338 }
1339 }
1340 }
1341 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1342 return match is_dead {
1343 true => Poll::Ready(Ok(0)),
1344 false => Poll::Pending,
1345 };
1346 }
1347 InodeSocketKind::PreSocket { .. } => {
1348 return Poll::Ready(Err(Errno::Notconn));
1349 }
1350 _ => return Poll::Ready(Err(Errno::Notsup)),
1351 };
1352 return match res {
1353 Ok(amt) => Poll::Ready(Ok(amt)),
1354 Err(NetworkError::WouldBlock) if self.nonblocking => {
1355 Poll::Ready(Err(Errno::Again))
1356 }
1357 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1358 inner
1359 .set_handler(cx.waker().into())
1360 .map_err(net_error_into_wasi_err)?;
1361 self.handler_registered = true;
1362 drop(inner);
1363 continue;
1364 }
1365
1366 Err(NetworkError::WouldBlock) => Poll::Pending,
1367 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1368 };
1369 }
1370 }
1371 }
1372
1373 let poller = SocketReceiver {
1374 inner: &self.inner,
1375 data: buf,
1376 nonblocking,
1377 peek,
1378 handler_registered: false,
1379 };
1380 if let Some(timeout) = timeout {
1381 tokio::select! {
1382 res = poller => res,
1383 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1384 }
1385 } else {
1386 poller.await
1387 }
1388 }
1389
1390 pub async fn recv_from(
1391 &self,
1392 tasks: &dyn VirtualTaskManager,
1393 buf: &mut [MaybeUninit<u8>],
1394 timeout: Option<Duration>,
1395 nonblocking: bool,
1396 peek: bool,
1397 ) -> Result<(usize, SocketAddr), Errno> {
1398 struct SocketReceiver<'a, 'b> {
1399 inner: &'a InodeSocketInner,
1400 data: &'b mut [MaybeUninit<u8>],
1401 nonblocking: bool,
1402 peek: bool,
1403 handler_registered: bool,
1404 }
1405 impl Drop for SocketReceiver<'_, '_> {
1406 fn drop(&mut self) {
1407 if self.handler_registered {
1408 let mut inner = self.inner.protected.write().unwrap();
1409 inner.remove_handler();
1410 }
1411 }
1412 }
1413 impl Future for SocketReceiver<'_, '_> {
1414 type Output = Result<(usize, SocketAddr), Errno>;
1415 fn poll(
1416 mut self: Pin<&mut Self>,
1417 cx: &mut std::task::Context<'_>,
1418 ) -> Poll<Self::Output> {
1419 let peek = self.peek;
1420 let mut inner = self.inner.protected.write().unwrap();
1421 loop {
1422 let res = match &mut inner.kind {
1423 InodeSocketKind::Icmp(socket) => socket.try_recv_from(self.data, peek),
1424 InodeSocketKind::UdpSocket { socket, .. } => {
1425 socket.try_recv_from(self.data, peek)
1426 }
1427 InodeSocketKind::RemoteSocket {
1428 is_dead, peer_addr, ..
1429 } => {
1430 return match is_dead {
1431 true => Poll::Ready(Ok((0, *peer_addr))),
1432 false => Poll::Pending,
1433 };
1434 }
1435 InodeSocketKind::PreSocket { .. } => {
1436 return Poll::Ready(Err(Errno::Notconn));
1437 }
1438 _ => return Poll::Ready(Err(Errno::Notsup)),
1439 };
1440 return match res {
1441 Ok((amt, addr)) => Poll::Ready(Ok((amt, addr))),
1442 Err(NetworkError::WouldBlock) if self.nonblocking => {
1443 Poll::Ready(Err(Errno::Again))
1444 }
1445 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1446 inner
1447 .set_handler(cx.waker().into())
1448 .map_err(net_error_into_wasi_err)?;
1449 self.handler_registered = true;
1450 continue;
1451 }
1452 Err(NetworkError::WouldBlock) => Poll::Pending,
1453 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1454 };
1455 }
1456 }
1457 }
1458
1459 let poller = SocketReceiver {
1460 inner: &self.inner,
1461 data: buf,
1462 nonblocking,
1463 peek,
1464 handler_registered: false,
1465 };
1466 if let Some(timeout) = timeout {
1467 tokio::select! {
1468 res = poller => res,
1469 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1470 }
1471 } else {
1472 poller.await
1473 }
1474 }
1475
1476 pub fn shutdown(&mut self, how: std::net::Shutdown) -> Result<(), Errno> {
1477 let mut inner = self.inner.protected.write().unwrap();
1478 match &mut inner.kind {
1479 InodeSocketKind::TcpStream { socket, .. } => {
1480 socket.shutdown(how).map_err(net_error_into_wasi_err)?;
1481 }
1482 InodeSocketKind::RemoteSocket { .. } => return Ok(()),
1483 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
1484 _ => return Err(Errno::Notsup),
1485 }
1486 Ok(())
1487 }
1488
1489 pub async fn can_write(&self) -> bool {
1490 if let Ok(mut guard) = self.inner.protected.try_write() {
1491 #[allow(clippy::match_like_matches_macro)]
1492 match &mut guard.kind {
1493 InodeSocketKind::TcpStream { .. }
1494 | InodeSocketKind::UdpSocket { .. }
1495 | InodeSocketKind::Raw(..) => true,
1496 InodeSocketKind::RemoteSocket { is_dead, .. } => !(*is_dead),
1497 _ => false,
1498 }
1499 } else {
1500 false
1501 }
1502 }
1503}
1504
1505impl InodeSocketProtected {
1506 pub fn remove_handler(&mut self) {
1507 match &mut self.kind {
1508 InodeSocketKind::TcpListener { socket, .. } => socket.remove_handler(),
1509 InodeSocketKind::TcpStream { socket, .. } => socket.remove_handler(),
1510 InodeSocketKind::UdpSocket { socket, .. } => socket.remove_handler(),
1511 InodeSocketKind::Raw(socket) => socket.remove_handler(),
1512 InodeSocketKind::Icmp(socket) => socket.remove_handler(),
1513 InodeSocketKind::PreSocket { props, .. } => {
1514 props.handler.take();
1515 }
1516 InodeSocketKind::RemoteSocket { props, .. } => {
1517 props.handler.take();
1518 }
1519 }
1520 }
1521
1522 pub fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1523 match &mut self.kind {
1524 InodeSocketKind::TcpListener { socket, .. } => socket.poll_read_ready(cx),
1525 InodeSocketKind::TcpStream { socket, .. } => socket.poll_read_ready(cx),
1526 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_read_ready(cx),
1527 InodeSocketKind::Raw(socket) => socket.poll_read_ready(cx),
1528 InodeSocketKind::Icmp(socket) => socket.poll_read_ready(cx),
1529 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1530 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1531 true => Poll::Ready(Ok(0)),
1532 false => Poll::Pending,
1533 },
1534 }
1535 .map_err(net_error_into_io_err)
1536 }
1537
1538 pub fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1539 match &mut self.kind {
1540 InodeSocketKind::TcpListener { socket, .. } => socket.poll_write_ready(cx),
1541 InodeSocketKind::TcpStream { socket, .. } => socket.poll_write_ready(cx),
1542 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_write_ready(cx),
1543 InodeSocketKind::Raw(socket) => socket.poll_write_ready(cx),
1544 InodeSocketKind::Icmp(socket) => socket.poll_write_ready(cx),
1545 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1546 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1547 true => Poll::Ready(Ok(0)),
1548 false => Poll::Pending,
1549 },
1550 }
1551 .map_err(net_error_into_io_err)
1552 }
1553
1554 pub fn set_handler(
1555 &mut self,
1556 handler: Box<dyn InterestHandler + Send + Sync>,
1557 ) -> virtual_net::Result<()> {
1558 match &mut self.kind {
1559 InodeSocketKind::TcpListener { socket, .. } => socket.set_handler(handler),
1560 InodeSocketKind::TcpStream { socket, .. } => socket.set_handler(handler),
1561 InodeSocketKind::UdpSocket { socket, .. } => socket.set_handler(handler),
1562 InodeSocketKind::Raw(socket) => socket.set_handler(handler),
1563 InodeSocketKind::Icmp(socket) => socket.set_handler(handler),
1564 InodeSocketKind::PreSocket { props, .. }
1565 | InodeSocketKind::RemoteSocket { props, .. } => {
1566 props.handler.replace(handler);
1567 Ok(())
1568 }
1569 }
1570 }
1571}
1572
1573#[allow(dead_code)]
1575pub(crate) fn all_socket_rights() -> Rights {
1576 Rights::FD_FDSTAT_SET_FLAGS
1577 .union(Rights::FD_FILESTAT_GET)
1578 .union(Rights::FD_READ)
1579 .union(Rights::FD_WRITE)
1580 .union(Rights::POLL_FD_READWRITE)
1581 .union(Rights::SOCK_SHUTDOWN)
1582 .union(Rights::SOCK_CONNECT)
1583 .union(Rights::SOCK_LISTEN)
1584 .union(Rights::SOCK_BIND)
1585 .union(Rights::SOCK_ACCEPT)
1586 .union(Rights::SOCK_RECV)
1587 .union(Rights::SOCK_SEND)
1588 .union(Rights::SOCK_ADDR_LOCAL)
1589 .union(Rights::SOCK_ADDR_REMOTE)
1590 .union(Rights::SOCK_RECV_FROM)
1591 .union(Rights::SOCK_SEND_TO)
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596 use super::{InodeSocket, InodeSocketKind, WasiSocketStatus};
1597 use std::{
1598 mem::MaybeUninit,
1599 net::{Ipv4Addr, Shutdown, SocketAddr},
1600 pin::Pin,
1601 sync::{
1602 Arc,
1603 atomic::{AtomicUsize, Ordering},
1604 },
1605 task::{Context, Poll},
1606 time::Duration,
1607 };
1608 use virtual_mio::InterestHandler;
1609 use virtual_net::{
1610 NetworkError, Result as NetResult, SocketStatus, VirtualConnectedSocket, VirtualIoSource,
1611 VirtualSocket, VirtualTcpSocket,
1612 };
1613
1614 #[derive(Debug)]
1615 struct MockTcpSocket {
1616 read_calls: Arc<AtomicUsize>,
1617 write_calls: Arc<AtomicUsize>,
1618 status: Arc<AtomicUsize>,
1619 }
1620
1621 const MOCK_STATUS_OPENING: usize = 0;
1622 const MOCK_STATUS_OPENED: usize = 1;
1623
1624 fn decode_mock_status(value: usize) -> SocketStatus {
1625 match value {
1626 MOCK_STATUS_OPENED => SocketStatus::Opened,
1627 _ => SocketStatus::Opening,
1628 }
1629 }
1630
1631 impl VirtualIoSource for MockTcpSocket {
1632 fn remove_handler(&mut self) {}
1633
1634 fn poll_read_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
1635 self.read_calls.fetch_add(1, Ordering::Relaxed);
1636 Poll::Ready(Ok(3))
1637 }
1638
1639 fn poll_write_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
1640 self.write_calls.fetch_add(1, Ordering::Relaxed);
1641 self.status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
1642 Poll::Ready(Ok(7))
1643 }
1644 }
1645
1646 impl VirtualSocket for MockTcpSocket {
1647 fn set_ttl(&mut self, _ttl: u32) -> NetResult<()> {
1648 Ok(())
1649 }
1650
1651 fn ttl(&self) -> NetResult<u32> {
1652 Ok(64)
1653 }
1654
1655 fn addr_local(&self) -> NetResult<SocketAddr> {
1656 Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))
1657 }
1658
1659 fn status(&self) -> NetResult<SocketStatus> {
1660 Ok(decode_mock_status(self.status.load(Ordering::Relaxed)))
1661 }
1662
1663 fn set_handler(
1664 &mut self,
1665 _handler: Box<dyn InterestHandler + Send + Sync>,
1666 ) -> NetResult<()> {
1667 Ok(())
1668 }
1669 }
1670
1671 impl VirtualConnectedSocket for MockTcpSocket {
1672 fn set_linger(&mut self, _linger: Option<Duration>) -> NetResult<()> {
1673 Ok(())
1674 }
1675
1676 fn linger(&self) -> NetResult<Option<Duration>> {
1677 Ok(None)
1678 }
1679
1680 fn try_send(&mut self, _data: &[u8]) -> NetResult<usize> {
1681 Err(NetworkError::Unsupported)
1682 }
1683
1684 fn try_flush(&mut self) -> NetResult<()> {
1685 Err(NetworkError::Unsupported)
1686 }
1687
1688 fn close(&mut self) -> NetResult<()> {
1689 Ok(())
1690 }
1691
1692 fn try_recv(&mut self, _buf: &mut [MaybeUninit<u8>], _peek: bool) -> NetResult<usize> {
1693 Err(NetworkError::Unsupported)
1694 }
1695 }
1696
1697 impl VirtualTcpSocket for MockTcpSocket {
1698 fn set_recv_buf_size(&mut self, _size: usize) -> NetResult<()> {
1699 Ok(())
1700 }
1701
1702 fn recv_buf_size(&self) -> NetResult<usize> {
1703 Ok(0)
1704 }
1705
1706 fn set_send_buf_size(&mut self, _size: usize) -> NetResult<()> {
1707 Ok(())
1708 }
1709
1710 fn send_buf_size(&self) -> NetResult<usize> {
1711 Ok(0)
1712 }
1713
1714 fn set_nodelay(&mut self, _reuse: bool) -> NetResult<()> {
1715 Ok(())
1716 }
1717
1718 fn nodelay(&self) -> NetResult<bool> {
1719 Ok(true)
1720 }
1721
1722 fn set_keepalive(&mut self, _keepalive: bool) -> NetResult<()> {
1723 Ok(())
1724 }
1725
1726 fn keepalive(&self) -> NetResult<bool> {
1727 Ok(false)
1728 }
1729
1730 fn set_dontroute(&mut self, _keepalive: bool) -> NetResult<()> {
1731 Ok(())
1732 }
1733
1734 fn dontroute(&self) -> NetResult<bool> {
1735 Ok(false)
1736 }
1737
1738 fn addr_peer(&self) -> NetResult<SocketAddr> {
1739 Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 80)))
1740 }
1741
1742 fn shutdown(&mut self, _how: Shutdown) -> NetResult<()> {
1743 Ok(())
1744 }
1745
1746 fn is_closed(&self) -> bool {
1747 false
1748 }
1749 }
1750
1751 #[test]
1752 fn inode_socket_poll_write_ready_uses_write_path() {
1753 let read_calls = Arc::new(AtomicUsize::new(0));
1754 let write_calls = Arc::new(AtomicUsize::new(0));
1755 let status = Arc::new(AtomicUsize::new(MOCK_STATUS_OPENED));
1756 let mut inode = InodeSocket::new(InodeSocketKind::TcpStream {
1757 socket: Box::new(MockTcpSocket {
1758 read_calls: read_calls.clone(),
1759 write_calls: write_calls.clone(),
1760 status,
1761 }),
1762 write_timeout: None,
1763 read_timeout: None,
1764 });
1765
1766 let waker = futures::task::noop_waker();
1767 let mut cx = Context::from_waker(&waker);
1768 let ready = Pin::new(&mut inode).poll_write_ready(&mut cx);
1769
1770 assert!(matches!(ready, Poll::Ready(Ok(7))));
1771 assert_eq!(read_calls.load(Ordering::Relaxed), 0);
1772 assert_eq!(write_calls.load(Ordering::Relaxed), 1);
1773 }
1774
1775 #[test]
1776 fn inode_socket_status_tracks_tcp_socket_status() {
1777 let status = Arc::new(AtomicUsize::new(MOCK_STATUS_OPENING));
1778 let inode = InodeSocket::new(InodeSocketKind::TcpStream {
1779 socket: Box::new(MockTcpSocket {
1780 read_calls: Arc::new(AtomicUsize::new(0)),
1781 write_calls: Arc::new(AtomicUsize::new(0)),
1782 status: status.clone(),
1783 }),
1784 write_timeout: None,
1785 read_timeout: None,
1786 });
1787
1788 assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opening));
1789 status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
1790 assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opened));
1791 }
1792}