wasmer_wasix/syscalls/wasix/
sock_send.rs1use std::{mem::MaybeUninit, task::Waker};
2
3use super::*;
4use crate::{net::socket::TimeType, syscalls::*};
5
6#[instrument(level = "trace", skip_all, fields(%fd, nsent = field::Empty), ret)]
20pub fn sock_send<M: MemorySize>(
21 mut ctx: FunctionEnvMut<'_, WasiEnv>,
22 fd: WasiFd,
23 si_data: WasmPtr<__wasi_ciovec_t<M>, M>,
24 si_data_len: M::Offset,
25 si_flags: SiFlags,
26 ret_data_len: WasmPtr<M::Offset, M>,
27) -> Result<Errno, WasiError> {
28 WasiEnv::do_pending_operations(&mut ctx)?;
29
30 let env = ctx.data();
31 let fd_entry = wasi_try_ok!(env.state.fs.get_fd(fd));
32 let enable_journal = env.enable_journal;
33 let guard = fd_entry.inode.read();
34 let use_write = matches!(guard.deref(), Kind::DuplexPipe { .. });
36 drop(guard);
37
38 let bytes_written = if use_write {
39 let offset = {
40 let state = env.state.clone();
41 let inodes = state.inodes.clone();
42
43 let fd_entry = wasi_try_ok!(state.fs.get_fd(fd));
44 fd_entry.inner.offset.load(Ordering::Acquire) as usize
45 };
46
47 wasi_try_ok!(fd_write_internal::<M>(
48 &mut ctx,
49 fd,
50 FdWriteSource::Iovs {
51 iovs: si_data,
52 iovs_len: si_data_len
53 },
54 offset as u64,
55 true,
56 enable_journal
57 )?)
58 } else {
59 wasi_try_ok!(sock_send_internal::<M>(
60 &ctx,
61 fd,
62 FdWriteSource::Iovs {
63 iovs: si_data,
64 iovs_len: si_data_len
65 },
66 si_flags,
67 )?)
68 };
69
70 #[cfg(feature = "journal")]
71 if ctx.data().enable_journal {
72 JournalEffector::save_sock_send(&ctx, fd, bytes_written, si_data, si_data_len, si_flags)
73 .map_err(|err| {
74 tracing::error!("failed to save sock_send event - {}", err);
75 WasiError::Exit(ExitCode::from(Errno::Fault))
76 })?;
77 }
78
79 Span::current().record("nsent", bytes_written);
80
81 let env = ctx.data();
82 let memory = unsafe { env.memory_view(&ctx) };
83 let bytes_written: M::Offset =
84 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
85 wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written));
86
87 Ok(Errno::Success)
88}
89
90pub(crate) fn sock_send_internal<M: MemorySize>(
91 ctx: &FunctionEnvMut<'_, WasiEnv>,
92 sock: WasiFd,
93 si_data: FdWriteSource<'_, M>,
94 si_flags: SiFlags,
95) -> Result<Result<usize, Errno>, WasiError> {
96 let env = ctx.data();
97 let memory = unsafe { env.memory_view(&ctx) };
98 let runtime = env.runtime.clone();
99
100 let nonblocking_flag = (si_flags & __WASI_SOCK_SEND_INPUT_DONT_WAIT) != 0;
101
102 let bytes_written = wasi_try_ok_ok!(__sock_asyncify(
103 env,
104 sock,
105 Rights::SOCK_SEND,
106 |socket, fd| async move {
107 let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
108 let timeout = socket
109 .opt_time(TimeType::WriteTimeout)
110 .ok()
111 .flatten()
112 .unwrap_or(Duration::from_secs(30));
113
114 match si_data {
115 FdWriteSource::Iovs { iovs, iovs_len } => {
116 let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
117 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
118
119 let mut sent = 0usize;
120 for iovs in iovs_arr.iter() {
121 let buf = WasmPtr::<u8, M>::new(iovs.buf)
122 .slice(&memory, iovs.buf_len)
123 .map_err(mem_error_to_wasi)?
124 .access()
125 .map_err(mem_error_to_wasi)?;
126 let local_sent = match socket
127 .send(
128 env.tasks().deref(),
129 buf.as_ref(),
130 Some(timeout),
131 nonblocking,
132 )
133 .await
134 {
135 Ok(s) => s,
136 Err(_) if sent > 0 => break,
137 Err(err) => return Err(err),
138 };
139 sent += local_sent;
140 if local_sent != buf.len() {
141 break;
142 }
143 }
144 Ok(sent)
145 }
146 FdWriteSource::Buffer(data) => {
147 socket
148 .send(
149 env.tasks().deref(),
150 data.as_ref(),
151 Some(timeout),
152 nonblocking,
153 )
154 .await
155 }
156 }
157 }
158 ));
159 trace!(
160 %bytes_written,
161 );
162
163 Ok(Ok(bytes_written))
164}