wasmer_wasix/syscalls/wasix/
sock_send_to.rs1use std::task::Waker;
2
3use super::*;
4use crate::{net::socket::TimeType, syscalls::*};
5
6#[instrument(level = "trace", skip_all, fields(%sock, ?addr, nsent = field::Empty), ret)]
21pub fn sock_send_to<M: MemorySize>(
22 mut ctx: FunctionEnvMut<'_, WasiEnv>,
23 sock: WasiFd,
24 si_data: WasmPtr<__wasi_ciovec_t<M>, M>,
25 si_data_len: M::Offset,
26 si_flags: SiFlags,
27 addr: WasmPtr<__wasi_addr_port_t, M>,
28 ret_data_len: WasmPtr<M::Offset, M>,
29) -> Result<Errno, WasiError> {
30 WasiEnv::do_pending_operations(&mut ctx)?;
31
32 let env = ctx.data();
33 let memory = unsafe { env.memory_view(&ctx) };
34 let iovs_arr = wasi_try_mem_ok!(si_data.slice(&memory, si_data_len));
35
36 let (addr_ip, addr_port) = {
37 let memory = unsafe { env.memory_view(&ctx) };
38 wasi_try_ok!(read_ip_port(&memory, addr))
39 };
40 let addr = SocketAddr::new(addr_ip, addr_port);
41 Span::current().record("addr", format!("{addr:?}"));
42
43 let bytes_written = wasi_try_ok!(sock_send_to_internal(
44 &mut ctx,
45 sock,
46 FdWriteSource::Iovs {
47 iovs: si_data,
48 iovs_len: si_data_len
49 },
50 si_flags,
51 addr,
52 )?);
53
54 #[cfg(feature = "journal")]
55 if ctx.data().enable_journal {
56 JournalEffector::save_sock_send_to::<M>(
57 &ctx,
58 sock,
59 bytes_written,
60 si_data,
61 si_data_len,
62 addr,
63 si_flags,
64 )
65 .map_err(|err| {
66 tracing::error!("failed to save sock_send_to event - {}", err);
67 WasiError::Exit(ExitCode::from(Errno::Fault))
68 })?;
69 }
70
71 Span::current().record("nsent", bytes_written);
72
73 let env = ctx.data();
74 let memory = unsafe { env.memory_view(&ctx) };
75 let bytes_written: M::Offset =
76 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
77 wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written));
78
79 Ok(Errno::Success)
80}
81
82pub(crate) fn sock_send_to_internal<M: MemorySize>(
83 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
84 sock: WasiFd,
85 si_data: FdWriteSource<'_, M>,
86 si_flags: SiFlags,
87 addr: SocketAddr,
88) -> Result<Result<usize, Errno>, WasiError> {
89 let env = ctx.data();
90 let net = env.net().clone();
91 let tasks = ctx.data().tasks().clone();
92
93 wasi_try_ok_ok!(__sock_upgrade(
95 ctx,
96 sock,
97 Rights::SOCK_SEND_TO,
98 move |mut socket, flags| async move { socket.auto_bind_udp(tasks.deref(), net.deref()).await }
99 ));
100
101 let env = ctx.data();
102 let memory = unsafe { env.memory_view(&ctx) };
103
104 let nonblocking_flag = (si_flags & __WASI_SOCK_SEND_INPUT_DONT_WAIT) != 0;
105
106 let bytes_written = {
107 wasi_try_ok_ok!(__sock_asyncify(
108 env,
109 sock,
110 Rights::SOCK_SEND_TO,
111 |socket, fd| async move {
112 let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
113 let timeout = socket
114 .opt_time(TimeType::WriteTimeout)
115 .ok()
116 .flatten()
117 .unwrap_or(Duration::from_secs(30));
118
119 match si_data {
120 FdWriteSource::Iovs { iovs, iovs_len } => {
121 let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
122 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
123
124 let mut sent = 0usize;
125 for iovs in iovs_arr.iter() {
126 let buf = WasmPtr::<u8, M>::new(iovs.buf)
127 .slice(&memory, iovs.buf_len)
128 .map_err(mem_error_to_wasi)?
129 .access()
130 .map_err(mem_error_to_wasi)?;
131 let local_sent = match socket
132 .send_to::<M>(
133 env.tasks().deref(),
134 buf.as_ref(),
135 addr,
136 Some(timeout),
137 nonblocking,
138 )
139 .await
140 {
141 Ok(s) => s,
142 Err(_) if sent > 0 => break,
143 Err(err) => return Err(err),
144 };
145 sent += local_sent;
146 if local_sent != buf.len() {
147 break;
148 }
149 }
150 Ok(sent)
151 }
152 FdWriteSource::Buffer(data) => {
153 socket
154 .send_to::<M>(
155 env.tasks().deref(),
156 data.as_ref(),
157 addr,
158 Some(timeout),
159 nonblocking,
160 )
161 .await
162 }
163 }
164 },
165 ))
166 };
167 trace!(
168 %bytes_written,
169 );
170
171 Ok(Ok(bytes_written))
172}