wasmer_wasix/syscalls/wasix/
sock_send.rs1use std::{mem::MaybeUninit, task::Waker};
2
3use super::*;
4use crate::{net::socket::TimeType, syscalls::*};
5
6#[instrument(level = "trace", skip_all, fields(%fd, nsent = field::Empty), ret)]
20pub fn sock_send<M: MemorySize>(
21 mut ctx: FunctionEnvMut<'_, WasiEnv>,
22 fd: WasiFd,
23 si_data: WasmPtr<__wasi_ciovec_t<M>, M>,
24 si_data_len: M::Offset,
25 si_flags: SiFlags,
26 ret_data_len: WasmPtr<M::Offset, M>,
27) -> Result<Errno, WasiError> {
28 WasiEnv::do_pending_operations(&mut ctx)?;
29
30 let env = ctx.data();
31 let fd_entry = wasi_try_ok!(env.state.fs.get_fd(fd));
32 let enable_journal = env.enable_journal;
33 let guard = fd_entry.inode.read();
34 let use_write = matches!(guard.deref(), Kind::DuplexPipe { .. } | Kind::PipeTx { .. });
36 drop(guard);
37
38 let bytes_written = if use_write {
39 let offset = { fd_entry.inner.offset.load(Ordering::Acquire) as usize };
40
41 wasi_try_ok!(fd_write_internal::<M>(
42 &mut ctx,
43 fd,
44 fd_entry,
45 FdWriteSource::Iovs {
46 iovs: si_data,
47 iovs_len: si_data_len
48 },
49 offset as u64,
50 true,
51 enable_journal
52 )?)
53 } else {
54 wasi_try_ok!(sock_send_internal::<M>(
55 &ctx,
56 fd,
57 FdWriteSource::Iovs {
58 iovs: si_data,
59 iovs_len: si_data_len
60 },
61 si_flags,
62 )?)
63 };
64
65 #[cfg(feature = "journal")]
66 if ctx.data().enable_journal {
67 JournalEffector::save_sock_send(&ctx, fd, bytes_written, si_data, si_data_len, si_flags)
68 .map_err(|err| {
69 tracing::error!("failed to save sock_send event - {}", err);
70 WasiError::Exit(ExitCode::from(Errno::Fault))
71 })?;
72 }
73
74 Span::current().record("nsent", bytes_written);
75
76 let env = ctx.data();
77 let memory = unsafe { env.memory_view(&ctx) };
78 let bytes_written: M::Offset =
79 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
80 wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written));
81
82 Ok(Errno::Success)
83}
84
85pub(crate) fn sock_send_internal<M: MemorySize>(
86 ctx: &FunctionEnvMut<'_, WasiEnv>,
87 sock: WasiFd,
88 si_data: FdWriteSource<'_, M>,
89 si_flags: SiFlags,
90) -> Result<Result<usize, Errno>, WasiError> {
91 let env = ctx.data();
92 let memory = unsafe { env.memory_view(&ctx) };
93 let runtime = env.runtime.clone();
94
95 let nonblocking_flag = (si_flags & __WASI_SOCK_SEND_INPUT_DONT_WAIT) != 0;
96
97 let bytes_written = wasi_try_ok_ok!(__sock_asyncify(
98 env,
99 sock,
100 Rights::SOCK_SEND,
101 |socket, fd| async move {
102 let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
103 let timeout = socket
104 .opt_time(TimeType::WriteTimeout)
105 .ok()
106 .flatten()
107 .unwrap_or(Duration::from_secs(30));
108
109 match si_data {
110 FdWriteSource::Iovs { iovs, iovs_len } => {
111 let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
112 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
113
114 let mut sent = 0usize;
115 for iovs in iovs_arr.iter() {
116 let buf = WasmPtr::<u8, M>::new(iovs.buf)
117 .slice(&memory, iovs.buf_len)
118 .map_err(mem_error_to_wasi)?
119 .access()
120 .map_err(mem_error_to_wasi)?;
121 let local_sent = match socket
122 .send(
123 env.tasks().deref(),
124 buf.as_ref(),
125 Some(timeout),
126 nonblocking,
127 )
128 .await
129 {
130 Ok(s) => s,
131 Err(_) if sent > 0 => break,
132 Err(err) => return Err(err),
133 };
134 sent += local_sent;
135 if local_sent != buf.len() {
136 break;
137 }
138 }
139 Ok(sent)
140 }
141 FdWriteSource::Buffer(data) => {
142 socket
143 .send(
144 env.tasks().deref(),
145 data.as_ref(),
146 Some(timeout),
147 nonblocking,
148 )
149 .await
150 }
151 }
152 }
153 ));
154 trace!(
155 %bytes_written,
156 );
157
158 Ok(Ok(bytes_written))
159}