wasmer_wasix/syscalls/wasix/
sock_recv.rs1use std::{mem::MaybeUninit, task::Waker};
2
3use super::*;
4use crate::{net::socket::TimeType, syscalls::*};
5
6#[instrument(level = "trace", skip_all, fields(%sock, nread = field::Empty), ret)]
20pub fn sock_recv<M: MemorySize>(
21 mut ctx: FunctionEnvMut<'_, WasiEnv>,
22 sock: WasiFd,
23 ri_data: WasmPtr<__wasi_iovec_t<M>, M>,
24 ri_data_len: M::Offset,
25 ri_flags: RiFlags,
26 ro_data_len: WasmPtr<M::Offset, M>,
27 ro_flags: WasmPtr<RoFlags, M>,
28) -> Result<Errno, WasiError> {
29 WasiEnv::do_pending_operations(&mut ctx)?;
30
31 let env = ctx.data();
32 let fd_entry = wasi_try_ok!(env.state.fs.get_fd(sock));
33 let guard = fd_entry.inode.read();
34 let use_read = matches!(guard.deref(), Kind::DuplexPipe { .. });
36 drop(guard);
37 if use_read {
38 fd_read(ctx, sock, ri_data, ri_data_len, ro_data_len)
39 } else {
40 let pid = ctx.data().pid();
41 let tid = ctx.data().tid();
42
43 let res = sock_recv_internal::<M>(
44 &mut ctx,
45 sock,
46 ri_data,
47 ri_data_len,
48 ri_flags,
49 ro_data_len,
50 ro_flags,
51 )?;
52
53 sock_recv_internal_handler(ctx, res, ro_data_len, ro_flags)
54 }
55}
56
57pub(super) fn sock_recv_internal_handler<M: MemorySize>(
58 mut ctx: FunctionEnvMut<'_, WasiEnv>,
59 res: Result<usize, Errno>,
60 ro_data_len: WasmPtr<M::Offset, M>,
61 ro_flags: WasmPtr<RoFlags, M>,
62) -> Result<Errno, WasiError> {
63 let mut ret = Errno::Success;
64 let bytes_read = match res {
65 Ok(bytes_read) => {
66 trace!(
67 %bytes_read,
68 );
69 bytes_read
70 }
71 Err(err) => {
72 let socket_err = err.name();
73 trace!(
74 %socket_err,
75 );
76 ret = err;
77 0
78 }
79 };
80 Span::current().record("nread", bytes_read);
81
82 let env = ctx.data();
83 let memory = unsafe { env.memory_view(&ctx) };
84
85 let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
86 wasi_try_mem_ok!(ro_flags.write(&memory, 0));
87 wasi_try_mem_ok!(ro_data_len.write(&memory, bytes_read));
88
89 Ok(ret)
90}
91
92pub(super) fn sock_recv_internal<M: MemorySize>(
106 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
107 sock: WasiFd,
108 ri_data: WasmPtr<__wasi_iovec_t<M>, M>,
109 ri_data_len: M::Offset,
110 ri_flags: RiFlags,
111 ro_data_len: WasmPtr<M::Offset, M>,
112 ro_flags: WasmPtr<RoFlags, M>,
113) -> WasiResult<usize> {
114 let mut env = ctx.data();
115 let memory = unsafe { env.memory_view(ctx) };
116
117 let peek = (ri_flags & __WASI_SOCK_RECV_INPUT_PEEK) != 0;
118 let nonblocking_flag = (ri_flags & __WASI_SOCK_RECV_INPUT_DONT_WAIT) != 0;
119 let data = wasi_try_ok_ok!(__sock_asyncify(
120 env,
121 sock,
122 Rights::SOCK_RECV,
123 |socket, fd| async move {
124 let iovs_arr = ri_data
125 .slice(&memory, ri_data_len)
126 .map_err(mem_error_to_wasi)?;
127 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
128
129 let mut total_read = 0;
130 for iovs in iovs_arr.iter() {
131 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
132 .slice(&memory, iovs.buf_len)
133 .map_err(mem_error_to_wasi)?
134 .access()
135 .map_err(mem_error_to_wasi)?;
136
137 let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
138 let timeout = socket
139 .opt_time(TimeType::ReadTimeout)
140 .ok()
141 .flatten()
142 .unwrap_or(Duration::from_secs(30));
143
144 let local_read = match socket
145 .recv(
146 env.tasks().deref(),
147 buf.as_mut_uninit(),
148 Some(timeout),
149 nonblocking,
150 peek,
151 )
152 .await
153 {
154 Ok(s) => s,
155 Err(_) if total_read > 0 => break,
156 Err(err) => return Err(err),
157 };
158 total_read += local_read;
159 if local_read != buf.len() {
160 break;
161 }
162 }
163 Ok(total_read)
164 }
165 ));
166 Ok(Ok(data))
167}