1use std::{
2 future::Future,
3 io,
4 mem::MaybeUninit,
5 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
6 pin::Pin,
7 sync::{Arc, RwLock, RwLockWriteGuard},
8 task::{Context, Poll},
9 time::Duration,
10};
11
12#[cfg(feature = "enable-serde")]
13use serde_derive::{Deserialize, Serialize};
14use virtual_mio::InterestHandler;
15use virtual_net::{
16 NetworkError, VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualTcpListener,
17 VirtualTcpSocket, VirtualUdpSocket, net_error_into_io_err,
18};
19use wasmer_types::MemorySize;
20use wasmer_wasix_types::wasi::{Addressfamily, Errno, Rights, SockProto, Sockoption, Socktype};
21
22use crate::{VirtualTaskManager, net::net_error_into_wasi_err};
23
24#[derive(Debug)]
25#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
26pub enum InodeHttpSocketType {
27 Request,
29 Response,
31 Headers,
33}
34
35#[derive(Debug)]
36pub struct SocketProperties {
37 pub family: Addressfamily,
38 pub ty: Socktype,
39 pub pt: SockProto,
40 pub only_v6: bool,
41 pub reuse_port: bool,
42 pub reuse_addr: bool,
43 pub no_delay: Option<bool>,
44 pub keep_alive: Option<bool>,
45 pub dont_route: Option<bool>,
46 pub send_buf_size: Option<usize>,
47 pub recv_buf_size: Option<usize>,
48 pub write_timeout: Option<Duration>,
49 pub read_timeout: Option<Duration>,
50 pub accept_timeout: Option<Duration>,
51 pub connect_timeout: Option<Duration>,
52 pub handler: Option<Box<dyn InterestHandler + Send + Sync>>,
53}
54
55#[derive(Debug)]
56pub enum InodeSocketKind {
58 PreSocket {
59 props: SocketProperties,
60 addr: Option<SocketAddr>,
61 },
62 Icmp(Box<dyn VirtualIcmpSocket + Sync>),
63 Raw(Box<dyn VirtualRawSocket + Sync>),
64 TcpListener {
65 socket: Box<dyn VirtualTcpListener + Sync>,
66 accept_timeout: Option<Duration>,
67 },
68 TcpStream {
69 socket: Box<dyn VirtualTcpSocket + Sync>,
70 write_timeout: Option<Duration>,
71 read_timeout: Option<Duration>,
72 },
73 UdpSocket {
74 socket: Box<dyn VirtualUdpSocket + Sync>,
75 peer: Option<SocketAddr>,
76 },
77 RemoteSocket {
78 props: SocketProperties,
79 local_addr: SocketAddr,
80 peer_addr: SocketAddr,
81 ttl: u32,
82 multicast_ttl: u32,
83 is_dead: bool,
84 },
85}
86
87pub enum WasiSocketOption {
88 Noop,
89 ReusePort,
90 ReuseAddr,
91 NoDelay,
92 DontRoute,
93 OnlyV6,
94 Broadcast,
95 MulticastLoopV4,
96 MulticastLoopV6,
97 Promiscuous,
98 Listening,
99 LastError,
100 KeepAlive,
101 Linger,
102 OobInline,
103 RecvBufSize,
104 SendBufSize,
105 RecvLowat,
106 SendLowat,
107 RecvTimeout,
108 SendTimeout,
109 ConnectTimeout,
110 AcceptTimeout,
111 Ttl,
112 MulticastTtlV4,
113 Type,
114 Proto,
115}
116
117impl From<Sockoption> for WasiSocketOption {
118 fn from(opt: Sockoption) -> Self {
119 use WasiSocketOption::*;
120 match opt {
121 Sockoption::Noop => Noop,
122 Sockoption::ReusePort => ReusePort,
123 Sockoption::ReuseAddr => ReuseAddr,
124 Sockoption::NoDelay => NoDelay,
125 Sockoption::DontRoute => DontRoute,
126 Sockoption::OnlyV6 => OnlyV6,
127 Sockoption::Broadcast => Broadcast,
128 Sockoption::MulticastLoopV4 => MulticastLoopV4,
129 Sockoption::MulticastLoopV6 => MulticastLoopV6,
130 Sockoption::Promiscuous => Promiscuous,
131 Sockoption::Listening => Listening,
132 Sockoption::LastError => LastError,
133 Sockoption::KeepAlive => KeepAlive,
134 Sockoption::Linger => Linger,
135 Sockoption::OobInline => OobInline,
136 Sockoption::RecvBufSize => RecvBufSize,
137 Sockoption::SendBufSize => SendBufSize,
138 Sockoption::RecvLowat => RecvLowat,
139 Sockoption::SendLowat => SendLowat,
140 Sockoption::RecvTimeout => RecvTimeout,
141 Sockoption::SendTimeout => SendTimeout,
142 Sockoption::ConnectTimeout => ConnectTimeout,
143 Sockoption::AcceptTimeout => AcceptTimeout,
144 Sockoption::Ttl => Ttl,
145 Sockoption::MulticastTtlV4 => MulticastTtlV4,
146 Sockoption::Type => Type,
147 Sockoption::Proto => Proto,
148 }
149 }
150}
151
152#[derive(Debug)]
153pub enum WasiSocketStatus {
154 Opening,
155 Opened,
156 Closed,
157 Failed,
158}
159
160#[derive(Debug, Copy, Clone, PartialEq, Eq)]
161#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
162pub enum TimeType {
163 ReadTimeout,
164 WriteTimeout,
165 AcceptTimeout,
166 ConnectTimeout,
167 BindTimeout,
168 Linger,
169}
170
171impl From<TimeType> for wasmer_journal::SocketOptTimeType {
172 fn from(value: TimeType) -> Self {
173 match value {
174 TimeType::ReadTimeout => Self::ReadTimeout,
175 TimeType::WriteTimeout => Self::WriteTimeout,
176 TimeType::AcceptTimeout => Self::AcceptTimeout,
177 TimeType::ConnectTimeout => Self::ConnectTimeout,
178 TimeType::BindTimeout => Self::BindTimeout,
179 TimeType::Linger => Self::Linger,
180 }
181 }
182}
183
184impl From<wasmer_journal::SocketOptTimeType> for TimeType {
185 fn from(value: wasmer_journal::SocketOptTimeType) -> Self {
186 use wasmer_journal::SocketOptTimeType;
187 match value {
188 SocketOptTimeType::ReadTimeout => TimeType::ReadTimeout,
189 SocketOptTimeType::WriteTimeout => TimeType::WriteTimeout,
190 SocketOptTimeType::AcceptTimeout => TimeType::AcceptTimeout,
191 SocketOptTimeType::ConnectTimeout => TimeType::ConnectTimeout,
192 SocketOptTimeType::BindTimeout => TimeType::BindTimeout,
193 SocketOptTimeType::Linger => TimeType::Linger,
194 }
195 }
196}
197
198#[derive(Debug)]
199pub(crate) struct InodeSocketProtected {
201 pub kind: InodeSocketKind,
202}
203
204#[derive(Debug)]
205pub(crate) struct InodeSocketInner {
207 pub protected: RwLock<InodeSocketProtected>,
208}
209
210#[derive(Debug, Clone)]
211pub struct InodeSocket {
213 pub(crate) inner: Arc<InodeSocketInner>,
214}
215
216impl InodeSocket {
217 pub fn new(kind: InodeSocketKind) -> Self {
218 let protected = InodeSocketProtected { kind };
219 Self {
220 inner: Arc::new(InodeSocketInner {
221 protected: RwLock::new(protected),
222 }),
223 }
224 }
225
226 pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
227 let mut inner = self.inner.protected.write().unwrap();
228 inner.poll_read_ready(cx)
229 }
230
231 pub fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
232 let mut inner = self.inner.protected.write().unwrap();
233 inner.poll_read_ready(cx)
234 }
235
236 #[allow(clippy::await_holding_lock, clippy::readonly_write_lock)]
241 pub async fn auto_bind_udp(
242 &self,
243 tasks: &dyn VirtualTaskManager,
244 net: &dyn VirtualNetworking,
245 ) -> Result<Option<InodeSocket>, Errno> {
246 let timeout = self
247 .opt_time(TimeType::BindTimeout)
248 .ok()
249 .flatten()
250 .unwrap_or(Duration::from_secs(30));
251 let inner = self.inner.protected.write().unwrap();
252 match &inner.kind {
253 InodeSocketKind::PreSocket { props, .. } if props.ty == Socktype::Dgram => {
254 let addr = match props.family {
255 Addressfamily::Inet4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
256 Addressfamily::Inet6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
257 _ => return Err(Errno::Notsup),
258 };
259 Self::bind_internal(tasks, net, addr, timeout, inner).await
260 }
261 _ => Ok(None),
262 }
263 }
264
265 pub async fn bind(
266 &self,
267 tasks: &dyn VirtualTaskManager,
268 net: &dyn VirtualNetworking,
269 set_addr: SocketAddr,
270 ) -> Result<Option<InodeSocket>, Errno> {
271 let timeout = self
272 .opt_time(TimeType::BindTimeout)
273 .ok()
274 .flatten()
275 .unwrap_or(Duration::from_secs(30));
276 let inner = self.inner.protected.write().unwrap();
277 Self::bind_internal(tasks, net, set_addr, timeout, inner).await
278 }
279
280 #[allow(clippy::await_holding_lock)]
282 async fn bind_internal(
283 tasks: &dyn VirtualTaskManager,
284 net: &dyn VirtualNetworking,
285 set_addr: SocketAddr,
286 timeout: Duration,
287 mut inner: RwLockWriteGuard<'_, InodeSocketProtected>,
288 ) -> Result<Option<InodeSocket>, Errno> {
289 let socket = {
290 match &mut inner.kind {
291 InodeSocketKind::PreSocket { props, addr, .. } => {
292 match props.family {
293 Addressfamily::Inet4 => {
294 if !set_addr.is_ipv4() {
295 tracing::debug!(
296 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
297 );
298 return Err(Errno::Inval);
299 }
300 }
301 Addressfamily::Inet6 => {
302 if !set_addr.is_ipv6() {
303 tracing::debug!(
304 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
305 );
306 return Err(Errno::Inval);
307 }
308 }
309 _ => {
310 return Err(Errno::Notsup);
311 }
312 }
313
314 addr.replace(set_addr);
315 let addr = (*addr).unwrap();
316
317 match props.ty {
318 Socktype::Stream => {
319 return Ok(None);
322 }
323 Socktype::Dgram => {
324 let reuse_port = props.reuse_port;
325 let reuse_addr = props.reuse_addr;
326
327 net.bind_udp(addr, reuse_port, reuse_addr)
328 }
329 _ => return Err(Errno::Inval),
330 }
331 }
332 InodeSocketKind::RemoteSocket {
333 props,
334 local_addr: addr,
335 ..
336 } => {
337 match props.family {
338 Addressfamily::Inet4 => {
339 if !set_addr.is_ipv4() {
340 tracing::debug!(
341 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
342 );
343 return Err(Errno::Inval);
344 }
345 }
346 Addressfamily::Inet6 => {
347 if !set_addr.is_ipv6() {
348 tracing::debug!(
349 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
350 );
351 return Err(Errno::Inval);
352 }
353 }
354 _ => {
355 return Err(Errno::Notsup);
356 }
357 }
358
359 *addr = set_addr;
360 let addr = *addr;
361
362 match props.ty {
363 Socktype::Stream => {
364 return Ok(None);
367 }
368 Socktype::Dgram => {
369 let reuse_port = props.reuse_port;
370 let reuse_addr = props.reuse_addr;
371
372 net.bind_udp(addr, reuse_port, reuse_addr)
373 }
374 _ => return Err(Errno::Inval),
375 }
376 }
377 _ => return Err(Errno::Notsup),
378 }
379 };
380
381 drop(inner);
382
383 tokio::select! {
384 socket = socket => {
385 let socket = socket.map_err(net_error_into_wasi_err)?;
386 Ok(Some(InodeSocket::new(InodeSocketKind::UdpSocket { socket, peer: None })))
387 },
388 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
389 }
390 }
391
392 pub async fn listen(
393 &self,
394 tasks: &dyn VirtualTaskManager,
395 net: &dyn VirtualNetworking,
396 _backlog: usize,
397 ) -> Result<Option<InodeSocket>, Errno> {
398 let timeout = self
399 .opt_time(TimeType::AcceptTimeout)
400 .ok()
401 .flatten()
402 .unwrap_or(Duration::from_secs(30));
403
404 let socket = {
405 let inner = self.inner.protected.read().unwrap();
406 match &inner.kind {
407 InodeSocketKind::PreSocket { props, addr, .. } => match props.ty {
408 Socktype::Stream => {
409 if addr.is_none() {
410 tracing::warn!("wasi[?]::sock_listen - failed - address not set");
411 return Err(Errno::Inval);
412 }
413 let addr = *addr.as_ref().unwrap();
414 let only_v6 = props.only_v6;
415 let reuse_port = props.reuse_port;
416 let reuse_addr = props.reuse_addr;
417 drop(inner);
418
419 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
420 }
421 ty => {
422 tracing::warn!(
423 "wasi[?]::sock_listen - failed - not supported(pre-socket:{:?})",
424 ty
425 );
426 return Err(Errno::Notsup);
427 }
428 },
429 InodeSocketKind::RemoteSocket {
430 props,
431 local_addr: addr,
432 ..
433 } => match props.ty {
434 Socktype::Stream => {
435 let addr = *addr;
436 let only_v6 = props.only_v6;
437 let reuse_port = props.reuse_port;
438 let reuse_addr = props.reuse_addr;
439 drop(inner);
440
441 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
442 }
443 ty => {
444 tracing::warn!(
445 "wasi[?]::sock_listen - failed - not supported(remote-socket:{:?})",
446 ty
447 );
448 return Err(Errno::Notsup);
449 }
450 },
451 InodeSocketKind::Icmp(_) => {
452 tracing::warn!("wasi[?]::sock_listen - failed - not supported(icmp)");
453 return Err(Errno::Notsup);
454 }
455 InodeSocketKind::Raw(_) => {
456 tracing::warn!("wasi[?]::sock_listen - failed - not supported(raw)");
457 return Err(Errno::Notsup);
458 }
459 InodeSocketKind::TcpListener { .. } => {
460 tracing::warn!(
461 "wasi[?]::sock_listen - failed - already listening (tcp-listener)"
462 );
463 return Err(Errno::Notsup);
464 }
465 InodeSocketKind::TcpStream { .. } => {
466 tracing::warn!("wasi[?]::sock_listen - failed - not supported(tcp-stream)");
467 return Err(Errno::Notsup);
468 }
469 InodeSocketKind::UdpSocket { .. } => {
470 tracing::warn!("wasi[?]::sock_listen - failed - not supported(udp-socket)");
471 return Err(Errno::Notsup);
472 }
473 }
474 };
475
476 tokio::select! {
477 socket = socket => {
478 let socket = socket.map_err(net_error_into_wasi_err)?;
479 Ok(Some(InodeSocket::new(InodeSocketKind::TcpListener {
480 socket,
481 accept_timeout: Some(timeout),
482 })))
483 },
484 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
485 }
486 }
487
488 pub async fn accept(
489 &self,
490 tasks: &dyn VirtualTaskManager,
491 nonblocking: bool,
492 timeout: Option<Duration>,
493 ) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno> {
494 struct SocketAccepter<'a> {
495 sock: &'a InodeSocket,
496 nonblocking: bool,
497 handler_registered: bool,
498 }
499 impl Drop for SocketAccepter<'_> {
500 fn drop(&mut self) {
501 if self.handler_registered {
502 let mut inner = self.sock.inner.protected.write().unwrap();
503 inner.remove_handler();
504 }
505 }
506 }
507 impl Future for SocketAccepter<'_> {
508 type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno>;
509 fn poll(
510 mut self: Pin<&mut Self>,
511 cx: &mut std::task::Context<'_>,
512 ) -> std::task::Poll<Self::Output> {
513 loop {
514 let mut inner = self.sock.inner.protected.write().unwrap();
515 return match &mut inner.kind {
516 InodeSocketKind::TcpListener { socket, .. } => match socket.try_accept() {
517 Ok((child, addr)) => Poll::Ready(Ok((child, addr))),
518 Err(NetworkError::WouldBlock) if self.nonblocking => {
519 Poll::Ready(Err(Errno::Again))
520 }
521 Err(NetworkError::WouldBlock) if !self.handler_registered => {
522 let res = socket.set_handler(cx.waker().into());
523 if let Err(err) = res {
524 return Poll::Ready(Err(net_error_into_wasi_err(err)));
525 }
526 drop(inner);
527 self.handler_registered = true;
528 continue;
529 }
530 Err(NetworkError::WouldBlock) => Poll::Pending,
531 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
532 },
533 InodeSocketKind::PreSocket { .. } => Poll::Ready(Err(Errno::Notconn)),
534 _ => Poll::Ready(Err(Errno::Notsup)),
535 };
536 }
537 }
538 }
539
540 let acceptor = SocketAccepter {
541 sock: self,
542 nonblocking,
543 handler_registered: false,
544 };
545 if let Some(timeout) = timeout {
546 tokio::select! {
547 res = acceptor => res,
548 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
549 }
550 } else {
551 acceptor.await
552 }
553 }
554
555 pub fn close(&self) -> Result<(), Errno> {
556 let mut inner = self.inner.protected.write().unwrap();
557 match &mut inner.kind {
558 InodeSocketKind::TcpListener { .. } => {}
559 InodeSocketKind::TcpStream { socket, .. } => {
560 socket.close().map_err(net_error_into_wasi_err)?;
561 }
562 InodeSocketKind::Icmp(_) => {}
563 InodeSocketKind::UdpSocket { .. } => {}
564 InodeSocketKind::Raw(_) => {}
565 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
566 InodeSocketKind::RemoteSocket { .. } => {}
567 };
568 Ok(())
569 }
570
571 pub async fn connect(
572 &mut self,
573 tasks: &dyn VirtualTaskManager,
574 net: &dyn VirtualNetworking,
575 peer: SocketAddr,
576 timeout: Option<std::time::Duration>,
577 nonblocking: bool,
578 ) -> Result<Option<InodeSocket>, Errno> {
579 let new_write_timeout;
580 let new_read_timeout;
581
582 let timeout = timeout.unwrap_or(Duration::from_secs(30));
583
584 let handler;
585 let connect = {
586 let mut inner = self.inner.protected.write().unwrap();
587 match &mut inner.kind {
588 InodeSocketKind::PreSocket { props, addr, .. } => {
589 handler = props.handler.take();
590 new_write_timeout = props.write_timeout;
591 new_read_timeout = props.read_timeout;
592 match props.ty {
593 Socktype::Stream => {
594 let no_delay = props.no_delay;
595 let keep_alive = props.keep_alive;
596 let dont_route = props.dont_route;
597 let addr = match addr {
598 Some(a) => *a,
599 None => {
600 let ip = match peer.is_ipv4() {
601 true => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
602 false => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
603 };
604 SocketAddr::new(ip, 0)
605 }
606 };
607 Box::pin(async move {
608 let mut ret = net.connect_tcp(addr, peer).await?;
609 if let Some(no_delay) = no_delay {
610 ret.set_nodelay(no_delay).ok();
611 }
612 if let Some(keep_alive) = keep_alive {
613 ret.set_keepalive(keep_alive).ok();
614 }
615 if let Some(dont_route) = dont_route {
616 ret.set_dontroute(dont_route).ok();
617 }
618 if !nonblocking {
619 futures::future::poll_fn(|cx| ret.poll_write_ready(cx)).await?;
620 }
621 Ok(ret)
622 })
623 }
624 Socktype::Dgram => return Err(Errno::Inval),
625 _ => return Err(Errno::Notsup),
626 }
627 }
628 InodeSocketKind::UdpSocket {
629 peer: target_peer, ..
630 } => {
631 target_peer.replace(peer);
632 return Ok(None);
633 }
634 InodeSocketKind::RemoteSocket { peer_addr, .. } => {
635 *peer_addr = peer;
636 return Ok(None);
637 }
638 _ => return Err(Errno::Notsup),
639 }
640 };
641
642 let mut socket = tokio::select! {
643 res = connect => res.map_err(net_error_into_wasi_err)?,
644 _ = tasks.sleep_now(timeout) => return Err(Errno::Timedout)
645 };
646
647 if let Some(handler) = handler {
648 socket
649 .set_handler(handler)
650 .map_err(net_error_into_wasi_err)?;
651 }
652
653 let socket = InodeSocket::new(InodeSocketKind::TcpStream {
654 socket,
655 write_timeout: new_write_timeout,
656 read_timeout: new_read_timeout,
657 });
658
659 Ok(Some(socket))
660 }
661
662 pub fn status(&self) -> Result<WasiSocketStatus, Errno> {
663 let inner = self.inner.protected.read().unwrap();
664 Ok(match &inner.kind {
665 InodeSocketKind::PreSocket { .. } => WasiSocketStatus::Opening,
666 InodeSocketKind::TcpListener { .. } => WasiSocketStatus::Opened,
667 InodeSocketKind::TcpStream { .. } => WasiSocketStatus::Opened,
668 InodeSocketKind::UdpSocket { .. } => WasiSocketStatus::Opened,
669 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
670 true => WasiSocketStatus::Closed,
671 false => WasiSocketStatus::Opened,
672 },
673 _ => WasiSocketStatus::Failed,
674 })
675 }
676
677 pub fn addr_local(&self) -> Result<SocketAddr, Errno> {
678 let inner = self.inner.protected.read().unwrap();
679 Ok(match &inner.kind {
680 InodeSocketKind::PreSocket { props, addr, .. } => {
681 if let Some(addr) = addr {
682 *addr
683 } else {
684 SocketAddr::new(
685 match props.family {
686 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
687 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
688 _ => return Err(Errno::Inval),
689 },
690 0,
691 )
692 }
693 }
694 InodeSocketKind::Icmp(sock) => sock.addr_local().map_err(net_error_into_wasi_err)?,
695 InodeSocketKind::TcpListener { socket, .. } => {
696 socket.addr_local().map_err(net_error_into_wasi_err)?
697 }
698 InodeSocketKind::TcpStream { socket, .. } => {
699 socket.addr_local().map_err(net_error_into_wasi_err)?
700 }
701 InodeSocketKind::UdpSocket { socket, .. } => {
702 socket.addr_local().map_err(net_error_into_wasi_err)?
703 }
704 InodeSocketKind::RemoteSocket {
705 local_addr: addr, ..
706 } => *addr,
707 _ => return Err(Errno::Notsup),
708 })
709 }
710
711 pub fn addr_peer(&self) -> Result<SocketAddr, Errno> {
712 let inner = self.inner.protected.read().unwrap();
713 Ok(match &inner.kind {
714 InodeSocketKind::PreSocket { props, .. } => SocketAddr::new(
715 match props.family {
716 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
717 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
718 _ => return Err(Errno::Inval),
719 },
720 0,
721 ),
722 InodeSocketKind::TcpStream { socket, .. } => {
723 socket.addr_peer().map_err(net_error_into_wasi_err)?
724 }
725 InodeSocketKind::UdpSocket { socket, .. } => socket
726 .addr_peer()
727 .map_err(net_error_into_wasi_err)?
728 .map(Ok)
729 .unwrap_or_else(|| {
730 socket
731 .addr_local()
732 .map_err(net_error_into_wasi_err)
733 .map(|addr| {
734 SocketAddr::new(
735 match addr {
736 SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
737 SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
738 },
739 0,
740 )
741 })
742 })?,
743 InodeSocketKind::RemoteSocket { peer_addr, .. } => *peer_addr,
744 _ => return Err(Errno::Notsup),
745 })
746 }
747
748 pub fn set_opt_flag(&mut self, option: WasiSocketOption, val: bool) -> Result<(), Errno> {
749 let mut inner = self.inner.protected.write().unwrap();
750 match &mut inner.kind {
751 InodeSocketKind::PreSocket { props, .. }
752 | InodeSocketKind::RemoteSocket { props, .. } => {
753 match option {
754 WasiSocketOption::OnlyV6 => props.only_v6 = val,
755 WasiSocketOption::ReusePort => props.reuse_port = val,
756 WasiSocketOption::ReuseAddr => props.reuse_addr = val,
757 WasiSocketOption::NoDelay => props.no_delay = Some(val),
758 WasiSocketOption::KeepAlive => props.keep_alive = Some(val),
759 WasiSocketOption::DontRoute => props.dont_route = Some(val),
760 _ => return Err(Errno::Inval),
761 };
762 }
763 InodeSocketKind::Raw(sock) => match option {
764 WasiSocketOption::Promiscuous => {
765 sock.set_promiscuous(val).map_err(net_error_into_wasi_err)?
766 }
767 _ => return Err(Errno::Inval),
768 },
769 InodeSocketKind::TcpStream { socket, .. } => match option {
770 WasiSocketOption::NoDelay => {
771 socket.set_nodelay(val).map_err(net_error_into_wasi_err)?
772 }
773 WasiSocketOption::KeepAlive => {
774 socket.set_keepalive(val).map_err(net_error_into_wasi_err)?
775 }
776 WasiSocketOption::DontRoute => {
777 socket.set_dontroute(val).map_err(net_error_into_wasi_err)?
778 }
779 _ => return Err(Errno::Inval),
780 },
781 InodeSocketKind::TcpListener { .. } => return Err(Errno::Inval),
782 InodeSocketKind::UdpSocket { socket, .. } => match option {
783 WasiSocketOption::Broadcast => {
784 socket.set_broadcast(val).map_err(net_error_into_wasi_err)?
785 }
786 WasiSocketOption::MulticastLoopV4 => socket
787 .set_multicast_loop_v4(val)
788 .map_err(net_error_into_wasi_err)?,
789 WasiSocketOption::MulticastLoopV6 => socket
790 .set_multicast_loop_v6(val)
791 .map_err(net_error_into_wasi_err)?,
792 _ => return Err(Errno::Inval),
793 },
794 _ => return Err(Errno::Notsup),
795 }
796 Ok(())
797 }
798
799 pub fn get_opt_flag(&self, option: WasiSocketOption) -> Result<bool, Errno> {
800 let mut inner = self.inner.protected.write().unwrap();
801 Ok(match &mut inner.kind {
802 InodeSocketKind::PreSocket { props, .. }
803 | InodeSocketKind::RemoteSocket { props, .. } => match option {
804 WasiSocketOption::OnlyV6 => props.only_v6,
805 WasiSocketOption::ReusePort => props.reuse_port,
806 WasiSocketOption::ReuseAddr => props.reuse_addr,
807 WasiSocketOption::NoDelay => props.no_delay.unwrap_or_default(),
808 WasiSocketOption::KeepAlive => props.keep_alive.unwrap_or_default(),
809 _ => return Err(Errno::Inval),
810 },
811 InodeSocketKind::Raw(sock) => match option {
812 WasiSocketOption::Promiscuous => {
813 sock.promiscuous().map_err(net_error_into_wasi_err)?
814 }
815 _ => return Err(Errno::Inval),
816 },
817 InodeSocketKind::TcpStream { socket, .. } => match option {
818 WasiSocketOption::NoDelay => socket.nodelay().map_err(net_error_into_wasi_err)?,
819 WasiSocketOption::KeepAlive => {
820 socket.keepalive().map_err(net_error_into_wasi_err)?
821 }
822 WasiSocketOption::DontRoute => {
823 socket.dontroute().map_err(net_error_into_wasi_err)?
824 }
825 _ => return Err(Errno::Inval),
826 },
827 InodeSocketKind::UdpSocket { socket, .. } => match option {
828 WasiSocketOption::Broadcast => {
829 socket.broadcast().map_err(net_error_into_wasi_err)?
830 }
831 WasiSocketOption::MulticastLoopV4 => socket
832 .multicast_loop_v4()
833 .map_err(net_error_into_wasi_err)?,
834 WasiSocketOption::MulticastLoopV6 => socket
835 .multicast_loop_v6()
836 .map_err(net_error_into_wasi_err)?,
837 _ => return Err(Errno::Inval),
838 },
839 _ => return Err(Errno::Notsup),
840 })
841 }
842
843 pub fn set_send_buf_size(&mut self, size: usize) -> Result<(), Errno> {
844 let mut inner = self.inner.protected.write().unwrap();
845 match &mut inner.kind {
846 InodeSocketKind::PreSocket { props, .. }
847 | InodeSocketKind::RemoteSocket { props, .. } => {
848 props.send_buf_size = Some(size);
849 }
850 InodeSocketKind::TcpStream { socket, .. } => {
851 socket
852 .set_send_buf_size(size)
853 .map_err(net_error_into_wasi_err)?;
854 }
855 _ => return Err(Errno::Notsup),
856 }
857 Ok(())
858 }
859
860 pub fn send_buf_size(&self) -> Result<usize, Errno> {
861 let inner = self.inner.protected.read().unwrap();
862 match &inner.kind {
863 InodeSocketKind::PreSocket { props, .. }
864 | InodeSocketKind::RemoteSocket { props, .. } => {
865 Ok(props.send_buf_size.unwrap_or_default())
866 }
867 InodeSocketKind::TcpStream { socket, .. } => {
868 socket.send_buf_size().map_err(net_error_into_wasi_err)
869 }
870 _ => Err(Errno::Notsup),
871 }
872 }
873
874 pub fn set_recv_buf_size(&mut self, size: usize) -> Result<(), Errno> {
875 let mut inner = self.inner.protected.write().unwrap();
876 match &mut inner.kind {
877 InodeSocketKind::PreSocket { props, .. }
878 | InodeSocketKind::RemoteSocket { props, .. } => {
879 props.recv_buf_size = Some(size);
880 }
881 InodeSocketKind::TcpStream { socket, .. } => {
882 socket
883 .set_recv_buf_size(size)
884 .map_err(net_error_into_wasi_err)?;
885 }
886 _ => return Err(Errno::Notsup),
887 }
888 Ok(())
889 }
890
891 pub fn recv_buf_size(&self) -> Result<usize, Errno> {
892 let inner = self.inner.protected.read().unwrap();
893 match &inner.kind {
894 InodeSocketKind::PreSocket { props, .. }
895 | InodeSocketKind::RemoteSocket { props, .. } => {
896 Ok(props.recv_buf_size.unwrap_or_default())
897 }
898 InodeSocketKind::TcpStream { socket, .. } => {
899 socket.recv_buf_size().map_err(net_error_into_wasi_err)
900 }
901 _ => Err(Errno::Notsup),
902 }
903 }
904
905 pub fn set_linger(&mut self, linger: Option<std::time::Duration>) -> Result<(), Errno> {
906 let mut inner = self.inner.protected.write().unwrap();
907 match &mut inner.kind {
908 InodeSocketKind::TcpStream { socket, .. } => {
909 socket.set_linger(linger).map_err(net_error_into_wasi_err)
910 }
911 InodeSocketKind::RemoteSocket { .. } => Ok(()),
912 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
913 _ => Err(Errno::Notsup),
914 }
915 }
916
917 pub fn linger(&self) -> Result<Option<std::time::Duration>, Errno> {
918 let inner = self.inner.protected.read().unwrap();
919 match &inner.kind {
920 InodeSocketKind::TcpStream { socket, .. } => {
921 socket.linger().map_err(net_error_into_wasi_err)
922 }
923 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
924 _ => Err(Errno::Notsup),
925 }
926 }
927
928 pub fn set_opt_time(
929 &self,
930 ty: TimeType,
931 timeout: Option<std::time::Duration>,
932 ) -> Result<(), Errno> {
933 let mut inner = self.inner.protected.write().unwrap();
934 match &mut inner.kind {
935 InodeSocketKind::TcpStream {
936 write_timeout,
937 read_timeout,
938 ..
939 } => {
940 match ty {
941 TimeType::WriteTimeout => *write_timeout = timeout,
942 TimeType::ReadTimeout => *read_timeout = timeout,
943 _ => return Err(Errno::Inval),
944 }
945 Ok(())
946 }
947 InodeSocketKind::TcpListener { accept_timeout, .. } => {
948 match ty {
949 TimeType::AcceptTimeout => *accept_timeout = timeout,
950 _ => return Err(Errno::Inval),
951 }
952 Ok(())
953 }
954 InodeSocketKind::PreSocket { props, .. }
955 | InodeSocketKind::RemoteSocket { props, .. } => {
956 match ty {
957 TimeType::ConnectTimeout => props.connect_timeout = timeout,
958 TimeType::AcceptTimeout => props.accept_timeout = timeout,
959 TimeType::ReadTimeout => props.read_timeout = timeout,
960 TimeType::WriteTimeout => props.write_timeout = timeout,
961 _ => return Err(Errno::Io),
962 }
963 Ok(())
964 }
965 _ => Err(Errno::Notsup),
966 }
967 }
968
969 pub fn opt_time(&self, ty: TimeType) -> Result<Option<std::time::Duration>, Errno> {
970 let inner = self.inner.protected.read().unwrap();
971 match &inner.kind {
972 InodeSocketKind::TcpStream {
973 read_timeout,
974 write_timeout,
975 ..
976 } => Ok(match ty {
977 TimeType::ReadTimeout => *read_timeout,
978 TimeType::WriteTimeout => *write_timeout,
979 _ => return Err(Errno::Inval),
980 }),
981 InodeSocketKind::TcpListener { accept_timeout, .. } => Ok(match ty {
982 TimeType::AcceptTimeout => *accept_timeout,
983 _ => return Err(Errno::Inval),
984 }),
985 InodeSocketKind::PreSocket { props, .. }
986 | InodeSocketKind::RemoteSocket { props, .. } => match ty {
987 TimeType::ConnectTimeout => Ok(props.connect_timeout),
988 TimeType::AcceptTimeout => Ok(props.accept_timeout),
989 TimeType::ReadTimeout => Ok(props.read_timeout),
990 TimeType::WriteTimeout => Ok(props.write_timeout),
991 _ => Err(Errno::Inval),
992 },
993 _ => Err(Errno::Notsup),
994 }
995 }
996
997 pub fn set_ttl(&self, ttl: u32) -> Result<(), Errno> {
998 let mut inner = self.inner.protected.write().unwrap();
999 match &mut inner.kind {
1000 InodeSocketKind::TcpStream { socket, .. } => {
1001 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1002 }
1003 InodeSocketKind::UdpSocket { socket, .. } => {
1004 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1005 }
1006 InodeSocketKind::RemoteSocket { ttl: set_ttl, .. } => {
1007 *set_ttl = ttl;
1008 Ok(())
1009 }
1010 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1011 _ => Err(Errno::Notsup),
1012 }
1013 }
1014
1015 pub fn ttl(&self) -> Result<u32, Errno> {
1016 let inner = self.inner.protected.read().unwrap();
1017 match &inner.kind {
1018 InodeSocketKind::TcpStream { socket, .. } => {
1019 socket.ttl().map_err(net_error_into_wasi_err)
1020 }
1021 InodeSocketKind::UdpSocket { socket, .. } => {
1022 socket.ttl().map_err(net_error_into_wasi_err)
1023 }
1024 InodeSocketKind::RemoteSocket { ttl, .. } => Ok(*ttl),
1025 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1026 _ => Err(Errno::Notsup),
1027 }
1028 }
1029
1030 pub fn set_multicast_ttl_v4(&self, ttl: u32) -> Result<(), Errno> {
1031 let mut inner = self.inner.protected.write().unwrap();
1032 match &mut inner.kind {
1033 InodeSocketKind::UdpSocket { socket, .. } => socket
1034 .set_multicast_ttl_v4(ttl)
1035 .map_err(net_error_into_wasi_err),
1036 InodeSocketKind::RemoteSocket {
1037 multicast_ttl: set_ttl,
1038 ..
1039 } => {
1040 *set_ttl = ttl;
1041 Ok(())
1042 }
1043 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1044 _ => Err(Errno::Notsup),
1045 }
1046 }
1047
1048 pub fn multicast_ttl_v4(&self) -> Result<u32, Errno> {
1049 let inner = self.inner.protected.read().unwrap();
1050 match &inner.kind {
1051 InodeSocketKind::UdpSocket { socket, .. } => {
1052 socket.multicast_ttl_v4().map_err(net_error_into_wasi_err)
1053 }
1054 InodeSocketKind::RemoteSocket { multicast_ttl, .. } => Ok(*multicast_ttl),
1055 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1056 _ => Err(Errno::Notsup),
1057 }
1058 }
1059
1060 pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1061 let mut inner = self.inner.protected.write().unwrap();
1062 match &mut inner.kind {
1063 InodeSocketKind::UdpSocket { socket, .. } => socket
1064 .join_multicast_v4(multiaddr, iface)
1065 .map_err(net_error_into_wasi_err),
1066 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1067 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1068 _ => Err(Errno::Notsup),
1069 }
1070 }
1071
1072 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1073 let mut inner = self.inner.protected.write().unwrap();
1074 match &mut inner.kind {
1075 InodeSocketKind::UdpSocket { socket, .. } => socket
1076 .leave_multicast_v4(multiaddr, iface)
1077 .map_err(net_error_into_wasi_err),
1078 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1079 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1080 _ => Err(Errno::Notsup),
1081 }
1082 }
1083
1084 pub fn join_multicast_v6(&self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1085 let mut inner = self.inner.protected.write().unwrap();
1086 match &mut inner.kind {
1087 InodeSocketKind::UdpSocket { socket, .. } => socket
1088 .join_multicast_v6(multiaddr, iface)
1089 .map_err(net_error_into_wasi_err),
1090 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1091 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1092 _ => Err(Errno::Notsup),
1093 }
1094 }
1095
1096 pub fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1097 let mut inner = self.inner.protected.write().unwrap();
1098 match &mut inner.kind {
1099 InodeSocketKind::UdpSocket { socket, .. } => socket
1100 .leave_multicast_v6(multiaddr, iface)
1101 .map_err(net_error_into_wasi_err),
1102 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1103 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1104 _ => Err(Errno::Notsup),
1105 }
1106 }
1107
1108 pub async fn send(
1109 &self,
1110 tasks: &dyn VirtualTaskManager,
1111 buf: &[u8],
1112 timeout: Option<Duration>,
1113 nonblocking: bool,
1114 ) -> Result<usize, Errno> {
1115 struct SocketSender<'a, 'b> {
1116 inner: &'a InodeSocketInner,
1117 data: &'b [u8],
1118 nonblocking: bool,
1119 handler_registered: bool,
1120 }
1121 impl Drop for SocketSender<'_, '_> {
1122 fn drop(&mut self) {
1123 if self.handler_registered {
1124 let mut inner = self.inner.protected.write().unwrap();
1125 inner.remove_handler();
1126 }
1127 }
1128 }
1129 impl Future for SocketSender<'_, '_> {
1130 type Output = Result<usize, Errno>;
1131 fn poll(
1132 mut self: Pin<&mut Self>,
1133 cx: &mut std::task::Context<'_>,
1134 ) -> Poll<Self::Output> {
1135 loop {
1136 let mut inner = self.inner.protected.write().unwrap();
1137 let res = match &mut inner.kind {
1138 InodeSocketKind::Raw(socket) => socket.try_send(self.data),
1139 InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1140 InodeSocketKind::UdpSocket { socket, peer } => {
1141 if let Some(peer) = peer {
1142 socket.try_send_to(self.data, *peer)
1143 } else {
1144 Err(NetworkError::NotConnected)
1145 }
1146 }
1147 InodeSocketKind::PreSocket { .. } => {
1148 return Poll::Ready(Err(Errno::Notconn));
1149 }
1150 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1151 return match is_dead {
1152 true => Poll::Ready(Err(Errno::Connreset)),
1153 false => Poll::Ready(Ok(self.data.len())),
1154 };
1155 }
1156 _ => return Poll::Ready(Err(Errno::Notsup)),
1157 };
1158 return match res {
1159 Ok(amt) => Poll::Ready(Ok(amt)),
1160 Err(NetworkError::WouldBlock) if self.nonblocking => {
1161 Poll::Ready(Err(Errno::Again))
1162 }
1163 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1164 inner
1165 .set_handler(cx.waker().into())
1166 .map_err(net_error_into_wasi_err)?;
1167 drop(inner);
1168 self.handler_registered = true;
1169 continue;
1170 }
1171 Err(NetworkError::WouldBlock) => Poll::Pending,
1172 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1173 };
1174 }
1175 }
1176 }
1177
1178 let poller = SocketSender {
1179 inner: &self.inner,
1180 data: buf,
1181 nonblocking,
1182 handler_registered: false,
1183 };
1184 if let Some(timeout) = timeout {
1185 tokio::select! {
1186 res = poller => res,
1187 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1188 }
1189 } else {
1190 poller.await
1191 }
1192 }
1193
1194 pub async fn send_to<M: MemorySize>(
1195 &self,
1196 tasks: &dyn VirtualTaskManager,
1197 buf: &[u8],
1198 addr: SocketAddr,
1199 timeout: Option<Duration>,
1200 nonblocking: bool,
1201 ) -> Result<usize, Errno> {
1202 struct SocketSender<'a, 'b> {
1203 inner: &'a InodeSocketInner,
1204 data: &'b [u8],
1205 addr: SocketAddr,
1206 nonblocking: bool,
1207 handler_registered: bool,
1208 }
1209 impl Drop for SocketSender<'_, '_> {
1210 fn drop(&mut self) {
1211 if self.handler_registered {
1212 let mut inner = self.inner.protected.write().unwrap();
1213 inner.remove_handler();
1214 }
1215 }
1216 }
1217 impl Future for SocketSender<'_, '_> {
1218 type Output = Result<usize, Errno>;
1219 fn poll(
1220 mut self: Pin<&mut Self>,
1221 cx: &mut std::task::Context<'_>,
1222 ) -> Poll<Self::Output> {
1223 loop {
1224 let mut inner = self.inner.protected.write().unwrap();
1225 let res = match &mut inner.kind {
1226 InodeSocketKind::Icmp(socket) => socket.try_send_to(self.data, self.addr),
1227 InodeSocketKind::UdpSocket { socket, .. } => {
1228 socket.try_send_to(self.data, self.addr)
1229 }
1230 InodeSocketKind::PreSocket { .. } => {
1231 return Poll::Ready(Err(Errno::Notconn));
1232 }
1233 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1234 return match is_dead {
1235 true => Poll::Ready(Err(Errno::Connreset)),
1236 false => Poll::Ready(Ok(self.data.len())),
1237 };
1238 }
1239 _ => return Poll::Ready(Err(Errno::Notsup)),
1240 };
1241 return match res {
1242 Ok(amt) => Poll::Ready(Ok(amt)),
1243 Err(NetworkError::WouldBlock) if self.nonblocking => {
1244 Poll::Ready(Err(Errno::Again))
1245 }
1246 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1247 inner
1248 .set_handler(cx.waker().into())
1249 .map_err(net_error_into_wasi_err)?;
1250 self.handler_registered = true;
1251 drop(inner);
1252 continue;
1253 }
1254 Err(NetworkError::WouldBlock) => Poll::Pending,
1255 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1256 };
1257 }
1258 }
1259 }
1260
1261 let poller = SocketSender {
1262 inner: &self.inner,
1263 data: buf,
1264 addr,
1265 nonblocking,
1266 handler_registered: false,
1267 };
1268 if let Some(timeout) = timeout {
1269 tokio::select! {
1270 res = poller => res,
1271 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1272 }
1273 } else {
1274 poller.await
1275 }
1276 }
1277
1278 pub async fn recv(
1279 &self,
1280 tasks: &dyn VirtualTaskManager,
1281 buf: &mut [MaybeUninit<u8>],
1282 timeout: Option<Duration>,
1283 nonblocking: bool,
1284 peek: bool,
1285 ) -> Result<usize, Errno> {
1286 struct SocketReceiver<'a, 'b> {
1287 inner: &'a InodeSocketInner,
1288 data: &'b mut [MaybeUninit<u8>],
1289 nonblocking: bool,
1290 peek: bool,
1291 handler_registered: bool,
1292 }
1293 impl Drop for SocketReceiver<'_, '_> {
1294 fn drop(&mut self) {
1295 if self.handler_registered {
1296 let mut inner = self.inner.protected.write().unwrap();
1297 inner.remove_handler();
1298 }
1299 }
1300 }
1301 impl Future for SocketReceiver<'_, '_> {
1302 type Output = Result<usize, Errno>;
1303 fn poll(
1304 mut self: Pin<&mut Self>,
1305 cx: &mut std::task::Context<'_>,
1306 ) -> Poll<Self::Output> {
1307 loop {
1308 let peek = self.peek;
1309 let mut inner = self.inner.protected.write().unwrap();
1310 let res = match &mut inner.kind {
1311 InodeSocketKind::Raw(socket) => socket.try_recv(self.data, peek),
1312 InodeSocketKind::TcpStream { socket, .. } => {
1313 socket.try_recv(self.data, peek)
1314 }
1315 InodeSocketKind::UdpSocket { socket, peer } => {
1316 if let Some(peer) = peer {
1317 match socket.try_recv_from(self.data, peek) {
1318 Ok((amt, addr)) if addr == *peer => Ok(amt),
1319 Ok(_) => Err(NetworkError::WouldBlock),
1320 Err(err) => Err(err),
1321 }
1322 } else {
1323 match socket.try_recv_from(self.data, peek) {
1324 Ok((amt, _)) => Ok(amt),
1325 Err(err) => Err(err),
1326 }
1327 }
1328 }
1329 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1330 return match is_dead {
1331 true => Poll::Ready(Ok(0)),
1332 false => Poll::Pending,
1333 };
1334 }
1335 InodeSocketKind::PreSocket { .. } => {
1336 return Poll::Ready(Err(Errno::Notconn));
1337 }
1338 _ => return Poll::Ready(Err(Errno::Notsup)),
1339 };
1340 return match res {
1341 Ok(amt) => Poll::Ready(Ok(amt)),
1342 Err(NetworkError::WouldBlock) if self.nonblocking => {
1343 Poll::Ready(Err(Errno::Again))
1344 }
1345 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1346 inner
1347 .set_handler(cx.waker().into())
1348 .map_err(net_error_into_wasi_err)?;
1349 self.handler_registered = true;
1350 drop(inner);
1351 continue;
1352 }
1353
1354 Err(NetworkError::WouldBlock) => Poll::Pending,
1355 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1356 };
1357 }
1358 }
1359 }
1360
1361 let poller = SocketReceiver {
1362 inner: &self.inner,
1363 data: buf,
1364 nonblocking,
1365 peek,
1366 handler_registered: false,
1367 };
1368 if let Some(timeout) = timeout {
1369 tokio::select! {
1370 res = poller => res,
1371 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1372 }
1373 } else {
1374 poller.await
1375 }
1376 }
1377
1378 pub async fn recv_from(
1379 &self,
1380 tasks: &dyn VirtualTaskManager,
1381 buf: &mut [MaybeUninit<u8>],
1382 timeout: Option<Duration>,
1383 nonblocking: bool,
1384 peek: bool,
1385 ) -> Result<(usize, SocketAddr), Errno> {
1386 struct SocketReceiver<'a, 'b> {
1387 inner: &'a InodeSocketInner,
1388 data: &'b mut [MaybeUninit<u8>],
1389 nonblocking: bool,
1390 peek: bool,
1391 handler_registered: bool,
1392 }
1393 impl Drop for SocketReceiver<'_, '_> {
1394 fn drop(&mut self) {
1395 if self.handler_registered {
1396 let mut inner = self.inner.protected.write().unwrap();
1397 inner.remove_handler();
1398 }
1399 }
1400 }
1401 impl Future for SocketReceiver<'_, '_> {
1402 type Output = Result<(usize, SocketAddr), Errno>;
1403 fn poll(
1404 mut self: Pin<&mut Self>,
1405 cx: &mut std::task::Context<'_>,
1406 ) -> Poll<Self::Output> {
1407 let peek = self.peek;
1408 let mut inner = self.inner.protected.write().unwrap();
1409 loop {
1410 let res = match &mut inner.kind {
1411 InodeSocketKind::Icmp(socket) => socket.try_recv_from(self.data, peek),
1412 InodeSocketKind::UdpSocket { socket, .. } => {
1413 socket.try_recv_from(self.data, peek)
1414 }
1415 InodeSocketKind::RemoteSocket {
1416 is_dead, peer_addr, ..
1417 } => {
1418 return match is_dead {
1419 true => Poll::Ready(Ok((0, *peer_addr))),
1420 false => Poll::Pending,
1421 };
1422 }
1423 InodeSocketKind::PreSocket { .. } => {
1424 return Poll::Ready(Err(Errno::Notconn));
1425 }
1426 _ => return Poll::Ready(Err(Errno::Notsup)),
1427 };
1428 return match res {
1429 Ok((amt, addr)) => Poll::Ready(Ok((amt, addr))),
1430 Err(NetworkError::WouldBlock) if self.nonblocking => {
1431 Poll::Ready(Err(Errno::Again))
1432 }
1433 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1434 inner
1435 .set_handler(cx.waker().into())
1436 .map_err(net_error_into_wasi_err)?;
1437 self.handler_registered = true;
1438 continue;
1439 }
1440 Err(NetworkError::WouldBlock) => Poll::Pending,
1441 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1442 };
1443 }
1444 }
1445 }
1446
1447 let poller = SocketReceiver {
1448 inner: &self.inner,
1449 data: buf,
1450 nonblocking,
1451 peek,
1452 handler_registered: false,
1453 };
1454 if let Some(timeout) = timeout {
1455 tokio::select! {
1456 res = poller => res,
1457 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1458 }
1459 } else {
1460 poller.await
1461 }
1462 }
1463
1464 pub fn shutdown(&mut self, how: std::net::Shutdown) -> Result<(), Errno> {
1465 let mut inner = self.inner.protected.write().unwrap();
1466 match &mut inner.kind {
1467 InodeSocketKind::TcpStream { socket, .. } => {
1468 socket.shutdown(how).map_err(net_error_into_wasi_err)?;
1469 }
1470 InodeSocketKind::RemoteSocket { .. } => return Ok(()),
1471 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
1472 _ => return Err(Errno::Notsup),
1473 }
1474 Ok(())
1475 }
1476
1477 pub async fn can_write(&self) -> bool {
1478 if let Ok(mut guard) = self.inner.protected.try_write() {
1479 #[allow(clippy::match_like_matches_macro)]
1480 match &mut guard.kind {
1481 InodeSocketKind::TcpStream { .. }
1482 | InodeSocketKind::UdpSocket { .. }
1483 | InodeSocketKind::Raw(..) => true,
1484 InodeSocketKind::RemoteSocket { is_dead, .. } => !(*is_dead),
1485 _ => false,
1486 }
1487 } else {
1488 false
1489 }
1490 }
1491}
1492
1493impl InodeSocketProtected {
1494 pub fn remove_handler(&mut self) {
1495 match &mut self.kind {
1496 InodeSocketKind::TcpListener { socket, .. } => socket.remove_handler(),
1497 InodeSocketKind::TcpStream { socket, .. } => socket.remove_handler(),
1498 InodeSocketKind::UdpSocket { socket, .. } => socket.remove_handler(),
1499 InodeSocketKind::Raw(socket) => socket.remove_handler(),
1500 InodeSocketKind::Icmp(socket) => socket.remove_handler(),
1501 InodeSocketKind::PreSocket { props, .. } => {
1502 props.handler.take();
1503 }
1504 InodeSocketKind::RemoteSocket { props, .. } => {
1505 props.handler.take();
1506 }
1507 }
1508 }
1509
1510 pub fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1511 match &mut self.kind {
1512 InodeSocketKind::TcpListener { socket, .. } => socket.poll_read_ready(cx),
1513 InodeSocketKind::TcpStream { socket, .. } => socket.poll_read_ready(cx),
1514 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_read_ready(cx),
1515 InodeSocketKind::Raw(socket) => socket.poll_read_ready(cx),
1516 InodeSocketKind::Icmp(socket) => socket.poll_read_ready(cx),
1517 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1518 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1519 true => Poll::Ready(Ok(0)),
1520 false => Poll::Pending,
1521 },
1522 }
1523 .map_err(net_error_into_io_err)
1524 }
1525
1526 pub fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1527 match &mut self.kind {
1528 InodeSocketKind::TcpListener { socket, .. } => socket.poll_write_ready(cx),
1529 InodeSocketKind::TcpStream { socket, .. } => socket.poll_write_ready(cx),
1530 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_write_ready(cx),
1531 InodeSocketKind::Raw(socket) => socket.poll_write_ready(cx),
1532 InodeSocketKind::Icmp(socket) => socket.poll_write_ready(cx),
1533 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1534 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1535 true => Poll::Ready(Ok(0)),
1536 false => Poll::Pending,
1537 },
1538 }
1539 .map_err(net_error_into_io_err)
1540 }
1541
1542 pub fn set_handler(
1543 &mut self,
1544 handler: Box<dyn InterestHandler + Send + Sync>,
1545 ) -> virtual_net::Result<()> {
1546 match &mut self.kind {
1547 InodeSocketKind::TcpListener { socket, .. } => socket.set_handler(handler),
1548 InodeSocketKind::TcpStream { socket, .. } => socket.set_handler(handler),
1549 InodeSocketKind::UdpSocket { socket, .. } => socket.set_handler(handler),
1550 InodeSocketKind::Raw(socket) => socket.set_handler(handler),
1551 InodeSocketKind::Icmp(socket) => socket.set_handler(handler),
1552 InodeSocketKind::PreSocket { props, .. }
1553 | InodeSocketKind::RemoteSocket { props, .. } => {
1554 props.handler.replace(handler);
1555 Ok(())
1556 }
1557 }
1558 }
1559}
1560
1561#[allow(dead_code)]
1563pub(crate) fn all_socket_rights() -> Rights {
1564 Rights::FD_FDSTAT_SET_FLAGS
1565 .union(Rights::FD_FILESTAT_GET)
1566 .union(Rights::FD_READ)
1567 .union(Rights::FD_WRITE)
1568 .union(Rights::POLL_FD_READWRITE)
1569 .union(Rights::SOCK_SHUTDOWN)
1570 .union(Rights::SOCK_CONNECT)
1571 .union(Rights::SOCK_LISTEN)
1572 .union(Rights::SOCK_BIND)
1573 .union(Rights::SOCK_ACCEPT)
1574 .union(Rights::SOCK_RECV)
1575 .union(Rights::SOCK_SEND)
1576 .union(Rights::SOCK_ADDR_LOCAL)
1577 .union(Rights::SOCK_ADDR_REMOTE)
1578 .union(Rights::SOCK_RECV_FROM)
1579 .union(Rights::SOCK_SEND_TO)
1580}