wasmer_wasix/syscalls/wasi/
fd_read.rs1use std::{collections::VecDeque, task::Waker};
2
3use virtual_fs::{AsyncReadExt, DeviceFile, ReadBuf};
4
5use super::*;
6use crate::{
7 fs::NotificationInner,
8 journal::SnapshotTrigger,
9 net::socket::TimeType,
10 os::task::process::{MaybeCheckpointResult, WasiProcessCheckpoint, WasiProcessInner},
11 syscalls::*,
12};
13
14#[instrument(level = "trace", skip_all, fields(%fd, nread = field::Empty), ret)]
28pub fn fd_read<M: MemorySize>(
29 mut ctx: FunctionEnvMut<'_, WasiEnv>,
30 fd: WasiFd,
31 iovs: WasmPtr<__wasi_iovec_t<M>, M>,
32 iovs_len: M::Offset,
33 nread: WasmPtr<M::Offset, M>,
34) -> Result<Errno, WasiError> {
35 WasiEnv::do_pending_operations(&mut ctx)?;
36
37 let pid = ctx.data().pid();
38 let tid = ctx.data().tid();
39
40 let fd_entry = {
41 let env = ctx.data();
42 let state = env.state.clone();
43 wasi_try_ok!(state.fs.get_fd(fd))
44 };
45 let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
46
47 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
48 if fd == DeviceFile::STDIN {
49 ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
50 }
51
52 let res = fd_read_internal::<M>(&mut ctx, fd, fd_entry, iovs, iovs_len, offset, nread, true)?;
53 fd_read_internal_handler(ctx, res, nread)
54}
55
56#[instrument(level = "trace", skip_all, fields(%fd, %offset, ?nread), ret)]
72pub fn fd_pread<M: MemorySize>(
73 mut ctx: FunctionEnvMut<'_, WasiEnv>,
74 fd: WasiFd,
75 iovs: WasmPtr<__wasi_iovec_t<M>, M>,
76 iovs_len: M::Offset,
77 offset: Filesize,
78 nread: WasmPtr<M::Offset, M>,
79) -> Result<Errno, WasiError> {
80 let pid = ctx.data().pid();
81 let tid = ctx.data().tid();
82
83 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
84 if fd == DeviceFile::STDIN {
85 ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
86 }
87
88 let fd_entry = {
89 let env = ctx.data();
90 let state = env.state.clone();
91 wasi_try_ok!(state.fs.get_fd(fd))
92 };
93 let res = fd_read_internal::<M>(
94 &mut ctx,
95 fd,
96 fd_entry,
97 iovs,
98 iovs_len,
99 offset as usize,
100 nread,
101 false,
102 )?;
103 fd_read_internal_handler::<M>(ctx, res, nread)
104}
105
106pub(crate) fn fd_read_internal_handler<M: MemorySize>(
107 mut ctx: FunctionEnvMut<'_, WasiEnv>,
108 res: Result<usize, Errno>,
109 nread: WasmPtr<M::Offset, M>,
110) -> Result<Errno, WasiError> {
111 let mut ret = Errno::Success;
112 let bytes_read = match res {
113 Ok(bytes_read) => bytes_read,
114 Err(err) => {
115 ret = err;
116 0
117 }
118 };
119 Span::current().record("nread", bytes_read);
120
121 let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
122
123 let env = ctx.data();
124 let memory = unsafe { env.memory_view(&ctx) };
125
126 let env = ctx.data();
127 let memory = unsafe { env.memory_view(&ctx) };
128 let nread_ref = nread.deref(&memory);
129 wasi_try_mem_ok!(nread_ref.write(bytes_read));
130
131 Ok(ret)
132}
133
134#[allow(clippy::await_holding_lock)]
135pub(crate) fn fd_read_internal<M: MemorySize>(
136 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
137 fd: WasiFd,
138 fd_entry: Fd,
139 iovs: WasmPtr<__wasi_iovec_t<M>, M>,
140 iovs_len: M::Offset,
141 offset: usize,
142 nread: WasmPtr<M::Offset, M>,
143 should_update_cursor: bool,
144) -> WasiResult<usize> {
145 let env = ctx.data();
146 let memory = unsafe { env.memory_view(&ctx) };
147 let state = env.state();
148 let is_stdio = fd_entry.is_stdio;
149
150 let bytes_read = {
151 if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_READ) {
152 return Ok(Err(Errno::Access));
154 }
155
156 let inode = fd_entry.inode;
157 let fd_flags = fd_entry.inner.flags;
158
159 let (bytes_read, can_update_cursor) = {
160 let mut guard = inode.write();
161 match guard.deref_mut() {
162 Kind::File { handle, .. } => {
163 let Some(handle) = handle else {
164 tracing::warn!("fd_read: file handle is None");
165 return Ok(Err(Errno::Badf));
166 };
167 let handle = handle.clone();
168
169 drop(guard);
170
171 let res = __asyncify_light(
172 env,
173 if fd_flags.contains(Fdflags::NONBLOCK) {
174 Some(Duration::ZERO)
175 } else {
176 None
177 },
178 async move {
179 let mut handle = match handle.write() {
180 Ok(a) => a,
181 Err(_) => return Err(Errno::Fault),
182 };
183 if !is_stdio {
184 handle
185 .seek(std::io::SeekFrom::Start(offset as u64))
186 .await
187 .map_err(map_io_err)?;
188 }
189
190 let mut total_read = 0usize;
191
192 let iovs_arr =
193 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
194 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
195 for iovs in iovs_arr.iter() {
196 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
197 .slice(&memory, iovs.buf_len)
198 .map_err(mem_error_to_wasi)?
199 .access()
200 .map_err(mem_error_to_wasi)?;
201 let r = handle.read(buf.as_mut()).await.map_err(|err| {
202 let err = From::<std::io::Error>::from(err);
203 match err {
204 Errno::Again => {
205 if is_stdio {
206 Errno::Badf
207 } else {
208 Errno::Again
209 }
210 }
211 a => a,
212 }
213 });
214 let local_read = match r {
215 Ok(s) => s,
216 Err(_) if total_read > 0 => break,
217 Err(err) => return Err(err),
218 };
219 total_read += local_read;
220 if local_read != buf.len() {
221 break;
222 }
223 }
224 Ok(total_read)
225 },
226 );
227 let read = wasi_try_ok_ok!(res?.map_err(|err| match err {
228 Errno::Timedout => Errno::Again,
229 a => a,
230 }));
231 (read, true)
232 }
233 Kind::Socket { socket } => {
234 let socket = socket.clone();
235
236 drop(guard);
237
238 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
239 let timeout = socket
240 .opt_time(TimeType::ReadTimeout)
241 .ok()
242 .flatten()
243 .unwrap_or(Duration::from_secs(30));
244
245 let tasks = env.tasks().clone();
246 let res = __asyncify_light(
247 env,
248 if fd_flags.contains(Fdflags::NONBLOCK) {
249 Some(Duration::ZERO)
250 } else {
251 None
252 },
253 async move {
254 let mut total_read = 0usize;
255
256 let iovs_arr =
257 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
258 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
259 for iovs in iovs_arr.iter() {
260 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
261 .slice(&memory, iovs.buf_len)
262 .map_err(mem_error_to_wasi)?
263 .access()
264 .map_err(mem_error_to_wasi)?;
265
266 let local_read = socket
267 .recv(
268 tasks.deref(),
269 buf.as_mut_uninit(),
270 Some(timeout),
271 nonblocking,
272 false,
273 )
274 .await?;
275 total_read += local_read;
276 if total_read != buf.len() {
277 break;
278 }
279 }
280 Ok(total_read)
281 },
282 );
283 let res = res?.map_err(|err| match err {
284 Errno::Timedout => Errno::Again,
285 a => a,
286 });
287 match res {
288 Err(Errno::Connaborted) | Err(Errno::Connreset) => (0, false),
289 res => {
290 let bytes_read = wasi_try_ok_ok!(res);
291 (bytes_read, false)
292 }
293 }
294 }
295 Kind::PipeTx { .. } => return Ok(Err(Errno::Badf)),
296 Kind::PipeRx { rx } => {
297 let mut rx = rx.clone();
298 drop(guard);
299
300 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
301
302 let res = __asyncify_light(
303 env,
304 if fd_flags.contains(Fdflags::NONBLOCK) {
305 Some(Duration::ZERO)
306 } else {
307 None
308 },
309 async move {
310 let mut total_read = 0usize;
311
312 let iovs_arr =
313 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
314 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
315 for iovs in iovs_arr.iter() {
316 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
317 .slice(&memory, iovs.buf_len)
318 .map_err(mem_error_to_wasi)?
319 .access()
320 .map_err(mem_error_to_wasi)?;
321
322 let local_read = match nonblocking {
323 true => match rx.try_read(buf.as_mut()) {
324 Some(amt) => amt,
325 None => {
326 return Err(Errno::Again);
327 }
328 },
329 false => {
330 virtual_fs::AsyncReadExt::read(&mut rx, buf.as_mut())
331 .await?
332 }
333 };
334 total_read += local_read;
335 if local_read != buf.len() {
336 break;
337 }
338 }
339 Ok(total_read)
340 },
341 );
342
343 let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
344 Errno::Timedout => Errno::Again,
345 a => a,
346 }));
347
348 (bytes_read, false)
349 }
350 Kind::DuplexPipe { pipe } => {
351 let mut pipe = pipe.clone();
352 drop(guard);
353
354 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
355
356 let res = __asyncify_light(
357 env,
358 if fd_flags.contains(Fdflags::NONBLOCK) {
359 Some(Duration::ZERO)
360 } else {
361 None
362 },
363 async move {
364 let mut total_read = 0usize;
365
366 let iovs_arr =
367 iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
368 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
369 for iovs in iovs_arr.iter() {
370 let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
371 .slice(&memory, iovs.buf_len)
372 .map_err(mem_error_to_wasi)?
373 .access()
374 .map_err(mem_error_to_wasi)?;
375
376 let local_read = match nonblocking {
377 true => match pipe.try_read(buf.as_mut()) {
378 Some(amt) => amt,
379 None => {
380 return Err(Errno::Again);
381 }
382 },
383 false => {
384 virtual_fs::AsyncReadExt::read(&mut pipe, buf.as_mut())
385 .await?
386 }
387 };
388 total_read += local_read;
389 if local_read != buf.len() {
390 break;
391 }
392 }
393 Ok(total_read)
394 },
395 );
396
397 let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
398 Errno::Timedout => Errno::Again,
399 a => a,
400 }));
401
402 (bytes_read, false)
403 }
404 Kind::Dir { .. } | Kind::Root { .. } => {
405 return Ok(Err(Errno::Isdir));
407 }
408 Kind::EventNotifications { inner } => {
409 struct NotifyPoller {
411 inner: Arc<NotificationInner>,
412 non_blocking: bool,
413 }
414 let poller = NotifyPoller {
415 inner: inner.clone(),
416 non_blocking: fd_flags.contains(Fdflags::NONBLOCK),
417 };
418
419 drop(guard);
420
421 impl Future for NotifyPoller {
424 type Output = Result<u64, Errno>;
425 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
426 if self.non_blocking {
427 Poll::Ready(self.inner.try_read().ok_or(Errno::Again))
428 } else {
429 self.inner.read(cx.waker()).map(Ok)
430 }
431 }
432 }
433
434 let tasks_inner = env.tasks().clone();
436
437 let res = __asyncify_light(env, None, poller)?.map_err(|err| match err {
438 Errno::Timedout => Errno::Again,
439 a => a,
440 });
441 let val = wasi_try_ok_ok!(res);
442
443 let mut memory = unsafe { env.memory_view(ctx) };
444 let reader = val.to_ne_bytes();
445 let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
446 let ret = wasi_try_ok_ok!(read_bytes(&reader[..], &memory, iovs_arr));
447 (ret, false)
448 }
449 Kind::Symlink { .. } | Kind::Epoll { .. } => {
450 return Ok(Err(Errno::Notsup));
451 }
452 Kind::Buffer { buffer } => {
453 let memory = unsafe { env.memory_view(ctx) };
454 let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
455 let read = wasi_try_ok_ok!(read_bytes(&buffer[offset..], &memory, iovs_arr));
456 (read, true)
457 }
458 }
459 };
460
461 if !is_stdio && should_update_cursor && can_update_cursor {
462 fd_entry
463 .inner
464 .offset
465 .fetch_add(bytes_read as u64, Ordering::AcqRel);
466 }
467
468 bytes_read
469 };
470
471 Ok(Ok(bytes_read))
472}