wasmer_wasix/syscalls/wasix/
sock_send.rs

1use std::{mem::MaybeUninit, task::Waker};
2
3use super::*;
4use crate::{net::socket::TimeType, syscalls::*};
5
6/// ### `sock_send()`
7/// Send a message on a socket.
8/// Note: This is similar to `send` in POSIX, though it also supports writing
9/// the data from multiple buffers in the manner of `writev`.
10///
11/// ## Parameters
12///
13/// * `si_data` - List of scatter/gather vectors to which to retrieve data
14/// * `si_flags` - Message flags.
15///
16/// ## Return
17///
18/// Number of bytes transmitted.
19#[instrument(level = "trace", skip_all, fields(%fd, nsent = field::Empty), ret)]
20pub fn sock_send<M: MemorySize>(
21    mut ctx: FunctionEnvMut<'_, WasiEnv>,
22    fd: WasiFd,
23    si_data: WasmPtr<__wasi_ciovec_t<M>, M>,
24    si_data_len: M::Offset,
25    si_flags: SiFlags,
26    ret_data_len: WasmPtr<M::Offset, M>,
27) -> Result<Errno, WasiError> {
28    WasiEnv::do_pending_operations(&mut ctx)?;
29
30    let env = ctx.data();
31    let fd_entry = wasi_try_ok!(env.state.fs.get_fd(fd));
32    let enable_journal = env.enable_journal;
33    let guard = fd_entry.inode.read();
34    // Some guests route socket-like wakeups through pipe-backed fds.
35    let use_write = matches!(guard.deref(), Kind::DuplexPipe { .. } | Kind::PipeTx { .. });
36    drop(guard);
37
38    let bytes_written = if use_write {
39        let offset = { fd_entry.inner.offset.load(Ordering::Acquire) as usize };
40
41        wasi_try_ok!(fd_write_internal::<M>(
42            &mut ctx,
43            fd,
44            fd_entry,
45            FdWriteSource::Iovs {
46                iovs: si_data,
47                iovs_len: si_data_len
48            },
49            offset as u64,
50            true,
51            enable_journal
52        )?)
53    } else {
54        wasi_try_ok!(sock_send_internal::<M>(
55            &ctx,
56            fd,
57            FdWriteSource::Iovs {
58                iovs: si_data,
59                iovs_len: si_data_len
60            },
61            si_flags,
62        )?)
63    };
64
65    #[cfg(feature = "journal")]
66    if ctx.data().enable_journal {
67        JournalEffector::save_sock_send(&ctx, fd, bytes_written, si_data, si_data_len, si_flags)
68            .map_err(|err| {
69                tracing::error!("failed to save sock_send event - {}", err);
70                WasiError::Exit(ExitCode::from(Errno::Fault))
71            })?;
72    }
73
74    Span::current().record("nsent", bytes_written);
75
76    let env = ctx.data();
77    let memory = unsafe { env.memory_view(&ctx) };
78    let bytes_written: M::Offset =
79        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
80    wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written));
81
82    Ok(Errno::Success)
83}
84
85pub(crate) fn sock_send_internal<M: MemorySize>(
86    ctx: &FunctionEnvMut<'_, WasiEnv>,
87    sock: WasiFd,
88    si_data: FdWriteSource<'_, M>,
89    si_flags: SiFlags,
90) -> Result<Result<usize, Errno>, WasiError> {
91    let env = ctx.data();
92    let memory = unsafe { env.memory_view(&ctx) };
93    let runtime = env.runtime.clone();
94
95    let nonblocking_flag = (si_flags & __WASI_SOCK_SEND_INPUT_DONT_WAIT) != 0;
96
97    let bytes_written = wasi_try_ok_ok!(__sock_asyncify(
98        env,
99        sock,
100        Rights::SOCK_SEND,
101        |socket, fd| async move {
102            let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
103            let timeout = socket
104                .opt_time(TimeType::WriteTimeout)
105                .ok()
106                .flatten()
107                .unwrap_or(Duration::from_secs(30));
108
109            match si_data {
110                FdWriteSource::Iovs { iovs, iovs_len } => {
111                    let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
112                    let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
113
114                    let mut sent = 0usize;
115                    for iovs in iovs_arr.iter() {
116                        let buf = WasmPtr::<u8, M>::new(iovs.buf)
117                            .slice(&memory, iovs.buf_len)
118                            .map_err(mem_error_to_wasi)?
119                            .access()
120                            .map_err(mem_error_to_wasi)?;
121                        let local_sent = match socket
122                            .send(
123                                env.tasks().deref(),
124                                buf.as_ref(),
125                                Some(timeout),
126                                nonblocking,
127                            )
128                            .await
129                        {
130                            Ok(s) => s,
131                            Err(_) if sent > 0 => break,
132                            Err(err) => return Err(err),
133                        };
134                        sent += local_sent;
135                        if local_sent != buf.len() {
136                            break;
137                        }
138                    }
139                    Ok(sent)
140                }
141                FdWriteSource::Buffer(data) => {
142                    socket
143                        .send(
144                            env.tasks().deref(),
145                            data.as_ref(),
146                            Some(timeout),
147                            nonblocking,
148                        )
149                        .await
150                }
151            }
152        }
153    ));
154    trace!(
155        %bytes_written,
156    );
157
158    Ok(Ok(bytes_written))
159}