1use std::{
2 future::Future,
3 io,
4 mem::MaybeUninit,
5 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
6 pin::Pin,
7 sync::{Arc, RwLock},
8 task::{Context, Poll},
9 time::Duration,
10};
11
12#[cfg(feature = "enable-serde")]
13use serde_derive::{Deserialize, Serialize};
14use virtual_mio::InterestHandler;
15use virtual_net::{
16 NetworkError, VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualTcpBoundSocket,
17 VirtualTcpListener, VirtualTcpSocket, VirtualUdpSocket, net_error_into_io_err,
18};
19use wasmer_types::MemorySize;
20use wasmer_wasix_types::wasi::{Addressfamily, Errno, Rights, SockProto, Sockoption, Socktype};
21
22use crate::{VirtualTaskManager, net::net_error_into_wasi_err};
23
/// Distinguishes the kinds of HTTP socket an inode can stand for: the request,
/// the response, or the headers.
///
/// NOTE(review): how each kind is consumed is not visible in this part of the file.
#[derive(Debug)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum InodeHttpSocketType {
    /// The request kind.
    Request,
    /// The response kind.
    Response,
    /// The headers kind.
    Headers,
}
34
35type TcpConnectFuture<'a> =
36 Pin<Box<dyn Future<Output = Result<Box<dyn VirtualTcpSocket + Sync>, Errno>> + 'a>>;
37
38#[derive(Debug)]
39pub struct SocketProperties {
40 pub family: Addressfamily,
41 pub ty: Socktype,
42 pub pt: SockProto,
43 pub only_v6: bool,
44 pub reuse_port: bool,
45 pub reuse_addr: bool,
46 pub no_delay: Option<bool>,
47 pub keep_alive: Option<bool>,
48 pub dont_route: Option<bool>,
49 pub send_buf_size: Option<usize>,
50 pub recv_buf_size: Option<usize>,
51 pub write_timeout: Option<Duration>,
52 pub read_timeout: Option<Duration>,
53 pub accept_timeout: Option<Duration>,
54 pub connect_timeout: Option<Duration>,
55 pub handler: Option<Box<dyn InterestHandler + Send + Sync>>,
56}
57
58impl SocketProperties {
59 fn snapshot_for_bound(&self) -> Self {
60 Self {
61 family: self.family,
62 ty: self.ty,
63 pt: self.pt,
64 only_v6: self.only_v6,
65 reuse_port: self.reuse_port,
66 reuse_addr: self.reuse_addr,
67 no_delay: self.no_delay,
68 keep_alive: self.keep_alive,
69 dont_route: self.dont_route,
70 send_buf_size: self.send_buf_size,
71 recv_buf_size: self.recv_buf_size,
72 write_timeout: self.write_timeout,
73 read_timeout: self.read_timeout,
74 accept_timeout: self.accept_timeout,
75 connect_timeout: self.connect_timeout,
76 handler: None,
77 }
78 }
79}
80
81#[derive(Debug)]
82pub enum InodeSocketKind {
84 PreSocket {
85 props: SocketProperties,
86 addr: Option<SocketAddr>,
87 },
88 Icmp(Box<dyn VirtualIcmpSocket + Sync>),
89 Raw(Box<dyn VirtualRawSocket + Sync>),
90 TcpListener {
91 socket: Box<dyn VirtualTcpListener + Sync>,
92 accept_timeout: Option<Duration>,
93 },
94 TcpStream {
95 socket: Box<dyn VirtualTcpSocket + Sync>,
96 write_timeout: Option<Duration>,
97 read_timeout: Option<Duration>,
98 },
99 BoundTcp {
100 socket: Box<dyn VirtualTcpBoundSocket + Sync>,
101 props: SocketProperties,
102 },
103 UdpSocket {
104 socket: Box<dyn VirtualUdpSocket + Sync>,
105 peer: Option<SocketAddr>,
106 },
107 RemoteSocket {
108 props: SocketProperties,
109 local_addr: SocketAddr,
110 peer_addr: SocketAddr,
111 ttl: u32,
112 multicast_ttl: u32,
113 is_dead: bool,
114 },
115}
116
/// Socket options understood by the socket layer.
///
/// Values are obtained from the wire-level `Sockoption` through the
/// `TryFrom<Sockoption>` implementation. The enum is a plain tag, so it
/// derives the common value traits, which lets callers log, copy and compare
/// options.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WasiSocketOption {
    Noop,
    ReusePort,
    ReuseAddr,
    NoDelay,
    DontRoute,
    OnlyV6,
    Broadcast,
    MulticastLoopV4,
    MulticastLoopV6,
    Promiscuous,
    Listening,
    LastError,
    KeepAlive,
    Linger,
    OobInline,
    RecvBufSize,
    SendBufSize,
    RecvLowat,
    SendLowat,
    RecvTimeout,
    SendTimeout,
    ConnectTimeout,
    AcceptTimeout,
    Ttl,
    MulticastTtlV4,
    Type,
    Proto,
}
146
147impl TryFrom<Sockoption> for WasiSocketOption {
148 type Error = Errno;
149
150 fn try_from(opt: Sockoption) -> Result<Self, Self::Error> {
151 use WasiSocketOption::*;
152 match opt {
153 Sockoption::Noop => Ok(Noop),
154 Sockoption::ReusePort => Ok(ReusePort),
155 Sockoption::ReuseAddr => Ok(ReuseAddr),
156 Sockoption::NoDelay => Ok(NoDelay),
157 Sockoption::DontRoute => Ok(DontRoute),
158 Sockoption::OnlyV6 => Ok(OnlyV6),
159 Sockoption::Broadcast => Ok(Broadcast),
160 Sockoption::MulticastLoopV4 => Ok(MulticastLoopV4),
161 Sockoption::MulticastLoopV6 => Ok(MulticastLoopV6),
162 Sockoption::Promiscuous => Ok(Promiscuous),
163 Sockoption::Listening => Ok(Listening),
164 Sockoption::LastError => Ok(LastError),
165 Sockoption::KeepAlive => Ok(KeepAlive),
166 Sockoption::Linger => Ok(Linger),
167 Sockoption::OobInline => Ok(OobInline),
168 Sockoption::RecvBufSize => Ok(RecvBufSize),
169 Sockoption::SendBufSize => Ok(SendBufSize),
170 Sockoption::RecvLowat => Ok(RecvLowat),
171 Sockoption::SendLowat => Ok(SendLowat),
172 Sockoption::RecvTimeout => Ok(RecvTimeout),
173 Sockoption::SendTimeout => Ok(SendTimeout),
174 Sockoption::ConnectTimeout => Ok(ConnectTimeout),
175 Sockoption::AcceptTimeout => Ok(AcceptTimeout),
176 Sockoption::Ttl => Ok(Ttl),
177 Sockoption::MulticastTtlV4 => Ok(MulticastTtlV4),
178 Sockoption::Type => Ok(Type),
179 Sockoption::Proto => Ok(Proto),
180 _ => Err(Errno::Inval),
181 }
182 }
183}
184
/// Coarse lifecycle state of a socket, as reported by `InodeSocket::status`.
#[derive(Debug)]
pub enum WasiSocketStatus {
    /// The socket is still being opened.
    Opening,
    /// The socket is open.
    Opened,
    /// The socket has been closed.
    Closed,
    /// The socket is in a failed state.
    Failed,
}
192
/// Selects which duration-valued socket option is being read or written.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum TimeType {
    /// Timeout for read operations.
    ReadTimeout,
    /// Timeout for write operations.
    WriteTimeout,
    /// Timeout for accepting connections.
    AcceptTimeout,
    /// Timeout for establishing a connection.
    ConnectTimeout,
    /// Timeout for binding.
    BindTimeout,
    /// Linger duration.
    Linger,
}
203
204impl From<TimeType> for wasmer_journal::SocketOptTimeType {
205 fn from(value: TimeType) -> Self {
206 match value {
207 TimeType::ReadTimeout => Self::ReadTimeout,
208 TimeType::WriteTimeout => Self::WriteTimeout,
209 TimeType::AcceptTimeout => Self::AcceptTimeout,
210 TimeType::ConnectTimeout => Self::ConnectTimeout,
211 TimeType::BindTimeout => Self::BindTimeout,
212 TimeType::Linger => Self::Linger,
213 }
214 }
215}
216
217impl From<wasmer_journal::SocketOptTimeType> for TimeType {
218 fn from(value: wasmer_journal::SocketOptTimeType) -> Self {
219 use wasmer_journal::SocketOptTimeType;
220 match value {
221 SocketOptTimeType::ReadTimeout => TimeType::ReadTimeout,
222 SocketOptTimeType::WriteTimeout => TimeType::WriteTimeout,
223 SocketOptTimeType::AcceptTimeout => TimeType::AcceptTimeout,
224 SocketOptTimeType::ConnectTimeout => TimeType::ConnectTimeout,
225 SocketOptTimeType::BindTimeout => TimeType::BindTimeout,
226 SocketOptTimeType::Linger => TimeType::Linger,
227 }
228 }
229}
230
231#[derive(Debug)]
232pub(crate) struct InodeSocketProtected {
234 pub kind: InodeSocketKind,
235}
236
237#[derive(Debug)]
238pub(crate) struct InodeSocketInner {
240 pub protected: RwLock<InodeSocketProtected>,
241}
242
243#[derive(Debug, Clone)]
244pub struct InodeSocket {
246 pub(crate) inner: Arc<InodeSocketInner>,
247}
248
249impl InodeSocket {
250 pub fn new(kind: InodeSocketKind) -> Self {
251 let protected = InodeSocketProtected { kind };
252 Self {
253 inner: Arc::new(InodeSocketInner {
254 protected: RwLock::new(protected),
255 }),
256 }
257 }
258
259 pub fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
260 let mut inner = self.inner.protected.write().unwrap();
261 inner.poll_read_ready(cx)
262 }
263
264 pub fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
265 let mut inner = self.inner.protected.write().unwrap();
266 inner.poll_write_ready(cx)
267 }
268
269 pub async fn auto_bind_udp(
272 &self,
273 tasks: &dyn VirtualTaskManager,
274 net: &dyn VirtualNetworking,
275 ) -> Result<Option<InodeSocket>, Errno> {
276 let timeout = self
277 .opt_time(TimeType::BindTimeout)
278 .ok()
279 .flatten()
280 .unwrap_or(Duration::from_secs(30));
281 let family = {
282 let inner = self.inner.protected.read().unwrap();
283 match &inner.kind {
284 InodeSocketKind::PreSocket { props, .. } if props.ty == Socktype::Dgram => {
285 Some(props.family)
286 }
287 _ => None,
288 }
289 };
290 let Some(family) = family else {
291 return Ok(None);
292 };
293
294 let addr = match family {
295 Addressfamily::Inet4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
296 Addressfamily::Inet6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
297 _ => return Err(Errno::Notsup),
298 };
299
300 self.bind_internal(tasks, net, addr, timeout).await
301 }
302
303 pub async fn bind(
304 &self,
305 tasks: &dyn VirtualTaskManager,
306 net: &dyn VirtualNetworking,
307 set_addr: SocketAddr,
308 ) -> Result<Option<InodeSocket>, Errno> {
309 let timeout = self
310 .opt_time(TimeType::BindTimeout)
311 .ok()
312 .flatten()
313 .unwrap_or(Duration::from_secs(30));
314 self.bind_internal(tasks, net, set_addr, timeout).await
315 }
316
317 async fn bind_internal(
318 &self,
319 tasks: &dyn VirtualTaskManager,
320 net: &dyn VirtualNetworking,
321 set_addr: SocketAddr,
322 timeout: Duration,
323 ) -> Result<Option<InodeSocket>, Errno> {
324 enum PendingBind {
325 Tcp {
326 addr: SocketAddr,
327 props: SocketProperties,
328 },
329 Udp {
330 addr: SocketAddr,
331 reuse_port: bool,
332 reuse_addr: bool,
333 },
334 }
335
336 let bind = {
337 let mut inner = self.inner.protected.write().unwrap();
338 match &mut inner.kind {
339 InodeSocketKind::PreSocket { props, addr, .. } => {
340 match props.family {
341 Addressfamily::Inet4 => {
342 if !set_addr.is_ipv4() {
343 tracing::debug!(
344 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
345 );
346 return Err(Errno::Inval);
347 }
348 }
349 Addressfamily::Inet6 => {
350 if !set_addr.is_ipv6() {
351 tracing::debug!(
352 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
353 );
354 return Err(Errno::Inval);
355 }
356 }
357 _ => {
358 return Err(Errno::Notsup);
359 }
360 }
361
362 addr.replace(set_addr);
363 let addr = (*addr).unwrap();
364
365 match props.ty {
366 Socktype::Stream => PendingBind::Tcp {
367 addr,
368 props: props.snapshot_for_bound(),
369 },
370 Socktype::Dgram => PendingBind::Udp {
371 addr,
372 reuse_port: props.reuse_port,
373 reuse_addr: props.reuse_addr,
374 },
375 _ => return Err(Errno::Inval),
376 }
377 }
378 InodeSocketKind::RemoteSocket {
379 props,
380 local_addr: addr,
381 ..
382 } => {
383 match props.family {
384 Addressfamily::Inet4 => {
385 if !set_addr.is_ipv4() {
386 tracing::debug!(
387 "IP address is the wrong type IPv4 ({set_addr}) vs IPv6 family"
388 );
389 return Err(Errno::Inval);
390 }
391 }
392 Addressfamily::Inet6 => {
393 if !set_addr.is_ipv6() {
394 tracing::debug!(
395 "IP address is the wrong type IPv6 ({set_addr}) vs IPv4 family"
396 );
397 return Err(Errno::Inval);
398 }
399 }
400 _ => {
401 return Err(Errno::Notsup);
402 }
403 }
404
405 *addr = set_addr;
406 let addr = *addr;
407
408 match props.ty {
409 Socktype::Stream => {
410 return Ok(None);
413 }
414 Socktype::Dgram => PendingBind::Udp {
415 addr,
416 reuse_port: props.reuse_port,
417 reuse_addr: props.reuse_addr,
418 },
419 _ => return Err(Errno::Inval),
420 }
421 }
422 InodeSocketKind::BoundTcp { .. } => return Err(Errno::Inval),
423 _ => return Err(Errno::Notsup),
424 }
425 };
426
427 match bind {
428 PendingBind::Tcp { addr, mut props } => {
429 tokio::select! {
430 socket = net.bind_tcp(
431 addr,
432 props.only_v6,
433 props.reuse_port,
434 props.reuse_addr,
435 ) => {
436 match socket {
437 Ok(socket) => {
438 props.handler = {
439 let mut inner = self.inner.protected.write().unwrap();
440 match &mut inner.kind {
441 InodeSocketKind::PreSocket { props: live_props, .. } => {
442 live_props.handler.take()
443 }
444 _ => return Err(Errno::Inval),
445 }
446 };
447 Ok(Some(InodeSocket::new(InodeSocketKind::BoundTcp { socket, props })))
448 }
449 Err(NetworkError::Unsupported) => {
450 tracing::debug!(
451 "bind_tcp unsupported by networking backend; leaving socket unbound"
452 );
453 let mut inner = self.inner.protected.write().unwrap();
454 if let InodeSocketKind::PreSocket { addr, .. } = &mut inner.kind {
455 addr.take();
456 }
457 Err(Errno::Notsup)
458 }
459 Err(err) => {
460 let mut inner = self.inner.protected.write().unwrap();
464 if let InodeSocketKind::PreSocket { addr, .. } = &mut inner.kind {
465 addr.take();
466 }
467 Err(net_error_into_wasi_err(err))
468 }
469 }
470 },
471 _ = tasks.sleep_now(timeout) => {
472 let mut inner = self.inner.protected.write().unwrap();
474 if let InodeSocketKind::PreSocket { addr, .. } = &mut inner.kind {
475 addr.take();
476 }
477 Err(Errno::Timedout)
478 }
479 }
480 }
481 PendingBind::Udp {
482 addr,
483 reuse_port,
484 reuse_addr,
485 } => {
486 tokio::select! {
487 socket = net.bind_udp(addr, reuse_port, reuse_addr) => {
488 match socket {
489 Ok(socket) => Ok(Some(InodeSocket::new(InodeSocketKind::UdpSocket {
490 socket,
491 peer: None,
492 }))),
493 Err(err) => {
494 let mut inner = self.inner.protected.write().unwrap();
495 if let InodeSocketKind::PreSocket { addr, .. } = &mut inner.kind {
496 addr.take();
497 }
498 Err(net_error_into_wasi_err(err))
499 }
500 }
501 },
502 _ = tasks.sleep_now(timeout) => {
503 let mut inner = self.inner.protected.write().unwrap();
504 if let InodeSocketKind::PreSocket { addr, .. } = &mut inner.kind {
505 addr.take();
506 }
507 Err(Errno::Timedout)
508 }
509 }
510 }
511 }
512 }
513
514 pub async fn listen(
515 &self,
516 tasks: &dyn VirtualTaskManager,
517 net: &dyn VirtualNetworking,
518 _backlog: usize,
519 ) -> Result<Option<InodeSocket>, Errno> {
520 let timeout = self
521 .opt_time(TimeType::AcceptTimeout)
522 .ok()
523 .flatten()
524 .unwrap_or(Duration::from_secs(30));
525
526 let socket = {
527 let mut inner = self.inner.protected.write().unwrap();
528 match &mut inner.kind {
529 InodeSocketKind::PreSocket { props, addr, .. } => match props.ty {
530 Socktype::Stream => {
531 if addr.is_none() {
532 tracing::warn!("wasi[?]::sock_listen - failed - address not set");
533 return Err(Errno::Inval);
534 }
535 let addr = *addr.as_ref().unwrap();
536 let only_v6 = props.only_v6;
537 let reuse_port = props.reuse_port;
538 let reuse_addr = props.reuse_addr;
539 drop(inner);
540
541 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
542 }
543 ty => {
544 tracing::warn!(
545 "wasi[?]::sock_listen - failed - not supported(pre-socket:{:?})",
546 ty
547 );
548 return Err(Errno::Notsup);
549 }
550 },
551 InodeSocketKind::RemoteSocket {
552 props,
553 local_addr: addr,
554 ..
555 } => match props.ty {
556 Socktype::Stream => {
557 let addr = *addr;
558 let only_v6 = props.only_v6;
559 let reuse_port = props.reuse_port;
560 let reuse_addr = props.reuse_addr;
561 drop(inner);
562
563 net.listen_tcp(addr, only_v6, reuse_port, reuse_addr)
564 }
565 ty => {
566 tracing::warn!(
567 "wasi[?]::sock_listen - failed - not supported(remote-socket:{:?})",
568 ty
569 );
570 return Err(Errno::Notsup);
571 }
572 },
573 InodeSocketKind::BoundTcp { socket, .. } => {
574 return Ok(Some(InodeSocket::new(InodeSocketKind::TcpListener {
575 socket: socket.listen().map_err(net_error_into_wasi_err)?,
576 accept_timeout: Some(timeout),
577 })));
578 }
579 InodeSocketKind::Icmp(_) => {
580 tracing::warn!("wasi[?]::sock_listen - failed - not supported(icmp)");
581 return Err(Errno::Notsup);
582 }
583 InodeSocketKind::Raw(_) => {
584 tracing::warn!("wasi[?]::sock_listen - failed - not supported(raw)");
585 return Err(Errno::Notsup);
586 }
587 InodeSocketKind::TcpListener { .. } => {
588 tracing::warn!(
589 "wasi[?]::sock_listen - failed - already listening (tcp-listener)"
590 );
591 return Err(Errno::Notsup);
592 }
593 InodeSocketKind::TcpStream { .. } => {
594 tracing::warn!("wasi[?]::sock_listen - failed - not supported(tcp-stream)");
595 return Err(Errno::Notsup);
596 }
597 InodeSocketKind::UdpSocket { .. } => {
598 tracing::warn!("wasi[?]::sock_listen - failed - not supported(udp-socket)");
599 return Err(Errno::Notsup);
600 }
601 }
602 };
603
604 tokio::select! {
605 socket = socket => {
606 let socket = socket.map_err(net_error_into_wasi_err)?;
607 Ok(Some(InodeSocket::new(InodeSocketKind::TcpListener {
608 socket,
609 accept_timeout: Some(timeout),
610 })))
611 },
612 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
613 }
614 }
615
616 pub async fn accept(
617 &self,
618 tasks: &dyn VirtualTaskManager,
619 nonblocking: bool,
620 timeout: Option<Duration>,
621 ) -> Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno> {
622 struct SocketAccepter<'a> {
623 sock: &'a InodeSocket,
624 nonblocking: bool,
625 handler_registered: bool,
626 }
627 impl Drop for SocketAccepter<'_> {
628 fn drop(&mut self) {
629 if self.handler_registered {
630 let mut inner = self.sock.inner.protected.write().unwrap();
631 inner.remove_handler();
632 }
633 }
634 }
635 impl Future for SocketAccepter<'_> {
636 type Output = Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr), Errno>;
637 fn poll(
638 mut self: Pin<&mut Self>,
639 cx: &mut std::task::Context<'_>,
640 ) -> std::task::Poll<Self::Output> {
641 loop {
642 let mut inner = self.sock.inner.protected.write().unwrap();
643 return match &mut inner.kind {
644 InodeSocketKind::TcpListener { socket, .. } => match socket.try_accept() {
645 Ok((child, addr)) => Poll::Ready(Ok((child, addr))),
646 Err(NetworkError::WouldBlock) if self.nonblocking => {
647 Poll::Ready(Err(Errno::Again))
648 }
649 Err(NetworkError::WouldBlock) if !self.handler_registered => {
650 let res = socket.set_handler(cx.waker().into());
651 if let Err(err) = res {
652 return Poll::Ready(Err(net_error_into_wasi_err(err)));
653 }
654 drop(inner);
655 self.handler_registered = true;
656 continue;
657 }
658 Err(NetworkError::WouldBlock) => Poll::Pending,
659 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
660 },
661 InodeSocketKind::PreSocket { .. } => Poll::Ready(Err(Errno::Notconn)),
662 _ => Poll::Ready(Err(Errno::Notsup)),
663 };
664 }
665 }
666 }
667
668 let acceptor = SocketAccepter {
669 sock: self,
670 nonblocking,
671 handler_registered: false,
672 };
673 if let Some(timeout) = timeout {
674 tokio::select! {
675 res = acceptor => res,
676 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
677 }
678 } else {
679 acceptor.await
680 }
681 }
682
683 pub fn close(&self) -> Result<(), Errno> {
684 let mut inner = self.inner.protected.write().unwrap();
685 match &mut inner.kind {
686 InodeSocketKind::TcpListener { .. } => {}
687 InodeSocketKind::TcpStream { socket, .. } => {
688 socket.close().map_err(net_error_into_wasi_err)?;
689 }
690 InodeSocketKind::BoundTcp { .. } => {}
691 InodeSocketKind::Icmp(_) => {}
692 InodeSocketKind::UdpSocket { .. } => {}
693 InodeSocketKind::Raw(_) => {}
694 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
695 InodeSocketKind::RemoteSocket { .. } => {}
696 };
697 Ok(())
698 }
699
700 pub async fn connect(
701 &mut self,
702 tasks: &dyn VirtualTaskManager,
703 net: &dyn VirtualNetworking,
704 peer: SocketAddr,
705 timeout: Option<std::time::Duration>,
706 nonblocking: bool,
707 ) -> Result<Option<InodeSocket>, Errno> {
708 let new_write_timeout;
709 let new_read_timeout;
710
711 let timeout = timeout
712 .or_else(|| self.opt_time(TimeType::ConnectTimeout).ok().flatten())
713 .unwrap_or(Duration::from_secs(30));
714
715 let handler;
716 let connect: TcpConnectFuture<'_> = {
717 let mut inner = self.inner.protected.write().unwrap();
718 match &mut inner.kind {
719 InodeSocketKind::PreSocket { props, addr, .. } => {
720 handler = props.handler.take();
721 new_write_timeout = props.write_timeout;
722 new_read_timeout = props.read_timeout;
723 match props.ty {
724 Socktype::Stream => {
725 let no_delay = props.no_delay;
726 let keep_alive = props.keep_alive;
727 let dont_route = props.dont_route;
728 let addr = match addr {
729 Some(a) => *a,
730 None => {
731 let ip = match peer.is_ipv4() {
732 true => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
733 false => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
734 };
735 SocketAddr::new(ip, 0)
736 }
737 };
738 Box::pin(async move {
739 let mut ret = net
740 .connect_tcp(addr, peer)
741 .await
742 .map_err(net_error_into_wasi_err)?;
743 if let Some(no_delay) = no_delay {
744 ret.set_nodelay(no_delay).ok();
745 }
746 if let Some(keep_alive) = keep_alive {
747 ret.set_keepalive(keep_alive).ok();
748 }
749 if let Some(dont_route) = dont_route {
750 ret.set_dontroute(dont_route).ok();
751 }
752 if !nonblocking {
753 futures::future::poll_fn(|cx| ret.poll_write_ready(cx))
754 .await
755 .map_err(net_error_into_wasi_err)?;
756 }
757 Ok(ret)
758 })
759 }
760 Socktype::Dgram => return Err(Errno::Inval),
761 _ => return Err(Errno::Notsup),
762 }
763 }
764 InodeSocketKind::BoundTcp { socket, props } => {
765 handler = props.handler.take();
766 new_write_timeout = props.write_timeout;
767 new_read_timeout = props.read_timeout;
768 match props.ty {
769 Socktype::Stream => {
770 let no_delay = props.no_delay;
771 let keep_alive = props.keep_alive;
772 let dont_route = props.dont_route;
773 let mut ret = socket.connect(peer).map_err(net_error_into_wasi_err)?;
774 if let Some(no_delay) = no_delay {
775 ret.set_nodelay(no_delay).ok();
776 }
777 if let Some(keep_alive) = keep_alive {
778 ret.set_keepalive(keep_alive).ok();
779 }
780 if let Some(dont_route) = dont_route {
781 ret.set_dontroute(dont_route).ok();
782 }
783 Box::pin(async move {
784 if !nonblocking {
785 futures::future::poll_fn(|cx| ret.poll_write_ready(cx))
786 .await
787 .map_err(net_error_into_wasi_err)?;
788 }
789 Ok(ret)
790 })
791 }
792 Socktype::Dgram => return Err(Errno::Inval),
793 _ => return Err(Errno::Notsup),
794 }
795 }
796 InodeSocketKind::UdpSocket {
797 peer: target_peer, ..
798 } => {
799 target_peer.replace(peer);
800 return Ok(None);
801 }
802 InodeSocketKind::RemoteSocket { peer_addr, .. } => {
803 *peer_addr = peer;
804 return Ok(None);
805 }
806 _ => return Err(Errno::Notsup),
807 }
808 };
809
810 let mut socket = tokio::select! {
811 res = connect => res?,
812 _ = tasks.sleep_now(timeout) => return Err(Errno::Timedout)
813 };
814
815 if let Some(handler) = handler {
816 socket
817 .set_handler(handler)
818 .map_err(net_error_into_wasi_err)?;
819 }
820
821 let socket = InodeSocket::new(InodeSocketKind::TcpStream {
822 socket,
823 write_timeout: new_write_timeout,
824 read_timeout: new_read_timeout,
825 });
826
827 Ok(Some(socket))
828 }
829
830 pub fn status(&self) -> Result<WasiSocketStatus, Errno> {
831 let inner = self.inner.protected.read().unwrap();
832 Ok(match &inner.kind {
833 InodeSocketKind::PreSocket { .. } => WasiSocketStatus::Opening,
834 InodeSocketKind::BoundTcp { .. } => WasiSocketStatus::Opened,
835 InodeSocketKind::TcpListener { .. } => WasiSocketStatus::Opened,
836 InodeSocketKind::TcpStream { socket, .. } => match socket.status() {
837 Ok(virtual_net::SocketStatus::Opening) => WasiSocketStatus::Opening,
838 Ok(virtual_net::SocketStatus::Opened) => WasiSocketStatus::Opened,
839 Ok(virtual_net::SocketStatus::Closed) => WasiSocketStatus::Closed,
840 Ok(virtual_net::SocketStatus::Failed) => WasiSocketStatus::Failed,
841 Err(_) => WasiSocketStatus::Failed,
842 },
843 InodeSocketKind::UdpSocket { .. } => WasiSocketStatus::Opened,
844 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
845 true => WasiSocketStatus::Closed,
846 false => WasiSocketStatus::Opened,
847 },
848 _ => WasiSocketStatus::Failed,
849 })
850 }
851
852 pub fn addr_local(&self) -> Result<SocketAddr, Errno> {
853 let inner = self.inner.protected.read().unwrap();
854 Ok(match &inner.kind {
855 InodeSocketKind::PreSocket { props, addr, .. } => {
856 if let Some(addr) = addr {
857 *addr
858 } else {
859 SocketAddr::new(
860 match props.family {
861 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
862 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
863 _ => return Err(Errno::Inval),
864 },
865 0,
866 )
867 }
868 }
869 InodeSocketKind::Icmp(sock) => sock.addr_local().map_err(net_error_into_wasi_err)?,
870 InodeSocketKind::TcpListener { socket, .. } => {
871 socket.addr_local().map_err(net_error_into_wasi_err)?
872 }
873 InodeSocketKind::TcpStream { socket, .. } => {
874 socket.addr_local().map_err(net_error_into_wasi_err)?
875 }
876 InodeSocketKind::BoundTcp { socket, .. } => {
877 socket.addr_local().map_err(net_error_into_wasi_err)?
878 }
879 InodeSocketKind::UdpSocket { socket, .. } => {
880 socket.addr_local().map_err(net_error_into_wasi_err)?
881 }
882 InodeSocketKind::RemoteSocket {
883 local_addr: addr, ..
884 } => *addr,
885 _ => return Err(Errno::Notsup),
886 })
887 }
888
889 pub fn addr_peer(&self) -> Result<SocketAddr, Errno> {
890 let inner = self.inner.protected.read().unwrap();
891 match &inner.kind {
892 InodeSocketKind::BoundTcp { .. } => Err(Errno::Notconn),
893 InodeSocketKind::PreSocket { props, .. } => Ok(SocketAddr::new(
894 match props.family {
895 Addressfamily::Inet4 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
896 Addressfamily::Inet6 => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
897 _ => return Err(Errno::Inval),
898 },
899 0,
900 )),
901 InodeSocketKind::TcpStream { socket, .. } => {
902 socket.addr_peer().map_err(net_error_into_wasi_err)
903 }
904 InodeSocketKind::UdpSocket { socket, .. } => socket
905 .addr_peer()
906 .map_err(net_error_into_wasi_err)?
907 .map(Ok)
908 .unwrap_or_else(|| {
909 socket
910 .addr_local()
911 .map_err(net_error_into_wasi_err)
912 .map(|addr| {
913 SocketAddr::new(
914 match addr {
915 SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
916 SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
917 },
918 0,
919 )
920 })
921 }),
922 InodeSocketKind::RemoteSocket { peer_addr, .. } => Ok(*peer_addr),
923 _ => Err(Errno::Notsup),
924 }
925 }
926
927 pub fn set_opt_flag(&mut self, option: WasiSocketOption, val: bool) -> Result<(), Errno> {
928 let mut inner = self.inner.protected.write().unwrap();
929 match &mut inner.kind {
930 InodeSocketKind::PreSocket { props, .. }
931 | InodeSocketKind::BoundTcp { props, .. }
932 | InodeSocketKind::RemoteSocket { props, .. } => {
933 match option {
934 WasiSocketOption::OnlyV6 => props.only_v6 = val,
935 WasiSocketOption::ReusePort => props.reuse_port = val,
936 WasiSocketOption::ReuseAddr => props.reuse_addr = val,
937 WasiSocketOption::NoDelay => props.no_delay = Some(val),
938 WasiSocketOption::KeepAlive => props.keep_alive = Some(val),
939 WasiSocketOption::DontRoute => props.dont_route = Some(val),
940 _ => return Err(Errno::Inval),
941 };
942 }
943 InodeSocketKind::Raw(sock) => match option {
944 WasiSocketOption::Promiscuous => {
945 sock.set_promiscuous(val).map_err(net_error_into_wasi_err)?
946 }
947 _ => return Err(Errno::Inval),
948 },
949 InodeSocketKind::TcpStream { socket, .. } => match option {
950 WasiSocketOption::NoDelay => {
951 socket.set_nodelay(val).map_err(net_error_into_wasi_err)?
952 }
953 WasiSocketOption::KeepAlive => {
954 socket.set_keepalive(val).map_err(net_error_into_wasi_err)?
955 }
956 WasiSocketOption::DontRoute => {
957 socket.set_dontroute(val).map_err(net_error_into_wasi_err)?
958 }
959 _ => return Err(Errno::Inval),
960 },
961 InodeSocketKind::TcpListener { .. } => return Err(Errno::Inval),
962 InodeSocketKind::UdpSocket { socket, .. } => match option {
963 WasiSocketOption::Broadcast => {
964 socket.set_broadcast(val).map_err(net_error_into_wasi_err)?
965 }
966 WasiSocketOption::MulticastLoopV4 => socket
967 .set_multicast_loop_v4(val)
968 .map_err(net_error_into_wasi_err)?,
969 WasiSocketOption::MulticastLoopV6 => socket
970 .set_multicast_loop_v6(val)
971 .map_err(net_error_into_wasi_err)?,
972 _ => return Err(Errno::Inval),
973 },
974 _ => return Err(Errno::Notsup),
975 }
976 Ok(())
977 }
978
979 pub fn get_opt_flag(&self, option: WasiSocketOption) -> Result<bool, Errno> {
980 let mut inner = self.inner.protected.write().unwrap();
981 Ok(match &mut inner.kind {
982 InodeSocketKind::PreSocket { props, .. }
983 | InodeSocketKind::BoundTcp { props, .. }
984 | InodeSocketKind::RemoteSocket { props, .. } => match option {
985 WasiSocketOption::OnlyV6 => props.only_v6,
986 WasiSocketOption::ReusePort => props.reuse_port,
987 WasiSocketOption::ReuseAddr => props.reuse_addr,
988 WasiSocketOption::NoDelay => props.no_delay.unwrap_or_default(),
989 WasiSocketOption::KeepAlive => props.keep_alive.unwrap_or_default(),
990 _ => return Err(Errno::Inval),
991 },
992 InodeSocketKind::Raw(sock) => match option {
993 WasiSocketOption::Promiscuous => {
994 sock.promiscuous().map_err(net_error_into_wasi_err)?
995 }
996 _ => return Err(Errno::Inval),
997 },
998 InodeSocketKind::TcpStream { socket, .. } => match option {
999 WasiSocketOption::NoDelay => socket.nodelay().map_err(net_error_into_wasi_err)?,
1000 WasiSocketOption::KeepAlive => {
1001 socket.keepalive().map_err(net_error_into_wasi_err)?
1002 }
1003 WasiSocketOption::DontRoute => {
1004 socket.dontroute().map_err(net_error_into_wasi_err)?
1005 }
1006 _ => return Err(Errno::Inval),
1007 },
1008 InodeSocketKind::UdpSocket { socket, .. } => match option {
1009 WasiSocketOption::Broadcast => {
1010 socket.broadcast().map_err(net_error_into_wasi_err)?
1011 }
1012 WasiSocketOption::MulticastLoopV4 => socket
1013 .multicast_loop_v4()
1014 .map_err(net_error_into_wasi_err)?,
1015 WasiSocketOption::MulticastLoopV6 => socket
1016 .multicast_loop_v6()
1017 .map_err(net_error_into_wasi_err)?,
1018 _ => return Err(Errno::Inval),
1019 },
1020 _ => return Err(Errno::Notsup),
1021 })
1022 }
1023
1024 pub fn set_send_buf_size(&mut self, size: usize) -> Result<(), Errno> {
1025 let mut inner = self.inner.protected.write().unwrap();
1026 match &mut inner.kind {
1027 InodeSocketKind::PreSocket { props, .. }
1028 | InodeSocketKind::BoundTcp { props, .. }
1029 | InodeSocketKind::RemoteSocket { props, .. } => {
1030 props.send_buf_size = Some(size);
1031 }
1032 InodeSocketKind::TcpStream { socket, .. } => {
1033 socket
1034 .set_send_buf_size(size)
1035 .map_err(net_error_into_wasi_err)?;
1036 }
1037 _ => return Err(Errno::Notsup),
1038 }
1039 Ok(())
1040 }
1041
1042 pub fn send_buf_size(&self) -> Result<usize, Errno> {
1043 let inner = self.inner.protected.read().unwrap();
1044 match &inner.kind {
1045 InodeSocketKind::PreSocket { props, .. }
1046 | InodeSocketKind::BoundTcp { props, .. }
1047 | InodeSocketKind::RemoteSocket { props, .. } => {
1048 Ok(props.send_buf_size.unwrap_or_default())
1049 }
1050 InodeSocketKind::TcpStream { socket, .. } => {
1051 socket.send_buf_size().map_err(net_error_into_wasi_err)
1052 }
1053 _ => Err(Errno::Notsup),
1054 }
1055 }
1056
1057 pub fn set_recv_buf_size(&mut self, size: usize) -> Result<(), Errno> {
1058 let mut inner = self.inner.protected.write().unwrap();
1059 match &mut inner.kind {
1060 InodeSocketKind::PreSocket { props, .. }
1061 | InodeSocketKind::BoundTcp { props, .. }
1062 | InodeSocketKind::RemoteSocket { props, .. } => {
1063 props.recv_buf_size = Some(size);
1064 }
1065 InodeSocketKind::TcpStream { socket, .. } => {
1066 socket
1067 .set_recv_buf_size(size)
1068 .map_err(net_error_into_wasi_err)?;
1069 }
1070 _ => return Err(Errno::Notsup),
1071 }
1072 Ok(())
1073 }
1074
1075 pub fn recv_buf_size(&self) -> Result<usize, Errno> {
1076 let inner = self.inner.protected.read().unwrap();
1077 match &inner.kind {
1078 InodeSocketKind::PreSocket { props, .. }
1079 | InodeSocketKind::BoundTcp { props, .. }
1080 | InodeSocketKind::RemoteSocket { props, .. } => {
1081 Ok(props.recv_buf_size.unwrap_or_default())
1082 }
1083 InodeSocketKind::TcpStream { socket, .. } => {
1084 socket.recv_buf_size().map_err(net_error_into_wasi_err)
1085 }
1086 _ => Err(Errno::Notsup),
1087 }
1088 }
1089
1090 pub fn set_linger(&mut self, linger: Option<std::time::Duration>) -> Result<(), Errno> {
1091 let mut inner = self.inner.protected.write().unwrap();
1092 match &mut inner.kind {
1093 InodeSocketKind::TcpStream { socket, .. } => {
1094 socket.set_linger(linger).map_err(net_error_into_wasi_err)
1095 }
1096 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1097 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1098 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1099 _ => Err(Errno::Notsup),
1100 }
1101 }
1102
1103 pub fn linger(&self) -> Result<Option<std::time::Duration>, Errno> {
1104 let inner = self.inner.protected.read().unwrap();
1105 match &inner.kind {
1106 InodeSocketKind::TcpStream { socket, .. } => {
1107 socket.linger().map_err(net_error_into_wasi_err)
1108 }
1109 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1110 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1111 _ => Err(Errno::Notsup),
1112 }
1113 }
1114
1115 pub fn set_opt_time(
1116 &self,
1117 ty: TimeType,
1118 timeout: Option<std::time::Duration>,
1119 ) -> Result<(), Errno> {
1120 let mut inner = self.inner.protected.write().unwrap();
1121 match &mut inner.kind {
1122 InodeSocketKind::TcpStream {
1123 write_timeout,
1124 read_timeout,
1125 ..
1126 } => {
1127 match ty {
1128 TimeType::WriteTimeout => *write_timeout = timeout,
1129 TimeType::ReadTimeout => *read_timeout = timeout,
1130 _ => return Err(Errno::Inval),
1131 }
1132 Ok(())
1133 }
1134 InodeSocketKind::TcpListener { accept_timeout, .. } => {
1135 match ty {
1136 TimeType::AcceptTimeout => *accept_timeout = timeout,
1137 _ => return Err(Errno::Inval),
1138 }
1139 Ok(())
1140 }
1141 InodeSocketKind::PreSocket { props, .. }
1142 | InodeSocketKind::BoundTcp { props, .. }
1143 | InodeSocketKind::RemoteSocket { props, .. } => {
1144 match ty {
1145 TimeType::ConnectTimeout => props.connect_timeout = timeout,
1146 TimeType::AcceptTimeout => props.accept_timeout = timeout,
1147 TimeType::ReadTimeout => props.read_timeout = timeout,
1148 TimeType::WriteTimeout => props.write_timeout = timeout,
1149 _ => return Err(Errno::Io),
1150 }
1151 Ok(())
1152 }
1153 _ => Err(Errno::Notsup),
1154 }
1155 }
1156
1157 pub fn opt_time(&self, ty: TimeType) -> Result<Option<std::time::Duration>, Errno> {
1158 let inner = self.inner.protected.read().unwrap();
1159 match &inner.kind {
1160 InodeSocketKind::TcpStream {
1161 read_timeout,
1162 write_timeout,
1163 ..
1164 } => Ok(match ty {
1165 TimeType::ReadTimeout => *read_timeout,
1166 TimeType::WriteTimeout => *write_timeout,
1167 _ => return Err(Errno::Inval),
1168 }),
1169 InodeSocketKind::TcpListener { accept_timeout, .. } => Ok(match ty {
1170 TimeType::AcceptTimeout => *accept_timeout,
1171 _ => return Err(Errno::Inval),
1172 }),
1173 InodeSocketKind::PreSocket { props, .. }
1174 | InodeSocketKind::BoundTcp { props, .. }
1175 | InodeSocketKind::RemoteSocket { props, .. } => match ty {
1176 TimeType::ConnectTimeout => Ok(props.connect_timeout),
1177 TimeType::AcceptTimeout => Ok(props.accept_timeout),
1178 TimeType::ReadTimeout => Ok(props.read_timeout),
1179 TimeType::WriteTimeout => Ok(props.write_timeout),
1180 _ => Err(Errno::Inval),
1181 },
1182 _ => Err(Errno::Notsup),
1183 }
1184 }
1185
1186 pub fn set_ttl(&self, ttl: u32) -> Result<(), Errno> {
1187 let mut inner = self.inner.protected.write().unwrap();
1188 match &mut inner.kind {
1189 InodeSocketKind::BoundTcp { socket, .. } => {
1190 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1191 }
1192 InodeSocketKind::TcpStream { socket, .. } => {
1193 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1194 }
1195 InodeSocketKind::UdpSocket { socket, .. } => {
1196 socket.set_ttl(ttl).map_err(net_error_into_wasi_err)
1197 }
1198 InodeSocketKind::RemoteSocket { ttl: set_ttl, .. } => {
1199 *set_ttl = ttl;
1200 Ok(())
1201 }
1202 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1203 _ => Err(Errno::Notsup),
1204 }
1205 }
1206
1207 pub fn ttl(&self) -> Result<u32, Errno> {
1208 let inner = self.inner.protected.read().unwrap();
1209 match &inner.kind {
1210 InodeSocketKind::BoundTcp { socket, .. } => {
1211 socket.ttl().map_err(net_error_into_wasi_err)
1212 }
1213 InodeSocketKind::TcpStream { socket, .. } => {
1214 socket.ttl().map_err(net_error_into_wasi_err)
1215 }
1216 InodeSocketKind::UdpSocket { socket, .. } => {
1217 socket.ttl().map_err(net_error_into_wasi_err)
1218 }
1219 InodeSocketKind::RemoteSocket { ttl, .. } => Ok(*ttl),
1220 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1221 _ => Err(Errno::Notsup),
1222 }
1223 }
1224
1225 pub fn set_multicast_ttl_v4(&self, ttl: u32) -> Result<(), Errno> {
1226 let mut inner = self.inner.protected.write().unwrap();
1227 match &mut inner.kind {
1228 InodeSocketKind::UdpSocket { socket, .. } => socket
1229 .set_multicast_ttl_v4(ttl)
1230 .map_err(net_error_into_wasi_err),
1231 InodeSocketKind::RemoteSocket {
1232 multicast_ttl: set_ttl,
1233 ..
1234 } => {
1235 *set_ttl = ttl;
1236 Ok(())
1237 }
1238 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1239 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1240 _ => Err(Errno::Notsup),
1241 }
1242 }
1243
1244 pub fn multicast_ttl_v4(&self) -> Result<u32, Errno> {
1245 let inner = self.inner.protected.read().unwrap();
1246 match &inner.kind {
1247 InodeSocketKind::UdpSocket { socket, .. } => {
1248 socket.multicast_ttl_v4().map_err(net_error_into_wasi_err)
1249 }
1250 InodeSocketKind::RemoteSocket { multicast_ttl, .. } => Ok(*multicast_ttl),
1251 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1252 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1253 _ => Err(Errno::Notsup),
1254 }
1255 }
1256
1257 pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1258 let mut inner = self.inner.protected.write().unwrap();
1259 match &mut inner.kind {
1260 InodeSocketKind::UdpSocket { socket, .. } => socket
1261 .join_multicast_v4(multiaddr, iface)
1262 .map_err(net_error_into_wasi_err),
1263 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1264 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1265 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1266 _ => Err(Errno::Notsup),
1267 }
1268 }
1269
1270 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, iface: Ipv4Addr) -> Result<(), Errno> {
1271 let mut inner = self.inner.protected.write().unwrap();
1272 match &mut inner.kind {
1273 InodeSocketKind::UdpSocket { socket, .. } => socket
1274 .leave_multicast_v4(multiaddr, iface)
1275 .map_err(net_error_into_wasi_err),
1276 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1277 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1278 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1279 _ => Err(Errno::Notsup),
1280 }
1281 }
1282
1283 pub fn join_multicast_v6(&self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1284 let mut inner = self.inner.protected.write().unwrap();
1285 match &mut inner.kind {
1286 InodeSocketKind::UdpSocket { socket, .. } => socket
1287 .join_multicast_v6(multiaddr, iface)
1288 .map_err(net_error_into_wasi_err),
1289 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1290 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1291 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1292 _ => Err(Errno::Notsup),
1293 }
1294 }
1295
1296 pub fn leave_multicast_v6(&mut self, multiaddr: Ipv6Addr, iface: u32) -> Result<(), Errno> {
1297 let mut inner = self.inner.protected.write().unwrap();
1298 match &mut inner.kind {
1299 InodeSocketKind::UdpSocket { socket, .. } => socket
1300 .leave_multicast_v6(multiaddr, iface)
1301 .map_err(net_error_into_wasi_err),
1302 InodeSocketKind::RemoteSocket { .. } => Ok(()),
1303 InodeSocketKind::BoundTcp { .. } => Err(Errno::Io),
1304 InodeSocketKind::PreSocket { .. } => Err(Errno::Io),
1305 _ => Err(Errno::Notsup),
1306 }
1307 }
1308
1309 pub async fn send(
1310 &self,
1311 tasks: &dyn VirtualTaskManager,
1312 buf: &[u8],
1313 timeout: Option<Duration>,
1314 nonblocking: bool,
1315 ) -> Result<usize, Errno> {
1316 struct SocketSender<'a, 'b> {
1317 inner: &'a InodeSocketInner,
1318 data: &'b [u8],
1319 nonblocking: bool,
1320 handler_registered: bool,
1321 }
1322 impl Drop for SocketSender<'_, '_> {
1323 fn drop(&mut self) {
1324 if self.handler_registered {
1325 let mut inner = self.inner.protected.write().unwrap();
1326 inner.remove_handler();
1327 }
1328 }
1329 }
1330 impl Future for SocketSender<'_, '_> {
1331 type Output = Result<usize, Errno>;
1332 fn poll(
1333 mut self: Pin<&mut Self>,
1334 cx: &mut std::task::Context<'_>,
1335 ) -> Poll<Self::Output> {
1336 loop {
1337 let mut inner = self.inner.protected.write().unwrap();
1338 let res = match &mut inner.kind {
1339 InodeSocketKind::Raw(socket) => socket.try_send(self.data),
1340 InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1341 InodeSocketKind::UdpSocket { socket, peer } => {
1342 if let Some(peer) = peer {
1343 socket.try_send_to(self.data, *peer)
1344 } else {
1345 Err(NetworkError::NotConnected)
1346 }
1347 }
1348 InodeSocketKind::PreSocket { .. } => {
1349 return Poll::Ready(Err(Errno::Notconn));
1350 }
1351 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1352 return match is_dead {
1353 true => Poll::Ready(Err(Errno::Connreset)),
1354 false => Poll::Ready(Ok(self.data.len())),
1355 };
1356 }
1357 _ => return Poll::Ready(Err(Errno::Notsup)),
1358 };
1359 return match res {
1360 Ok(amt) => Poll::Ready(Ok(amt)),
1361 Err(NetworkError::WouldBlock) if self.nonblocking => {
1362 Poll::Ready(Err(Errno::Again))
1363 }
1364 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1365 inner
1366 .set_handler(cx.waker().into())
1367 .map_err(net_error_into_wasi_err)?;
1368 drop(inner);
1369 self.handler_registered = true;
1370 continue;
1371 }
1372 Err(NetworkError::WouldBlock) => Poll::Pending,
1373 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1374 };
1375 }
1376 }
1377 }
1378
1379 let poller = SocketSender {
1380 inner: &self.inner,
1381 data: buf,
1382 nonblocking,
1383 handler_registered: false,
1384 };
1385 if let Some(timeout) = timeout {
1386 tokio::select! {
1387 res = poller => res,
1388 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1389 }
1390 } else {
1391 poller.await
1392 }
1393 }
1394
1395 pub async fn send_to<M: MemorySize>(
1396 &self,
1397 tasks: &dyn VirtualTaskManager,
1398 buf: &[u8],
1399 addr: SocketAddr,
1400 timeout: Option<Duration>,
1401 nonblocking: bool,
1402 ) -> Result<usize, Errno> {
1403 struct SocketSender<'a, 'b> {
1404 inner: &'a InodeSocketInner,
1405 data: &'b [u8],
1406 addr: SocketAddr,
1407 nonblocking: bool,
1408 handler_registered: bool,
1409 }
1410 impl Drop for SocketSender<'_, '_> {
1411 fn drop(&mut self) {
1412 if self.handler_registered {
1413 let mut inner = self.inner.protected.write().unwrap();
1414 inner.remove_handler();
1415 }
1416 }
1417 }
1418 impl Future for SocketSender<'_, '_> {
1419 type Output = Result<usize, Errno>;
1420 fn poll(
1421 mut self: Pin<&mut Self>,
1422 cx: &mut std::task::Context<'_>,
1423 ) -> Poll<Self::Output> {
1424 loop {
1425 let mut inner = self.inner.protected.write().unwrap();
1426 let res = match &mut inner.kind {
1427 InodeSocketKind::Icmp(socket) => socket.try_send_to(self.data, self.addr),
1428 InodeSocketKind::TcpStream { socket, .. } => socket.try_send(self.data),
1429 InodeSocketKind::UdpSocket { socket, .. } => {
1430 socket.try_send_to(self.data, self.addr)
1431 }
1432 InodeSocketKind::PreSocket { .. } => {
1433 return Poll::Ready(Err(Errno::Notconn));
1434 }
1435 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1436 return match is_dead {
1437 true => Poll::Ready(Err(Errno::Connreset)),
1438 false => Poll::Ready(Ok(self.data.len())),
1439 };
1440 }
1441 _ => return Poll::Ready(Err(Errno::Notsup)),
1442 };
1443 return match res {
1444 Ok(amt) => Poll::Ready(Ok(amt)),
1445 Err(NetworkError::WouldBlock) if self.nonblocking => {
1446 Poll::Ready(Err(Errno::Again))
1447 }
1448 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1449 inner
1450 .set_handler(cx.waker().into())
1451 .map_err(net_error_into_wasi_err)?;
1452 self.handler_registered = true;
1453 drop(inner);
1454 continue;
1455 }
1456 Err(NetworkError::WouldBlock) => Poll::Pending,
1457 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1458 };
1459 }
1460 }
1461 }
1462
1463 let poller = SocketSender {
1464 inner: &self.inner,
1465 data: buf,
1466 addr,
1467 nonblocking,
1468 handler_registered: false,
1469 };
1470 if let Some(timeout) = timeout {
1471 tokio::select! {
1472 res = poller => res,
1473 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1474 }
1475 } else {
1476 poller.await
1477 }
1478 }
1479
1480 pub async fn recv(
1481 &self,
1482 tasks: &dyn VirtualTaskManager,
1483 buf: &mut [MaybeUninit<u8>],
1484 timeout: Option<Duration>,
1485 nonblocking: bool,
1486 peek: bool,
1487 ) -> Result<usize, Errno> {
1488 struct SocketReceiver<'a, 'b> {
1489 inner: &'a InodeSocketInner,
1490 data: &'b mut [MaybeUninit<u8>],
1491 nonblocking: bool,
1492 peek: bool,
1493 handler_registered: bool,
1494 }
1495 impl Drop for SocketReceiver<'_, '_> {
1496 fn drop(&mut self) {
1497 if self.handler_registered {
1498 let mut inner = self.inner.protected.write().unwrap();
1499 inner.remove_handler();
1500 }
1501 }
1502 }
1503 impl Future for SocketReceiver<'_, '_> {
1504 type Output = Result<usize, Errno>;
1505 fn poll(
1506 mut self: Pin<&mut Self>,
1507 cx: &mut std::task::Context<'_>,
1508 ) -> Poll<Self::Output> {
1509 loop {
1510 let peek = self.peek;
1511 let mut inner = self.inner.protected.write().unwrap();
1512 let res = match &mut inner.kind {
1513 InodeSocketKind::Raw(socket) => socket.try_recv(self.data, peek),
1514 InodeSocketKind::TcpStream { socket, .. } => {
1515 socket.try_recv(self.data, peek)
1516 }
1517 InodeSocketKind::UdpSocket { socket, peer } => {
1518 if let Some(peer) = peer {
1519 match socket.try_recv_from(self.data, peek) {
1520 Ok((amt, addr)) if addr == *peer => Ok(amt),
1521 Ok(_) => Err(NetworkError::WouldBlock),
1522 Err(err) => Err(err),
1523 }
1524 } else {
1525 match socket.try_recv_from(self.data, peek) {
1526 Ok((amt, _)) => Ok(amt),
1527 Err(err) => Err(err),
1528 }
1529 }
1530 }
1531 InodeSocketKind::RemoteSocket { is_dead, .. } => {
1532 return match is_dead {
1533 true => Poll::Ready(Ok(0)),
1534 false => Poll::Pending,
1535 };
1536 }
1537 InodeSocketKind::PreSocket { .. } => {
1538 return Poll::Ready(Err(Errno::Notconn));
1539 }
1540 _ => return Poll::Ready(Err(Errno::Notsup)),
1541 };
1542 return match res {
1543 Ok(amt) => Poll::Ready(Ok(amt)),
1544 Err(NetworkError::WouldBlock) if self.nonblocking => {
1545 Poll::Ready(Err(Errno::Again))
1546 }
1547 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1548 inner
1549 .set_handler(cx.waker().into())
1550 .map_err(net_error_into_wasi_err)?;
1551 self.handler_registered = true;
1552 drop(inner);
1553 continue;
1554 }
1555
1556 Err(NetworkError::WouldBlock) => Poll::Pending,
1557 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1558 };
1559 }
1560 }
1561 }
1562
1563 let poller = SocketReceiver {
1564 inner: &self.inner,
1565 data: buf,
1566 nonblocking,
1567 peek,
1568 handler_registered: false,
1569 };
1570 if let Some(timeout) = timeout {
1571 tokio::select! {
1572 res = poller => res,
1573 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1574 }
1575 } else {
1576 poller.await
1577 }
1578 }
1579
1580 pub async fn recv_from(
1581 &self,
1582 tasks: &dyn VirtualTaskManager,
1583 buf: &mut [MaybeUninit<u8>],
1584 timeout: Option<Duration>,
1585 nonblocking: bool,
1586 peek: bool,
1587 ) -> Result<(usize, SocketAddr), Errno> {
1588 struct SocketReceiver<'a, 'b> {
1589 inner: &'a InodeSocketInner,
1590 data: &'b mut [MaybeUninit<u8>],
1591 nonblocking: bool,
1592 peek: bool,
1593 handler_registered: bool,
1594 }
1595 impl Drop for SocketReceiver<'_, '_> {
1596 fn drop(&mut self) {
1597 if self.handler_registered {
1598 let mut inner = self.inner.protected.write().unwrap();
1599 inner.remove_handler();
1600 }
1601 }
1602 }
1603 impl Future for SocketReceiver<'_, '_> {
1604 type Output = Result<(usize, SocketAddr), Errno>;
1605 fn poll(
1606 mut self: Pin<&mut Self>,
1607 cx: &mut std::task::Context<'_>,
1608 ) -> Poll<Self::Output> {
1609 let peek = self.peek;
1610 let mut inner = self.inner.protected.write().unwrap();
1611 loop {
1612 let res = match &mut inner.kind {
1613 InodeSocketKind::Icmp(socket) => socket.try_recv_from(self.data, peek),
1614 InodeSocketKind::UdpSocket { socket, .. } => {
1615 socket.try_recv_from(self.data, peek)
1616 }
1617 InodeSocketKind::RemoteSocket {
1618 is_dead, peer_addr, ..
1619 } => {
1620 return match is_dead {
1621 true => Poll::Ready(Ok((0, *peer_addr))),
1622 false => Poll::Pending,
1623 };
1624 }
1625 InodeSocketKind::PreSocket { .. } => {
1626 return Poll::Ready(Err(Errno::Notconn));
1627 }
1628 _ => return Poll::Ready(Err(Errno::Notsup)),
1629 };
1630 return match res {
1631 Ok((amt, addr)) => Poll::Ready(Ok((amt, addr))),
1632 Err(NetworkError::WouldBlock) if self.nonblocking => {
1633 Poll::Ready(Err(Errno::Again))
1634 }
1635 Err(NetworkError::WouldBlock) if !self.handler_registered => {
1636 inner
1637 .set_handler(cx.waker().into())
1638 .map_err(net_error_into_wasi_err)?;
1639 self.handler_registered = true;
1640 continue;
1641 }
1642 Err(NetworkError::WouldBlock) => Poll::Pending,
1643 Err(err) => Poll::Ready(Err(net_error_into_wasi_err(err))),
1644 };
1645 }
1646 }
1647 }
1648
1649 let poller = SocketReceiver {
1650 inner: &self.inner,
1651 data: buf,
1652 nonblocking,
1653 peek,
1654 handler_registered: false,
1655 };
1656 if let Some(timeout) = timeout {
1657 tokio::select! {
1658 res = poller => res,
1659 _ = tasks.sleep_now(timeout) => Err(Errno::Timedout)
1660 }
1661 } else {
1662 poller.await
1663 }
1664 }
1665
1666 pub fn shutdown(&mut self, how: std::net::Shutdown) -> Result<(), Errno> {
1667 let mut inner = self.inner.protected.write().unwrap();
1668 match &mut inner.kind {
1669 InodeSocketKind::TcpStream { socket, .. } => {
1670 socket.shutdown(how).map_err(net_error_into_wasi_err)?;
1671 }
1672 InodeSocketKind::RemoteSocket { .. } => return Ok(()),
1673 InodeSocketKind::BoundTcp { .. } => return Err(Errno::Notconn),
1674 InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
1675 _ => return Err(Errno::Notsup),
1676 }
1677 Ok(())
1678 }
1679
1680 pub async fn can_write(&self) -> bool {
1681 if let Ok(mut guard) = self.inner.protected.try_write() {
1682 #[allow(clippy::match_like_matches_macro)]
1683 match &mut guard.kind {
1684 InodeSocketKind::TcpStream { .. }
1685 | InodeSocketKind::BoundTcp { .. }
1686 | InodeSocketKind::UdpSocket { .. }
1687 | InodeSocketKind::Raw(..) => true,
1688 InodeSocketKind::RemoteSocket { is_dead, .. } => !(*is_dead),
1689 _ => false,
1690 }
1691 } else {
1692 false
1693 }
1694 }
1695}
1696
1697impl InodeSocketProtected {
1698 pub fn remove_handler(&mut self) {
1699 match &mut self.kind {
1700 InodeSocketKind::TcpListener { socket, .. } => socket.remove_handler(),
1701 InodeSocketKind::TcpStream { socket, .. } => socket.remove_handler(),
1702 InodeSocketKind::UdpSocket { socket, .. } => socket.remove_handler(),
1703 InodeSocketKind::Raw(socket) => socket.remove_handler(),
1704 InodeSocketKind::Icmp(socket) => socket.remove_handler(),
1705 InodeSocketKind::PreSocket { props, .. } => {
1706 props.handler.take();
1707 }
1708 InodeSocketKind::BoundTcp { props, .. } => {
1709 props.handler.take();
1710 }
1711 InodeSocketKind::RemoteSocket { props, .. } => {
1712 props.handler.take();
1713 }
1714 }
1715 }
1716
1717 pub fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1718 match &mut self.kind {
1719 InodeSocketKind::TcpListener { socket, .. } => socket.poll_read_ready(cx),
1720 InodeSocketKind::TcpStream { socket, .. } => socket.poll_read_ready(cx),
1721 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_read_ready(cx),
1722 InodeSocketKind::Raw(socket) => socket.poll_read_ready(cx),
1723 InodeSocketKind::Icmp(socket) => socket.poll_read_ready(cx),
1724 InodeSocketKind::BoundTcp { .. } => Poll::Pending,
1725 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1726 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1727 true => Poll::Ready(Ok(0)),
1728 false => Poll::Pending,
1729 },
1730 }
1731 .map_err(net_error_into_io_err)
1732 }
1733
1734 pub fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
1735 match &mut self.kind {
1736 InodeSocketKind::TcpListener { socket, .. } => socket.poll_write_ready(cx),
1737 InodeSocketKind::TcpStream { socket, .. } => socket.poll_write_ready(cx),
1738 InodeSocketKind::UdpSocket { socket, .. } => socket.poll_write_ready(cx),
1739 InodeSocketKind::Raw(socket) => socket.poll_write_ready(cx),
1740 InodeSocketKind::Icmp(socket) => socket.poll_write_ready(cx),
1741 InodeSocketKind::BoundTcp { .. } => Poll::Ready(Ok(1)),
1744 InodeSocketKind::PreSocket { .. } => Poll::Pending,
1745 InodeSocketKind::RemoteSocket { is_dead, .. } => match is_dead {
1746 true => Poll::Ready(Ok(0)),
1747 false => Poll::Pending,
1748 },
1749 }
1750 .map_err(net_error_into_io_err)
1751 }
1752
1753 pub fn set_handler(
1754 &mut self,
1755 handler: Box<dyn InterestHandler + Send + Sync>,
1756 ) -> virtual_net::Result<()> {
1757 match &mut self.kind {
1758 InodeSocketKind::TcpListener { socket, .. } => socket.set_handler(handler),
1759 InodeSocketKind::TcpStream { socket, .. } => socket.set_handler(handler),
1760 InodeSocketKind::UdpSocket { socket, .. } => socket.set_handler(handler),
1761 InodeSocketKind::Raw(socket) => socket.set_handler(handler),
1762 InodeSocketKind::Icmp(socket) => socket.set_handler(handler),
1763 InodeSocketKind::PreSocket { props, .. }
1764 | InodeSocketKind::BoundTcp { props, .. }
1765 | InodeSocketKind::RemoteSocket { props, .. } => {
1766 props.handler.replace(handler);
1767 Ok(())
1768 }
1769 }
1770 }
1771}
1772
1773#[allow(dead_code)]
1775pub(crate) fn all_socket_rights() -> Rights {
1776 Rights::FD_FDSTAT_SET_FLAGS
1777 .union(Rights::FD_FILESTAT_GET)
1778 .union(Rights::FD_READ)
1779 .union(Rights::FD_WRITE)
1780 .union(Rights::POLL_FD_READWRITE)
1781 .union(Rights::SOCK_SHUTDOWN)
1782 .union(Rights::SOCK_CONNECT)
1783 .union(Rights::SOCK_LISTEN)
1784 .union(Rights::SOCK_BIND)
1785 .union(Rights::SOCK_ACCEPT)
1786 .union(Rights::SOCK_RECV)
1787 .union(Rights::SOCK_SEND)
1788 .union(Rights::SOCK_ADDR_LOCAL)
1789 .union(Rights::SOCK_ADDR_REMOTE)
1790 .union(Rights::SOCK_RECV_FROM)
1791 .union(Rights::SOCK_SEND_TO)
1792}
1793
#[cfg(test)]
mod tests {
    use super::{InodeSocket, InodeSocketKind, SocketProperties, WasiSocketStatus};
    use std::{
        future::pending,
        mem::MaybeUninit,
        net::{Ipv4Addr, Shutdown, SocketAddr},
        pin::Pin,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        task::{Context, Poll},
        time::Duration,
    };
    use virtual_mio::InterestHandler;
    use virtual_net::{
        NetworkError, Result as NetResult, SocketStatus, VirtualConnectedSocket, VirtualIoSource,
        VirtualNetworking, VirtualSocket, VirtualTcpBoundSocket, VirtualTcpSocket,
    };
    use wasmer_wasix_types::wasi::{Addressfamily, Errno, SockProto, Socktype};

    // Encoded values stored in `MockTcpSocket::status`.
    const MOCK_STATUS_OPENING: usize = 0;
    const MOCK_STATUS_OPENED: usize = 1;

    /// Maps an encoded status back to a `SocketStatus`; anything other than
    /// `MOCK_STATUS_OPENED` decodes to `Opening`.
    fn decode_mock_status(value: usize) -> SocketStatus {
        if value == MOCK_STATUS_OPENED {
            SocketStatus::Opened
        } else {
            SocketStatus::Opening
        }
    }

    /// Creates a shared counter initialised to `initial`.
    fn counter(initial: usize) -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(initial))
    }

    /// IPv4/TCP stream properties with every optional setting left unset.
    fn unset_tcp_props() -> SocketProperties {
        SocketProperties {
            family: Addressfamily::Inet4,
            ty: Socktype::Stream,
            pt: SockProto::Tcp,
            only_v6: false,
            reuse_port: false,
            reuse_addr: false,
            no_delay: None,
            keep_alive: None,
            dont_route: None,
            send_buf_size: None,
            recv_buf_size: None,
            write_timeout: None,
            read_timeout: None,
            accept_timeout: None,
            connect_timeout: None,
            handler: None,
        }
    }

    /// Builds a `TcpStream` kind around a `MockTcpSocket` sharing the given
    /// counters, with no timeouts configured.
    fn mock_stream_kind(
        read_calls: Arc<AtomicUsize>,
        write_calls: Arc<AtomicUsize>,
        status: Arc<AtomicUsize>,
    ) -> InodeSocketKind {
        InodeSocketKind::TcpStream {
            socket: Box::new(MockTcpSocket {
                read_calls,
                write_calls,
                status,
            }),
            write_timeout: None,
            read_timeout: None,
        }
    }

    /// TCP socket double that counts readiness polls and exposes a
    /// settable status.
    #[derive(Debug)]
    struct MockTcpSocket {
        read_calls: Arc<AtomicUsize>,
        write_calls: Arc<AtomicUsize>,
        status: Arc<AtomicUsize>,
    }

    impl VirtualIoSource for MockTcpSocket {
        fn remove_handler(&mut self) {}

        fn poll_read_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
            self.read_calls.fetch_add(1, Ordering::Relaxed);
            Poll::Ready(Ok(3))
        }

        fn poll_write_ready(&mut self, _cx: &mut Context<'_>) -> Poll<NetResult<usize>> {
            self.write_calls.fetch_add(1, Ordering::Relaxed);
            // A write-readiness poll also flips the mock to "opened".
            self.status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
            Poll::Ready(Ok(7))
        }
    }

    impl VirtualSocket for MockTcpSocket {
        fn set_ttl(&mut self, _ttl: u32) -> NetResult<()> {
            Ok(())
        }

        fn ttl(&self) -> NetResult<u32> {
            Ok(64)
        }

        fn addr_local(&self) -> NetResult<SocketAddr> {
            Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))
        }

        fn status(&self) -> NetResult<SocketStatus> {
            let encoded = self.status.load(Ordering::Relaxed);
            Ok(decode_mock_status(encoded))
        }

        fn set_handler(
            &mut self,
            _handler: Box<dyn InterestHandler + Send + Sync>,
        ) -> NetResult<()> {
            Ok(())
        }
    }

    impl VirtualConnectedSocket for MockTcpSocket {
        fn set_linger(&mut self, _linger: Option<Duration>) -> NetResult<()> {
            Ok(())
        }

        fn linger(&self) -> NetResult<Option<Duration>> {
            Ok(None)
        }

        fn try_send(&mut self, _data: &[u8]) -> NetResult<usize> {
            Err(NetworkError::Unsupported)
        }

        fn try_flush(&mut self) -> NetResult<()> {
            Err(NetworkError::Unsupported)
        }

        fn close(&mut self) -> NetResult<()> {
            Ok(())
        }

        fn try_recv(&mut self, _buf: &mut [MaybeUninit<u8>], _peek: bool) -> NetResult<usize> {
            Err(NetworkError::Unsupported)
        }
    }

    impl VirtualTcpSocket for MockTcpSocket {
        fn set_recv_buf_size(&mut self, _size: usize) -> NetResult<()> {
            Ok(())
        }

        fn recv_buf_size(&self) -> NetResult<usize> {
            Ok(0)
        }

        fn set_send_buf_size(&mut self, _size: usize) -> NetResult<()> {
            Ok(())
        }

        fn send_buf_size(&self) -> NetResult<usize> {
            Ok(0)
        }

        fn set_nodelay(&mut self, _reuse: bool) -> NetResult<()> {
            Ok(())
        }

        fn nodelay(&self) -> NetResult<bool> {
            Ok(true)
        }

        fn set_keepalive(&mut self, _keepalive: bool) -> NetResult<()> {
            Ok(())
        }

        fn keepalive(&self) -> NetResult<bool> {
            Ok(false)
        }

        fn set_dontroute(&mut self, _keepalive: bool) -> NetResult<()> {
            Ok(())
        }

        fn dontroute(&self) -> NetResult<bool> {
            Ok(false)
        }

        fn addr_peer(&self) -> NetResult<SocketAddr> {
            Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 80)))
        }

        fn shutdown(&mut self, _how: Shutdown) -> NetResult<()> {
            Ok(())
        }

        fn is_closed(&self) -> bool {
            false
        }
    }

    /// Bound-socket double that records the TTL it was given.
    #[derive(Debug)]
    struct MockTcpBoundSocket {
        ttl: Arc<AtomicUsize>,
    }

    impl VirtualTcpBoundSocket for MockTcpBoundSocket {
        fn addr_local(&self) -> NetResult<SocketAddr> {
            Ok(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))
        }

        fn listen(&mut self) -> NetResult<Box<dyn virtual_net::VirtualTcpListener + Sync>> {
            Err(NetworkError::Unsupported)
        }

        fn connect(
            &mut self,
            _peer: SocketAddr,
        ) -> NetResult<Box<dyn virtual_net::VirtualTcpSocket + Sync>> {
            Err(NetworkError::Unsupported)
        }

        fn set_ttl(&mut self, ttl: u32) -> NetResult<()> {
            self.ttl.store(ttl as usize, Ordering::Relaxed);
            Ok(())
        }

        fn ttl(&self) -> NetResult<u32> {
            let stored = self.ttl.load(Ordering::Relaxed);
            Ok(stored as u32)
        }
    }

    /// Networking double whose `bind_tcp` never completes.
    #[derive(Debug)]
    struct PendingBindNetworking;

    #[async_trait::async_trait]
    impl VirtualNetworking for PendingBindNetworking {
        async fn bind_tcp(
            &self,
            _addr: SocketAddr,
            _only_v6: bool,
            _reuse_port: bool,
            _reuse_addr: bool,
        ) -> NetResult<Box<dyn VirtualTcpBoundSocket + Sync>> {
            pending::<()>().await;
            unreachable!("pending bind_tcp future should never complete")
        }
    }

    #[test]
    fn inode_socket_poll_write_ready_uses_write_path() {
        let reads = counter(0);
        let writes = counter(0);
        let mut inode = InodeSocket::new(mock_stream_kind(
            reads.clone(),
            writes.clone(),
            counter(MOCK_STATUS_OPENED),
        ));

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        let ready = Pin::new(&mut inode).poll_write_ready(&mut cx);

        // Only the write-readiness path of the mock may have been polled.
        assert!(matches!(ready, Poll::Ready(Ok(7))));
        assert_eq!(reads.load(Ordering::Relaxed), 0);
        assert_eq!(writes.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn inode_socket_status_tracks_tcp_socket_status() {
        let status = counter(MOCK_STATUS_OPENING);
        let inode = InodeSocket::new(mock_stream_kind(counter(0), counter(0), status.clone()));

        assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opening));
        status.store(MOCK_STATUS_OPENED, Ordering::Relaxed);
        assert!(matches!(inode.status().unwrap(), WasiSocketStatus::Opened));
    }

    #[test]
    fn inode_socket_bound_tcp_forwards_ttl() {
        let ttl = counter(64);
        let inode = InodeSocket::new(InodeSocketKind::BoundTcp {
            socket: Box::new(MockTcpBoundSocket { ttl: ttl.clone() }),
            props: unset_tcp_props(),
        });

        inode.set_ttl(42).unwrap();
        assert_eq!(inode.ttl().unwrap(), 42);
        assert_eq!(ttl.load(Ordering::Relaxed), 42);
    }

    #[cfg(feature = "sys")]
    #[tokio::test(flavor = "current_thread")]
    async fn inode_socket_tcp_bind_respects_bind_timeout() {
        let inode = InodeSocket::new(InodeSocketKind::PreSocket {
            props: unset_tcp_props(),
            addr: None,
        });
        let tasks = crate::runtime::task_manager::tokio::TokioTaskManager::default();
        let net = PendingBindNetworking;
        let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

        // The bind future never resolves, so the 10 ms timeout must win.
        let err = inode
            .bind_internal(&tasks, &net, bind_addr, Duration::from_millis(10))
            .await
            .unwrap_err();
        assert_eq!(err, Errno::Timedout);
    }
}