wasmer_wasix/syscalls/wasix/
sock_recv.rs

1use std::{mem::MaybeUninit, task::Waker};
2
3use super::*;
4use crate::{net::socket::TimeType, syscalls::*};
5
6/// ### `sock_recv()`
7/// Receive a message from a socket.
8/// Note: This is similar to `recv` in POSIX, though it also supports reading
9/// the data into multiple buffers in the manner of `readv`.
10///
11/// ## Parameters
12///
13/// * `ri_data` - List of scatter/gather vectors to which to store data.
14/// * `ri_flags` - Message flags.
15///
16/// ## Return
17///
18/// Number of bytes stored in ri_data and message flags.
19#[instrument(level = "trace", skip_all, fields(%sock, nread = field::Empty), ret)]
20pub fn sock_recv<M: MemorySize>(
21    mut ctx: FunctionEnvMut<'_, WasiEnv>,
22    sock: WasiFd,
23    ri_data: WasmPtr<__wasi_iovec_t<M>, M>,
24    ri_data_len: M::Offset,
25    ri_flags: RiFlags,
26    ro_data_len: WasmPtr<M::Offset, M>,
27    ro_flags: WasmPtr<RoFlags, M>,
28) -> Result<Errno, WasiError> {
29    WasiEnv::do_pending_operations(&mut ctx)?;
30
31    let env = ctx.data();
32    let fd_entry = wasi_try_ok!(env.state.fs.get_fd(sock));
33    let guard = fd_entry.inode.read();
34    // We need this hack because we use a pipe to back socket pairs
35    let use_read = matches!(guard.deref(), Kind::DuplexPipe { .. });
36    drop(guard);
37    if use_read {
38        fd_read(ctx, sock, ri_data, ri_data_len, ro_data_len)
39    } else {
40        let pid = ctx.data().pid();
41        let tid = ctx.data().tid();
42
43        let res = sock_recv_internal::<M>(
44            &mut ctx,
45            sock,
46            ri_data,
47            ri_data_len,
48            ri_flags,
49            ro_data_len,
50            ro_flags,
51        )?;
52
53        sock_recv_internal_handler(ctx, res, ro_data_len, ro_flags)
54    }
55}
56
57pub(super) fn sock_recv_internal_handler<M: MemorySize>(
58    mut ctx: FunctionEnvMut<'_, WasiEnv>,
59    res: Result<usize, Errno>,
60    ro_data_len: WasmPtr<M::Offset, M>,
61    ro_flags: WasmPtr<RoFlags, M>,
62) -> Result<Errno, WasiError> {
63    let mut ret = Errno::Success;
64    let bytes_read = match res {
65        Ok(bytes_read) => {
66            trace!(
67                %bytes_read,
68            );
69            bytes_read
70        }
71        Err(err) => {
72            let socket_err = err.name();
73            trace!(
74                %socket_err,
75            );
76            ret = err;
77            0
78        }
79    };
80    Span::current().record("nread", bytes_read);
81
82    let env = ctx.data();
83    let memory = unsafe { env.memory_view(&ctx) };
84
85    let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
86    wasi_try_mem_ok!(ro_flags.write(&memory, 0));
87    wasi_try_mem_ok!(ro_data_len.write(&memory, bytes_read));
88
89    Ok(ret)
90}
91
92/// ### `sock_recv()`
93/// Receive a message from a socket.
94/// Note: This is similar to `recv` in POSIX, though it also supports reading
95/// the data into multiple buffers in the manner of `readv`.
96///
97/// ## Parameters
98///
99/// * `ri_data` - List of scatter/gather vectors to which to store data.
100/// * `ri_flags` - Message flags.
101///
102/// ## Return
103///
104/// Number of bytes stored in ri_data and message flags.
105pub(super) fn sock_recv_internal<M: MemorySize>(
106    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
107    sock: WasiFd,
108    ri_data: WasmPtr<__wasi_iovec_t<M>, M>,
109    ri_data_len: M::Offset,
110    ri_flags: RiFlags,
111    ro_data_len: WasmPtr<M::Offset, M>,
112    ro_flags: WasmPtr<RoFlags, M>,
113) -> WasiResult<usize> {
114    let mut env = ctx.data();
115    let memory = unsafe { env.memory_view(ctx) };
116
117    let peek = (ri_flags & __WASI_SOCK_RECV_INPUT_PEEK) != 0;
118    let nonblocking_flag = (ri_flags & __WASI_SOCK_RECV_INPUT_DONT_WAIT) != 0;
119    let data = wasi_try_ok_ok!(__sock_asyncify(
120        env,
121        sock,
122        Rights::SOCK_RECV,
123        |socket, fd| async move {
124            let iovs_arr = ri_data
125                .slice(&memory, ri_data_len)
126                .map_err(mem_error_to_wasi)?;
127            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
128
129            let mut total_read = 0;
130            for iovs in iovs_arr.iter() {
131                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
132                    .slice(&memory, iovs.buf_len)
133                    .map_err(mem_error_to_wasi)?
134                    .access()
135                    .map_err(mem_error_to_wasi)?;
136
137                let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
138                let timeout = socket
139                    .opt_time(TimeType::ReadTimeout)
140                    .ok()
141                    .flatten()
142                    .unwrap_or(Duration::from_secs(30));
143
144                let local_read = match socket
145                    .recv(
146                        env.tasks().deref(),
147                        buf.as_mut_uninit(),
148                        Some(timeout),
149                        nonblocking,
150                        peek,
151                    )
152                    .await
153                {
154                    Ok(s) => s,
155                    Err(_) if total_read > 0 => break,
156                    Err(err) => return Err(err),
157                };
158                total_read += local_read;
159                if local_read != buf.len() {
160                    break;
161                }
162            }
163            Ok(total_read)
164        }
165    ));
166    Ok(Ok(data))
167}