// wasmer_wasix/syscalls/wasix/sock_recv_from.rs
use std::{mem::MaybeUninit, task::Waker};

use super::*;
use crate::{net::socket::TimeType, syscalls::*};

6#[instrument(level = "trace", skip_all, fields(%sock, nread = field::Empty, peer = field::Empty), ret)]
20pub fn sock_recv_from<M: MemorySize>(
21 mut ctx: FunctionEnvMut<'_, WasiEnv>,
22 sock: WasiFd,
23 ri_data: WasmPtr<__wasi_iovec_t<M>, M>,
24 ri_data_len: M::Offset,
25 ri_flags: RiFlags,
26 ro_data_len: WasmPtr<M::Offset, M>,
27 ro_flags: WasmPtr<RoFlags, M>,
28 ro_addr: WasmPtr<__wasi_addr_port_t, M>,
29) -> Result<Errno, WasiError> {
30 WasiEnv::do_pending_operations(&mut ctx)?;
31
32 sock_recv_from_internal(
33 ctx,
34 sock,
35 ri_data,
36 ri_data_len,
37 ri_flags,
38 ro_data_len,
39 ro_flags,
40 ro_addr,
41 )
42}
43
44pub(super) fn sock_recv_from_internal<M: MemorySize>(
45 mut ctx: FunctionEnvMut<'_, WasiEnv>,
46 sock: WasiFd,
47 ri_data: WasmPtr<__wasi_iovec_t<M>, M>,
48 ri_data_len: M::Offset,
49 ri_flags: RiFlags,
50 ro_data_len: WasmPtr<M::Offset, M>,
51 ro_flags: WasmPtr<RoFlags, M>,
52 ro_addr: WasmPtr<__wasi_addr_port_t, M>,
53) -> Result<Errno, WasiError> {
54 let mut env = ctx.data();
55 let memory = unsafe { env.memory_view(&ctx) };
56 let iovs_arr = wasi_try_mem_ok!(ri_data.slice(&memory, ri_data_len));
57
58 let peek = (ri_flags & __WASI_SOCK_RECV_INPUT_PEEK) != 0;
59 let nonblocking_flag = (ri_flags & __WASI_SOCK_RECV_INPUT_DONT_WAIT) != 0;
60
61 let max_size = {
62 let mut max_size = 0usize;
63 for iovs in iovs_arr.iter() {
64 let iovs = wasi_try_mem_ok!(iovs.read());
65 let buf_len: usize = wasi_try_ok!(iovs.buf_len.try_into().map_err(|_| Errno::Overflow));
66 max_size += buf_len;
67 }
68 max_size
69 };
70
71 let (bytes_read, peer) = {
72 if max_size <= 10240 {
73 let mut buf: [MaybeUninit<u8>; 10240] = unsafe { MaybeUninit::uninit().assume_init() };
74 let writer = &mut buf[..max_size];
75 let (amt, peer) = wasi_try_ok!(__sock_asyncify(
76 env,
77 sock,
78 Rights::SOCK_RECV,
79 |socket, fd| async move {
80 let nonblocking =
81 nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
82 let timeout = socket
83 .opt_time(TimeType::ReadTimeout)
84 .ok()
85 .flatten()
86 .unwrap_or(Duration::from_secs(30));
87 socket
88 .recv_from(
89 env.tasks().deref(),
90 writer,
91 Some(timeout),
92 nonblocking,
93 peek,
94 )
95 .await
96 },
97 ));
98
99 if amt > 0 {
100 let buf: &[MaybeUninit<u8>] = &buf[..amt];
101 let buf: &[u8] = unsafe { std::mem::transmute(buf) };
102 wasi_try_ok!(copy_from_slice(buf, &memory, iovs_arr).map(|_| (amt, peer)))
103 } else {
104 (amt, peer)
105 }
106 } else {
107 let (data, peer) = wasi_try_ok!(__sock_asyncify(
108 env,
109 sock,
110 Rights::SOCK_RECV_FROM,
111 |socket, fd| async move {
112 let nonblocking = fd.inner.flags.contains(Fdflags::NONBLOCK);
113 let timeout = socket
114 .opt_time(TimeType::ReadTimeout)
115 .ok()
116 .flatten()
117 .unwrap_or(Duration::from_secs(30));
118
119 let mut buf = Vec::with_capacity(max_size);
120 unsafe {
121 buf.set_len(max_size);
122 }
123 socket
124 .recv_from(
125 env.tasks().deref(),
126 &mut buf,
127 Some(timeout),
128 nonblocking,
129 peek,
130 )
131 .await
132 .map(|(amt, addr)| {
133 unsafe {
134 buf.set_len(amt);
135 }
136 let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
137 (buf, addr)
138 })
139 }
140 ));
141
142 let data_len = data.len();
143 if data_len > 0 {
144 let mut reader = &data[..];
145 wasi_try_ok!(read_bytes(reader, &memory, iovs_arr).map(|_| (data_len, peer)))
146 } else {
147 (0, peer)
148 }
149 }
150 };
151 Span::current()
152 .record("nread", bytes_read)
153 .record("peer", format!("{peer:?}"));
154
155 wasi_try_ok!(write_ip_port(&memory, ro_addr, peer.ip(), peer.port()));
156
157 let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
158 wasi_try_mem_ok!(ro_flags.write(&memory, 0));
159 wasi_try_mem_ok!(ro_data_len.write(&memory, bytes_read));
160
161 Ok(Errno::Success)
162}