wasmer_wasix/syscalls/wasi/
fd_read.rs1use std::{collections::VecDeque, task::Waker};
2
3use virtual_fs::{AsyncReadExt, DeviceFile, ReadBuf};
4
5use super::*;
6use crate::{
7 fs::NotificationInner,
8 journal::SnapshotTrigger,
9 net::socket::TimeType,
10 os::task::process::{MaybeCheckpointResult, WasiProcessCheckpoint, WasiProcessInner},
11 syscalls::*,
12};
13
14#[instrument(level = "trace", skip_all, fields(%fd, nread = field::Empty), ret)]
28pub fn fd_read<M: MemorySize>(
29 mut ctx: FunctionEnvMut<'_, WasiEnv>,
30 fd: WasiFd,
31 iovs: WasmPtr<__wasi_iovec_t<M>, M>,
32 iovs_len: M::Offset,
33 nread: WasmPtr<M::Offset, M>,
34) -> Result<Errno, WasiError> {
35 WasiEnv::do_pending_operations(&mut ctx)?;
36
37 let pid = ctx.data().pid();
38 let tid = ctx.data().tid();
39
40 let offset = {
41 let mut env = ctx.data();
42 let state = env.state.clone();
43 let inodes = state.inodes.clone();
44
45 let fd_entry = wasi_try_ok!(state.fs.get_fd(fd));
46 fd_entry.inner.offset.load(Ordering::Acquire) as usize
47 };
48
49 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
50 if fd == DeviceFile::STDIN {
51 ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
52 }
53
54 let res = fd_read_internal::<M>(&mut ctx, fd, iovs, iovs_len, offset, nread, true)?;
55 fd_read_internal_handler(ctx, res, nread)
56}
57
58#[instrument(level = "trace", skip_all, fields(%fd, %offset, ?nread), ret)]
74pub fn fd_pread<M: MemorySize>(
75 mut ctx: FunctionEnvMut<'_, WasiEnv>,
76 fd: WasiFd,
77 iovs: WasmPtr<__wasi_iovec_t<M>, M>,
78 iovs_len: M::Offset,
79 offset: Filesize,
80 nread: WasmPtr<M::Offset, M>,
81) -> Result<Errno, WasiError> {
82 let pid = ctx.data().pid();
83 let tid = ctx.data().tid();
84
85 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
86 if fd == DeviceFile::STDIN {
87 ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
88 }
89
90 let res = fd_read_internal::<M>(&mut ctx, fd, iovs, iovs_len, offset as usize, nread, false)?;
91 fd_read_internal_handler::<M>(ctx, res, nread)
92}
93
94pub(crate) fn fd_read_internal_handler<M: MemorySize>(
95 mut ctx: FunctionEnvMut<'_, WasiEnv>,
96 res: Result<usize, Errno>,
97 nread: WasmPtr<M::Offset, M>,
98) -> Result<Errno, WasiError> {
99 let mut ret = Errno::Success;
100 let bytes_read = match res {
101 Ok(bytes_read) => bytes_read,
102 Err(err) => {
103 ret = err;
104 0
105 }
106 };
107 Span::current().record("nread", bytes_read);
108
109 let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
110
111 let env = ctx.data();
112 let memory = unsafe { env.memory_view(&ctx) };
113
114 let env = ctx.data();
115 let memory = unsafe { env.memory_view(&ctx) };
116 let nread_ref = nread.deref(&memory);
117 wasi_try_mem_ok!(nread_ref.write(bytes_read));
118
119 Ok(ret)
120}
121
122#[allow(clippy::await_holding_lock)]
123pub(crate) fn fd_read_internal<M: MemorySize>(
124 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
125 fd: WasiFd,
126 iovs: WasmPtr<__wasi_iovec_t<M>, M>,
127 iovs_len: M::Offset,
128 offset: usize,
129 nread: WasmPtr<M::Offset, M>,
130 should_update_cursor: bool,
131) -> WasiResult<usize> {
132 let env = ctx.data();
133 let memory = unsafe { env.memory_view(&ctx) };
134 let state = env.state();
135
136 let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(fd));
137 let is_stdio = fd_entry.is_stdio;
138
139 let bytes_read = {
140 if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_READ) {
141 return Ok(Err(Errno::Access));
143 }
144
145 let inode = fd_entry.inode;
146 let fd_flags = fd_entry.inner.flags;
147
148 let (bytes_read, can_update_cursor) = {
149 let mut guard = inode.write();
150 match guard.deref_mut() {
151 Kind::File { handle, .. } => {
152 if let Some(handle) = handle {
153 let handle = handle.clone();
154
155 drop(guard);
156
157 let res = __asyncify_light(
158 env,
159 if fd_flags.contains(Fdflags::NONBLOCK) {
160 Some(Duration::ZERO)
161 } else {
162 None
163 },
164 async move {
165 let mut handle = match handle.write() {
166 Ok(a) => a,
167 Err(_) => return Err(Errno::Fault),
168 };
169 if !is_stdio {
170 handle
171 .seek(std::io::SeekFrom::Start(offset as u64))
172 .await
173 .map_err(map_io_err)?;
174 }
175
176 let mut total_read = 0usize;
177
178 let iovs_arr =
179 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
180 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
181 for iovs in iovs_arr.iter() {
182 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
183 .slice(&memory, iovs.buf_len)
184 .map_err(mem_error_to_wasi)?
185 .access()
186 .map_err(mem_error_to_wasi)?;
187 let r = handle.read(buf.as_mut()).await.map_err(|err| {
188 let err = From::<std::io::Error>::from(err);
189 match err {
190 Errno::Again => {
191 if is_stdio {
192 Errno::Badf
193 } else {
194 Errno::Again
195 }
196 }
197 a => a,
198 }
199 });
200 let local_read = match r {
201 Ok(s) => s,
202 Err(_) if total_read > 0 => break,
203 Err(err) => return Err(err),
204 };
205 total_read += local_read;
206 if local_read != buf.len() {
207 break;
208 }
209 }
210 Ok(total_read)
211 },
212 );
213 let read = wasi_try_ok_ok!(res?.map_err(|err| match err {
214 Errno::Timedout => Errno::Again,
215 a => a,
216 }));
217 (read, true)
218 } else {
219 return Ok(Err(Errno::Badf));
220 }
221 }
222 Kind::Socket { socket } => {
223 let socket = socket.clone();
224
225 drop(guard);
226
227 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
228 let timeout = socket
229 .opt_time(TimeType::ReadTimeout)
230 .ok()
231 .flatten()
232 .unwrap_or(Duration::from_secs(30));
233
234 let tasks = env.tasks().clone();
235 let res = __asyncify_light(
236 env,
237 if fd_flags.contains(Fdflags::NONBLOCK) {
238 Some(Duration::ZERO)
239 } else {
240 None
241 },
242 async move {
243 let mut total_read = 0usize;
244
245 let iovs_arr =
246 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
247 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
248 for iovs in iovs_arr.iter() {
249 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
250 .slice(&memory, iovs.buf_len)
251 .map_err(mem_error_to_wasi)?
252 .access()
253 .map_err(mem_error_to_wasi)?;
254
255 let local_read = socket
256 .recv(
257 tasks.deref(),
258 buf.as_mut_uninit(),
259 Some(timeout),
260 nonblocking,
261 false,
262 )
263 .await?;
264 total_read += local_read;
265 if total_read != buf.len() {
266 break;
267 }
268 }
269 Ok(total_read)
270 },
271 );
272 let res = res?.map_err(|err| match err {
273 Errno::Timedout => Errno::Again,
274 a => a,
275 });
276 match res {
277 Err(Errno::Connaborted) | Err(Errno::Connreset) => (0, false),
278 res => {
279 let bytes_read = wasi_try_ok_ok!(res);
280 (bytes_read, false)
281 }
282 }
283 }
284 Kind::PipeTx { .. } => return Ok(Err(Errno::Badf)),
285 Kind::PipeRx { rx } => {
286 let mut rx = rx.clone();
287 drop(guard);
288
289 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
290
291 let res = __asyncify_light(
292 env,
293 if fd_flags.contains(Fdflags::NONBLOCK) {
294 Some(Duration::ZERO)
295 } else {
296 None
297 },
298 async move {
299 let mut total_read = 0usize;
300
301 let iovs_arr =
302 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
303 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
304 for iovs in iovs_arr.iter() {
305 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
306 .slice(&memory, iovs.buf_len)
307 .map_err(mem_error_to_wasi)?
308 .access()
309 .map_err(mem_error_to_wasi)?;
310
311 let local_read = match nonblocking {
312 true => match rx.try_read(buf.as_mut()) {
313 Some(amt) => amt,
314 None => {
315 return Err(Errno::Again);
316 }
317 },
318 false => {
319 virtual_fs::AsyncReadExt::read(&mut rx, buf.as_mut())
320 .await?
321 }
322 };
323 total_read += local_read;
324 if local_read != buf.len() {
325 break;
326 }
327 }
328 Ok(total_read)
329 },
330 );
331
332 let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
333 Errno::Timedout => Errno::Again,
334 a => a,
335 }));
336
337 (bytes_read, false)
338 }
339 Kind::DuplexPipe { pipe } => {
340 let mut pipe = pipe.clone();
341 drop(guard);
342
343 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
344
345 let res = __asyncify_light(
346 env,
347 if fd_flags.contains(Fdflags::NONBLOCK) {
348 Some(Duration::ZERO)
349 } else {
350 None
351 },
352 async move {
353 let mut total_read = 0usize;
354
355 let iovs_arr =
356 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
357 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
358 for iovs in iovs_arr.iter() {
359 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
360 .slice(&memory, iovs.buf_len)
361 .map_err(mem_error_to_wasi)?
362 .access()
363 .map_err(mem_error_to_wasi)?;
364
365 let local_read = match nonblocking {
366 true => match pipe.try_read(buf.as_mut()) {
367 Some(amt) => amt,
368 None => {
369 return Err(Errno::Again);
370 }
371 },
372 false => {
373 virtual_fs::AsyncReadExt::read(&mut pipe, buf.as_mut())
374 .await?
375 }
376 };
377 total_read += local_read;
378 if local_read != buf.len() {
379 break;
380 }
381 }
382 Ok(total_read)
383 },
384 );
385
386 let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
387 Errno::Timedout => Errno::Again,
388 a => a,
389 }));
390
391 (bytes_read, false)
392 }
393 Kind::Dir { .. } | Kind::Root { .. } => {
394 return Ok(Err(Errno::Isdir));
396 }
397 Kind::EventNotifications { inner } => {
398 struct NotifyPoller {
400 inner: Arc<NotificationInner>,
401 non_blocking: bool,
402 }
403 let poller = NotifyPoller {
404 inner: inner.clone(),
405 non_blocking: fd_flags.contains(Fdflags::NONBLOCK),
406 };
407
408 drop(guard);
409
410 impl Future for NotifyPoller {
413 type Output = Result<u64, Errno>;
414 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
415 if self.non_blocking {
416 Poll::Ready(self.inner.try_read().ok_or(Errno::Again))
417 } else {
418 self.inner.read(cx.waker()).map(Ok)
419 }
420 }
421 }
422
423 let tasks_inner = env.tasks().clone();
425
426 let res = __asyncify_light(env, None, poller)?.map_err(|err| match err {
427 Errno::Timedout => Errno::Again,
428 a => a,
429 });
430 let val = wasi_try_ok_ok!(res);
431
432 let mut memory = unsafe { env.memory_view(ctx) };
433 let reader = val.to_ne_bytes();
434 let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
435 let ret = wasi_try_ok_ok!(read_bytes(&reader[..], &memory, iovs_arr));
436 (ret, false)
437 }
438 Kind::Symlink { .. } | Kind::Epoll { .. } => {
439 return Ok(Err(Errno::Notsup));
440 }
441 Kind::Buffer { buffer } => {
442 let memory = unsafe { env.memory_view(ctx) };
443 let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
444 let read = wasi_try_ok_ok!(read_bytes(&buffer[offset..], &memory, iovs_arr));
445 (read, true)
446 }
447 }
448 };
449
450 if !is_stdio && should_update_cursor && can_update_cursor {
451 let mut fd_map = state.fs.fd_map.write().unwrap();
453 let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(fd).ok_or(Errno::Badf));
454 let old = fd_entry
455 .offset
456 .fetch_add(bytes_read as u64, Ordering::AcqRel);
457 }
458
459 bytes_read
460 };
461
462 Ok(Ok(bytes_read))
463}