wasmer_wasix/syscalls/wasix/
sock_send_file.rs1use virtual_fs::AsyncReadExt;
2
3use super::*;
4use crate::{WasiInodes, net::socket::TimeType, syscalls::*};
5
6#[instrument(level = "trace", skip_all, fields(%sock, %in_fd, %offset, %count, nsent = field::Empty), ret)]
19pub fn sock_send_file<M: MemorySize>(
20 mut ctx: FunctionEnvMut<'_, WasiEnv>,
21 sock: WasiFd,
22 in_fd: WasiFd,
23 offset: Filesize,
24 count: Filesize,
25 ret_sent: WasmPtr<Filesize, M>,
26) -> Result<Errno, WasiError> {
27 WasiEnv::do_pending_operations(&mut ctx)?;
28
29 let total_written = wasi_try_ok!(sock_send_file_internal(
30 &mut ctx, sock, in_fd, offset, count
31 )?);
32
33 #[cfg(feature = "journal")]
34 if ctx.data().enable_journal {
35 JournalEffector::save_sock_send_file::<M>(&mut ctx, sock, in_fd, offset, total_written)
36 .map_err(|err| {
37 tracing::error!("failed to save sock_send_file event - {}", err);
38 WasiError::Exit(ExitCode::from(Errno::Fault))
39 })?;
40 }
41
42 Span::current().record("nsent", total_written);
43
44 let env = ctx.data();
45 let memory = unsafe { env.memory_view(&ctx) };
46 wasi_try_mem_ok!(ret_sent.write(&memory, total_written as Filesize));
47
48 Ok(Errno::Success)
49}
50
51#[allow(clippy::await_holding_lock)]
52pub(crate) fn sock_send_file_internal(
53 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
54 sock: WasiFd,
55 in_fd: WasiFd,
56 offset: Filesize,
57 mut count: Filesize,
58) -> Result<Result<Filesize, Errno>, WasiError> {
59 let mut env = ctx.data();
60 let net = env.net();
61 let tasks = env.tasks().clone();
62 let state = env.state.clone();
63
64 {
66 let mut fd_map = state.fs.fd_map.write().unwrap();
67 let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(in_fd).ok_or(Errno::Badf));
68 fd_entry.offset.store(offset, Ordering::Release);
69 }
70
71 let mut total_written: Filesize = 0;
73 while (count > 0) {
74 let sub_count = count.min(4096);
75 count -= sub_count;
76
77 let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(in_fd));
78 let fd_flags = fd_entry.inner.flags;
79
80 let data = {
81 match in_fd {
82 __WASI_STDIN_FILENO => {
83 let mut stdin = wasi_try_ok_ok!(
84 WasiInodes::stdin_mut(&state.fs.fd_map).map_err(fs_error_into_wasi_err)
85 );
86 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
87 let mut buf = vec![0u8; sub_count as usize];
89 let amt = stdin.read(&mut buf[..]).await.map_err(map_io_err)?;
90 buf.truncate(amt);
91 Ok(buf)
92 })?);
93 env = ctx.data();
94 data
95 }
96 __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => return Ok(Err(Errno::Inval)),
97 _ => {
98 if !fd_entry.inner.rights.contains(Rights::FD_READ) {
99 return Ok(Err(Errno::Access));
101 }
102
103 let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
104 let inode = fd_entry.inode;
105 let data = {
106 let mut guard = inode.write();
107 match guard.deref_mut() {
108 Kind::File { handle, .. } => {
109 if let Some(handle) = handle {
110 let data =
111 wasi_try_ok_ok!(__asyncify(ctx, None, async move {
112 let mut buf = vec![0u8; sub_count as usize];
113
114 let mut handle = handle.write().unwrap();
115 handle
116 .seek(std::io::SeekFrom::Start(offset as u64))
117 .await
118 .map_err(map_io_err)?;
119 let amt = handle
120 .read(&mut buf[..])
121 .await
122 .map_err(map_io_err)?;
123 buf.truncate(amt);
124 Ok(buf)
125 })?);
126 env = ctx.data();
127 data
128 } else {
129 return Ok(Err(Errno::Inval));
130 }
131 }
132 Kind::Socket { socket, .. } => {
133 let socket = socket.clone();
134 let tasks = tasks.clone();
135 drop(guard);
136
137 let read_timeout = socket
138 .opt_time(TimeType::WriteTimeout)
139 .ok()
140 .flatten()
141 .unwrap_or(Duration::from_secs(30));
142
143 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async {
144 let mut buf = Vec::with_capacity(sub_count as usize);
145 unsafe {
146 buf.set_len(sub_count as usize);
147 }
148 socket
149 .recv(
150 tasks.deref(),
151 &mut buf,
152 Some(read_timeout),
153 false,
154 false,
155 )
156 .await
157 .map(|amt| {
158 unsafe {
159 buf.set_len(amt);
160 }
161 let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
162 buf
163 })
164 })?);
165 env = ctx.data();
166 data
167 }
168 Kind::PipeRx { rx } => {
169 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
170 let mut buf = vec![0u8; sub_count as usize];
172 let amt = virtual_fs::AsyncReadExt::read(rx, &mut buf[..])
173 .await
174 .map_err(map_io_err)?;
175 buf.truncate(amt);
176 Ok(buf)
177 })?);
178 env = ctx.data();
179 data
180 }
181 Kind::DuplexPipe { pipe } => {
182 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
183 let mut buf = vec![0u8; sub_count as usize];
185 let amt = virtual_fs::AsyncReadExt::read(pipe, &mut buf[..])
186 .await
187 .map_err(map_io_err)?;
188 buf.truncate(amt);
189 Ok(buf)
190 })?);
191 env = ctx.data();
192 data
193 }
194 Kind::PipeTx { .. }
195 | Kind::Epoll { .. }
196 | Kind::EventNotifications { .. } => {
197 return Ok(Err(Errno::Inval));
198 }
199 Kind::Dir { .. } | Kind::Root { .. } => {
200 return Ok(Err(Errno::Isdir));
201 }
202 Kind::Symlink { .. } => unimplemented!("Symlinks in wasi::fd_read"),
203 Kind::Buffer { buffer } => {
204 let mut buf = vec![0u8; sub_count as usize];
206
207 let mut buf_read = &buffer[offset..];
208 let amt = wasi_try_ok_ok!(
209 std::io::Read::read(&mut buf_read, &mut buf[..])
210 .map_err(map_io_err)
211 );
212 buf.truncate(amt);
213 buf
214 }
215 }
216 };
217
218 let mut fd_map = state.fs.fd_map.write().unwrap();
220 let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(in_fd).ok_or(Errno::Badf));
221 fd_entry
222 .offset
223 .fetch_add(data.len() as u64, Ordering::AcqRel);
224
225 data
226 }
227 }
228 };
229
230 let tasks = ctx.data().tasks().clone();
232 let bytes_written = wasi_try_ok_ok!(__sock_asyncify_mut(
233 ctx,
234 sock,
235 Rights::SOCK_SEND,
236 |socket, fd| async move {
237 let write_timeout = socket
238 .opt_time(TimeType::ReadTimeout)
239 .ok()
240 .flatten()
241 .unwrap_or(Duration::from_secs(30));
242 socket
243 .send(tasks.deref(), &data, Some(write_timeout), true)
244 .await
245 },
246 ));
247 env = ctx.data();
248
249 total_written += bytes_written as u64;
250 }
251
252 Ok(Ok(total_written))
253}