wasmer_wasix/syscalls/wasix/
sock_send.rs

1use std::{mem::MaybeUninit, task::Waker};
2
3use super::*;
4use crate::{net::socket::TimeType, syscalls::*};
5
6/// ### `sock_send()`
7/// Send a message on a socket.
8/// Note: This is similar to `send` in POSIX, though it also supports writing
9/// the data from multiple buffers in the manner of `writev`.
10///
11/// ## Parameters
12///
13/// * `si_data` - List of scatter/gather vectors to which to retrieve data
14/// * `si_flags` - Message flags.
15///
16/// ## Return
17///
18/// Number of bytes transmitted.
19#[instrument(level = "trace", skip_all, fields(%fd, nsent = field::Empty), ret)]
20pub fn sock_send<M: MemorySize>(
21    mut ctx: FunctionEnvMut<'_, WasiEnv>,
22    fd: WasiFd,
23    si_data: WasmPtr<__wasi_ciovec_t<M>, M>,
24    si_data_len: M::Offset,
25    si_flags: SiFlags,
26    ret_data_len: WasmPtr<M::Offset, M>,
27) -> Result<Errno, WasiError> {
28    WasiEnv::do_pending_operations(&mut ctx)?;
29
30    let env = ctx.data();
31    let fd_entry = wasi_try_ok!(env.state.fs.get_fd(fd));
32    let enable_journal = env.enable_journal;
33    let guard = fd_entry.inode.read();
34    // We need this hack because we use a pipe to back socket pairs
35    let use_write = matches!(guard.deref(), Kind::DuplexPipe { .. });
36    drop(guard);
37
38    let bytes_written = if use_write {
39        let offset = {
40            let state = env.state.clone();
41            let inodes = state.inodes.clone();
42
43            let fd_entry = wasi_try_ok!(state.fs.get_fd(fd));
44            fd_entry.inner.offset.load(Ordering::Acquire) as usize
45        };
46
47        wasi_try_ok!(fd_write_internal::<M>(
48            &mut ctx,
49            fd,
50            FdWriteSource::Iovs {
51                iovs: si_data,
52                iovs_len: si_data_len
53            },
54            offset as u64,
55            true,
56            enable_journal
57        )?)
58    } else {
59        wasi_try_ok!(sock_send_internal::<M>(
60            &ctx,
61            fd,
62            FdWriteSource::Iovs {
63                iovs: si_data,
64                iovs_len: si_data_len
65            },
66            si_flags,
67        )?)
68    };
69
70    #[cfg(feature = "journal")]
71    if ctx.data().enable_journal {
72        JournalEffector::save_sock_send(&ctx, fd, bytes_written, si_data, si_data_len, si_flags)
73            .map_err(|err| {
74                tracing::error!("failed to save sock_send event - {}", err);
75                WasiError::Exit(ExitCode::from(Errno::Fault))
76            })?;
77    }
78
79    Span::current().record("nsent", bytes_written);
80
81    let env = ctx.data();
82    let memory = unsafe { env.memory_view(&ctx) };
83    let bytes_written: M::Offset =
84        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
85    wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written));
86
87    Ok(Errno::Success)
88}
89
90pub(crate) fn sock_send_internal<M: MemorySize>(
91    ctx: &FunctionEnvMut<'_, WasiEnv>,
92    sock: WasiFd,
93    si_data: FdWriteSource<'_, M>,
94    si_flags: SiFlags,
95) -> Result<Result<usize, Errno>, WasiError> {
96    let env = ctx.data();
97    let memory = unsafe { env.memory_view(&ctx) };
98    let runtime = env.runtime.clone();
99
100    let nonblocking_flag = (si_flags & __WASI_SOCK_SEND_INPUT_DONT_WAIT) != 0;
101
102    let bytes_written = wasi_try_ok_ok!(__sock_asyncify(
103        env,
104        sock,
105        Rights::SOCK_SEND,
106        |socket, fd| async move {
107            let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
108            let timeout = socket
109                .opt_time(TimeType::WriteTimeout)
110                .ok()
111                .flatten()
112                .unwrap_or(Duration::from_secs(30));
113
114            match si_data {
115                FdWriteSource::Iovs { iovs, iovs_len } => {
116                    let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
117                    let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
118
119                    let mut sent = 0usize;
120                    for iovs in iovs_arr.iter() {
121                        let buf = WasmPtr::<u8, M>::new(iovs.buf)
122                            .slice(&memory, iovs.buf_len)
123                            .map_err(mem_error_to_wasi)?
124                            .access()
125                            .map_err(mem_error_to_wasi)?;
126                        let local_sent = match socket
127                            .send(
128                                env.tasks().deref(),
129                                buf.as_ref(),
130                                Some(timeout),
131                                nonblocking,
132                            )
133                            .await
134                        {
135                            Ok(s) => s,
136                            Err(_) if sent > 0 => break,
137                            Err(err) => return Err(err),
138                        };
139                        sent += local_sent;
140                        if local_sent != buf.len() {
141                            break;
142                        }
143                    }
144                    Ok(sent)
145                }
146                FdWriteSource::Buffer(data) => {
147                    socket
148                        .send(
149                            env.tasks().deref(),
150                            data.as_ref(),
151                            Some(timeout),
152                            nonblocking,
153                        )
154                        .await
155                }
156            }
157        }
158    ));
159    trace!(
160        %bytes_written,
161    );
162
163    Ok(Ok(bytes_written))
164}