wasmer_wasix/syscalls/wasix/
sock_send_file.rs1use virtual_fs::AsyncReadExt;
2
3use super::*;
4use crate::{WasiInodes, net::socket::TimeType, syscalls::*};
5
6#[instrument(level = "trace", skip_all, fields(%sock, %in_fd, %offset, %count, nsent = field::Empty), ret)]
19pub fn sock_send_file<M: MemorySize>(
20 mut ctx: FunctionEnvMut<'_, WasiEnv>,
21 sock: WasiFd,
22 in_fd: WasiFd,
23 offset: Filesize,
24 count: Filesize,
25 ret_sent: WasmPtr<Filesize, M>,
26) -> Result<Errno, WasiError> {
27 WasiEnv::do_pending_operations(&mut ctx)?;
28
29 let total_written = wasi_try_ok!(sock_send_file_internal(
30 &mut ctx, sock, in_fd, offset, count
31 )?);
32
33 #[cfg(feature = "journal")]
34 if ctx.data().enable_journal {
35 JournalEffector::save_sock_send_file::<M>(&mut ctx, sock, in_fd, offset, total_written)
36 .map_err(|err| {
37 tracing::error!("failed to save sock_send_file event - {}", err);
38 WasiError::Exit(ExitCode::from(Errno::Fault))
39 })?;
40 }
41
42 Span::current().record("nsent", total_written);
43
44 let env = ctx.data();
45 let memory = unsafe { env.memory_view(&ctx) };
46 wasi_try_mem_ok!(ret_sent.write(&memory, total_written as Filesize));
47
48 Ok(Errno::Success)
49}
50
51#[allow(clippy::await_holding_lock)]
52pub(crate) fn sock_send_file_internal(
53 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
54 sock: WasiFd,
55 in_fd: WasiFd,
56 offset: Filesize,
57 mut count: Filesize,
58) -> Result<Result<Filesize, Errno>, WasiError> {
59 let mut env = ctx.data();
60 let net = env.net();
61 let tasks = env.tasks().clone();
62 let state = env.state.clone();
63 let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(in_fd));
64
65 fd_entry.inner.offset.store(offset, Ordering::Release);
67
68 let mut total_written: Filesize = 0;
70 while (count > 0) {
71 let sub_count = count.min(4096);
72 count -= sub_count;
73
74 let fd_flags = fd_entry.inner.flags;
75
76 let data = {
77 match in_fd {
78 __WASI_STDIN_FILENO => {
79 let mut stdin = wasi_try_ok_ok!(
80 WasiInodes::stdin_mut(&state.fs.fd_map).map_err(fs_error_into_wasi_err)
81 );
82 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
83 let mut buf = vec![0u8; sub_count as usize];
85 let amt = stdin.read(&mut buf[..]).await.map_err(map_io_err)?;
86 buf.truncate(amt);
87 Ok(buf)
88 })?);
89 env = ctx.data();
90 data
91 }
92 __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => return Ok(Err(Errno::Inval)),
93 _ => {
94 if !fd_entry.inner.rights.contains(Rights::FD_READ) {
95 return Ok(Err(Errno::Access));
97 }
98
99 let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
100 let inode = fd_entry.inode.clone();
101 let data = {
102 let mut guard = inode.write();
103 match guard.deref_mut() {
104 Kind::File { handle, .. } => {
105 if let Some(handle) = handle {
106 let data =
107 wasi_try_ok_ok!(__asyncify(ctx, None, async move {
108 let mut buf = vec![0u8; sub_count as usize];
109
110 let mut handle = handle.write().unwrap();
111 handle
112 .seek(std::io::SeekFrom::Start(offset as u64))
113 .await
114 .map_err(map_io_err)?;
115 let amt = handle
116 .read(&mut buf[..])
117 .await
118 .map_err(map_io_err)?;
119 buf.truncate(amt);
120 Ok(buf)
121 })?);
122 env = ctx.data();
123 data
124 } else {
125 return Ok(Err(Errno::Inval));
126 }
127 }
128 Kind::Socket { socket, .. } => {
129 let socket = socket.clone();
130 let tasks = tasks.clone();
131 drop(guard);
132
133 let read_timeout = socket
134 .opt_time(TimeType::WriteTimeout)
135 .ok()
136 .flatten()
137 .unwrap_or(Duration::from_secs(30));
138
139 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async {
140 let mut buf = Vec::with_capacity(sub_count as usize);
141 unsafe {
142 buf.set_len(sub_count as usize);
143 }
144 socket
145 .recv(
146 tasks.deref(),
147 &mut buf,
148 Some(read_timeout),
149 false,
150 false,
151 )
152 .await
153 .map(|amt| {
154 unsafe {
155 buf.set_len(amt);
156 }
157 let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
158 buf
159 })
160 })?);
161 env = ctx.data();
162 data
163 }
164 Kind::PipeRx { rx } => {
165 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
166 let mut buf = vec![0u8; sub_count as usize];
168 let amt = virtual_fs::AsyncReadExt::read(rx, &mut buf[..])
169 .await
170 .map_err(map_io_err)?;
171 buf.truncate(amt);
172 Ok(buf)
173 })?);
174 env = ctx.data();
175 data
176 }
177 Kind::DuplexPipe { pipe } => {
178 let data = wasi_try_ok_ok!(__asyncify(ctx, None, async move {
179 let mut buf = vec![0u8; sub_count as usize];
181 let amt = virtual_fs::AsyncReadExt::read(pipe, &mut buf[..])
182 .await
183 .map_err(map_io_err)?;
184 buf.truncate(amt);
185 Ok(buf)
186 })?);
187 env = ctx.data();
188 data
189 }
190 Kind::PipeTx { .. }
191 | Kind::Epoll { .. }
192 | Kind::EventNotifications { .. } => {
193 return Ok(Err(Errno::Inval));
194 }
195 Kind::Dir { .. } | Kind::Root { .. } => {
196 return Ok(Err(Errno::Isdir));
197 }
198 Kind::Symlink { .. } => unimplemented!("Symlinks in wasi::fd_read"),
199 Kind::Buffer { buffer } => {
200 let mut buf = vec![0u8; sub_count as usize];
202
203 let mut buf_read = &buffer[offset..];
204 let amt = wasi_try_ok_ok!(
205 std::io::Read::read(&mut buf_read, &mut buf[..])
206 .map_err(map_io_err)
207 );
208 buf.truncate(amt);
209 buf
210 }
211 }
212 };
213
214 fd_entry
215 .inner
216 .offset
217 .fetch_add(data.len() as u64, Ordering::AcqRel);
218
219 data
220 }
221 }
222 };
223
224 let tasks = ctx.data().tasks().clone();
226 let bytes_written = wasi_try_ok_ok!(__sock_asyncify_mut(
227 ctx,
228 sock,
229 Rights::SOCK_SEND,
230 |socket, fd| async move {
231 let write_timeout = socket
232 .opt_time(TimeType::ReadTimeout)
233 .ok()
234 .flatten()
235 .unwrap_or(Duration::from_secs(30));
236 socket
237 .send(tasks.deref(), &data, Some(write_timeout), true)
238 .await
239 },
240 ));
241 env = ctx.data();
242
243 total_written += bytes_written as u64;
244 }
245
246 Ok(Ok(total_written))
247}