wasmer_wasix/syscalls/wasi/
fd_read.rs

1use std::{collections::VecDeque, task::Waker};
2
3use virtual_fs::{AsyncReadExt, DeviceFile, ReadBuf};
4
5use super::*;
6use crate::{
7    fs::NotificationInner,
8    journal::SnapshotTrigger,
9    net::socket::TimeType,
10    os::task::process::{MaybeCheckpointResult, WasiProcessCheckpoint, WasiProcessInner},
11    syscalls::*,
12};
13
14/// ### `fd_read()`
15/// Read data from file descriptor
16/// Inputs:
17/// - `Fd fd`
18///     File descriptor from which data will be read
19/// - `const __wasi_iovec_t *iovs`
20///     Vectors where data will be stored
21/// - `u32 iovs_len`
22///     Length of data in `iovs`
23/// Output:
24/// - `u32 *nread`
25///     Number of bytes read
26///
27#[instrument(level = "trace", skip_all, fields(%fd, nread = field::Empty), ret)]
28pub fn fd_read<M: MemorySize>(
29    mut ctx: FunctionEnvMut<'_, WasiEnv>,
30    fd: WasiFd,
31    iovs: WasmPtr<__wasi_iovec_t<M>, M>,
32    iovs_len: M::Offset,
33    nread: WasmPtr<M::Offset, M>,
34) -> Result<Errno, WasiError> {
35    WasiEnv::do_pending_operations(&mut ctx)?;
36
37    let pid = ctx.data().pid();
38    let tid = ctx.data().tid();
39
40    let fd_entry = {
41        let env = ctx.data();
42        let state = env.state.clone();
43        wasi_try_ok!(state.fs.get_fd(fd))
44    };
45    let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
46
47    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
48    if fd == DeviceFile::STDIN {
49        ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
50    }
51
52    let res = fd_read_internal::<M>(&mut ctx, fd, fd_entry, iovs, iovs_len, offset, nread, true)?;
53    fd_read_internal_handler(ctx, res, nread)
54}
55
56/// ### `fd_pread()`
57/// Read from the file at the given offset without updating the file cursor.
58/// This acts like a stateless version of Seek + Read
59/// Inputs:
60/// - `Fd fd`
61///     The file descriptor to read the data with
62/// - `const __wasi_iovec_t* iovs'
63///     Vectors where the data will be stored
64/// - `size_t iovs_len`
65///     The number of vectors to store the data into
66/// - `Filesize offset`
67///     The file cursor to use: the starting position from which data will be read
68/// Output:
69/// - `size_t nread`
70///     The number of bytes read
71#[instrument(level = "trace", skip_all, fields(%fd, %offset, ?nread), ret)]
72pub fn fd_pread<M: MemorySize>(
73    mut ctx: FunctionEnvMut<'_, WasiEnv>,
74    fd: WasiFd,
75    iovs: WasmPtr<__wasi_iovec_t<M>, M>,
76    iovs_len: M::Offset,
77    offset: Filesize,
78    nread: WasmPtr<M::Offset, M>,
79) -> Result<Errno, WasiError> {
80    let pid = ctx.data().pid();
81    let tid = ctx.data().tid();
82
83    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
84    if fd == DeviceFile::STDIN {
85        ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
86    }
87
88    let fd_entry = {
89        let env = ctx.data();
90        let state = env.state.clone();
91        wasi_try_ok!(state.fs.get_fd(fd))
92    };
93    let res = fd_read_internal::<M>(
94        &mut ctx,
95        fd,
96        fd_entry,
97        iovs,
98        iovs_len,
99        offset as usize,
100        nread,
101        false,
102    )?;
103    fd_read_internal_handler::<M>(ctx, res, nread)
104}
105
106pub(crate) fn fd_read_internal_handler<M: MemorySize>(
107    mut ctx: FunctionEnvMut<'_, WasiEnv>,
108    res: Result<usize, Errno>,
109    nread: WasmPtr<M::Offset, M>,
110) -> Result<Errno, WasiError> {
111    let mut ret = Errno::Success;
112    let bytes_read = match res {
113        Ok(bytes_read) => bytes_read,
114        Err(err) => {
115            ret = err;
116            0
117        }
118    };
119    Span::current().record("nread", bytes_read);
120
121    let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
122
123    let env = ctx.data();
124    let memory = unsafe { env.memory_view(&ctx) };
125
126    let env = ctx.data();
127    let memory = unsafe { env.memory_view(&ctx) };
128    let nread_ref = nread.deref(&memory);
129    wasi_try_mem_ok!(nread_ref.write(bytes_read));
130
131    Ok(ret)
132}
133
134#[allow(clippy::await_holding_lock)]
135pub(crate) fn fd_read_internal<M: MemorySize>(
136    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
137    fd: WasiFd,
138    fd_entry: Fd,
139    iovs: WasmPtr<__wasi_iovec_t<M>, M>,
140    iovs_len: M::Offset,
141    offset: usize,
142    nread: WasmPtr<M::Offset, M>,
143    should_update_cursor: bool,
144) -> WasiResult<usize> {
145    let env = ctx.data();
146    let memory = unsafe { env.memory_view(&ctx) };
147    let state = env.state();
148    let is_stdio = fd_entry.is_stdio;
149
150    let bytes_read = {
151        if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_READ) {
152            // TODO: figure out the error to return when lacking rights
153            return Ok(Err(Errno::Access));
154        }
155
156        let inode = fd_entry.inode;
157        let fd_flags = fd_entry.inner.flags;
158
159        let (bytes_read, can_update_cursor) = {
160            let mut guard = inode.write();
161            match guard.deref_mut() {
162                Kind::File { handle, .. } => {
163                    let Some(handle) = handle else {
164                        tracing::warn!("fd_read: file handle is None");
165                        return Ok(Err(Errno::Badf));
166                    };
167                    let handle = handle.clone();
168
169                    drop(guard);
170
171                    let res = __asyncify_light(
172                        env,
173                        if fd_flags.contains(Fdflags::NONBLOCK) {
174                            Some(Duration::ZERO)
175                        } else {
176                            None
177                        },
178                        async move {
179                            let mut handle = match handle.write() {
180                                Ok(a) => a,
181                                Err(_) => return Err(Errno::Fault),
182                            };
183                            if !is_stdio {
184                                handle
185                                    .seek(std::io::SeekFrom::Start(offset as u64))
186                                    .await
187                                    .map_err(map_io_err)?;
188                            }
189
190                            let mut total_read = 0usize;
191
192                            let iovs_arr =
193                                iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
194                            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
195                            for iovs in iovs_arr.iter() {
196                                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
197                                    .slice(&memory, iovs.buf_len)
198                                    .map_err(mem_error_to_wasi)?
199                                    .access()
200                                    .map_err(mem_error_to_wasi)?;
201                                let r = handle.read(buf.as_mut()).await.map_err(|err| {
202                                    let err = From::<std::io::Error>::from(err);
203                                    match err {
204                                        Errno::Again => {
205                                            if is_stdio {
206                                                Errno::Badf
207                                            } else {
208                                                Errno::Again
209                                            }
210                                        }
211                                        a => a,
212                                    }
213                                });
214                                let local_read = match r {
215                                    Ok(s) => s,
216                                    Err(_) if total_read > 0 => break,
217                                    Err(err) => return Err(err),
218                                };
219                                total_read += local_read;
220                                if local_read != buf.len() {
221                                    break;
222                                }
223                            }
224                            Ok(total_read)
225                        },
226                    );
227                    let read = wasi_try_ok_ok!(res?.map_err(|err| match err {
228                        Errno::Timedout => Errno::Again,
229                        a => a,
230                    }));
231                    (read, true)
232                }
233                Kind::Socket { socket } => {
234                    let socket = socket.clone();
235
236                    drop(guard);
237
238                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
239                    let timeout = socket
240                        .opt_time(TimeType::ReadTimeout)
241                        .ok()
242                        .flatten()
243                        .unwrap_or(Duration::from_secs(30));
244
245                    let tasks = env.tasks().clone();
246                    let res = __asyncify_light(
247                        env,
248                        if fd_flags.contains(Fdflags::NONBLOCK) {
249                            Some(Duration::ZERO)
250                        } else {
251                            None
252                        },
253                        async move {
254                            let mut total_read = 0usize;
255
256                            let iovs_arr =
257                                iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
258                            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
259                            for iovs in iovs_arr.iter() {
260                                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
261                                    .slice(&memory, iovs.buf_len)
262                                    .map_err(mem_error_to_wasi)?
263                                    .access()
264                                    .map_err(mem_error_to_wasi)?;
265
266                                let local_read = socket
267                                    .recv(
268                                        tasks.deref(),
269                                        buf.as_mut_uninit(),
270                                        Some(timeout),
271                                        nonblocking,
272                                        false,
273                                    )
274                                    .await?;
275                                total_read += local_read;
276                                if total_read != buf.len() {
277                                    break;
278                                }
279                            }
280                            Ok(total_read)
281                        },
282                    );
283                    let res = res?.map_err(|err| match err {
284                        Errno::Timedout => Errno::Again,
285                        a => a,
286                    });
287                    match res {
288                        Err(Errno::Connaborted) | Err(Errno::Connreset) => (0, false),
289                        res => {
290                            let bytes_read = wasi_try_ok_ok!(res);
291                            (bytes_read, false)
292                        }
293                    }
294                }
295                Kind::PipeTx { .. } => return Ok(Err(Errno::Badf)),
296                Kind::PipeRx { rx } => {
297                    let mut rx = rx.clone();
298                    drop(guard);
299
300                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
301
302                    let res = __asyncify_light(
303                        env,
304                        if fd_flags.contains(Fdflags::NONBLOCK) {
305                            Some(Duration::ZERO)
306                        } else {
307                            None
308                        },
309                        async move {
310                            let mut total_read = 0usize;
311
312                            let iovs_arr =
313                                iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
314                            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
315                            for iovs in iovs_arr.iter() {
316                                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
317                                    .slice(&memory, iovs.buf_len)
318                                    .map_err(mem_error_to_wasi)?
319                                    .access()
320                                    .map_err(mem_error_to_wasi)?;
321
322                                let local_read = match nonblocking {
323                                    true => match rx.try_read(buf.as_mut()) {
324                                        Some(amt) => amt,
325                                        None => {
326                                            return Err(Errno::Again);
327                                        }
328                                    },
329                                    false => {
330                                        virtual_fs::AsyncReadExt::read(&mut rx, buf.as_mut())
331                                            .await?
332                                    }
333                                };
334                                total_read += local_read;
335                                if local_read != buf.len() {
336                                    break;
337                                }
338                            }
339                            Ok(total_read)
340                        },
341                    );
342
343                    let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
344                        Errno::Timedout => Errno::Again,
345                        a => a,
346                    }));
347
348                    (bytes_read, false)
349                }
350                Kind::DuplexPipe { pipe } => {
351                    let mut pipe = pipe.clone();
352                    drop(guard);
353
354                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
355
356                    let res = __asyncify_light(
357                        env,
358                        if fd_flags.contains(Fdflags::NONBLOCK) {
359                            Some(Duration::ZERO)
360                        } else {
361                            None
362                        },
363                        async move {
364                            let mut total_read = 0usize;
365
366                            let iovs_arr =
367                                iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
368                            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
369                            for iovs in iovs_arr.iter() {
370                                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
371                                    .slice(&memory, iovs.buf_len)
372                                    .map_err(mem_error_to_wasi)?
373                                    .access()
374                                    .map_err(mem_error_to_wasi)?;
375
376                                let local_read = match nonblocking {
377                                    true => match pipe.try_read(buf.as_mut()) {
378                                        Some(amt) => amt,
379                                        None => {
380                                            return Err(Errno::Again);
381                                        }
382                                    },
383                                    false => {
384                                        virtual_fs::AsyncReadExt::read(&mut pipe, buf.as_mut())
385                                            .await?
386                                    }
387                                };
388                                total_read += local_read;
389                                if local_read != buf.len() {
390                                    break;
391                                }
392                            }
393                            Ok(total_read)
394                        },
395                    );
396
397                    let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
398                        Errno::Timedout => Errno::Again,
399                        a => a,
400                    }));
401
402                    (bytes_read, false)
403                }
404                Kind::Dir { .. } | Kind::Root { .. } => {
405                    // TODO: verify
406                    return Ok(Err(Errno::Isdir));
407                }
408                Kind::EventNotifications { inner } => {
409                    // Create a poller
410                    struct NotifyPoller {
411                        inner: Arc<NotificationInner>,
412                        non_blocking: bool,
413                    }
414                    let poller = NotifyPoller {
415                        inner: inner.clone(),
416                        non_blocking: fd_flags.contains(Fdflags::NONBLOCK),
417                    };
418
419                    drop(guard);
420
421                    // The poller will register itself for notifications and wait for the
422                    // counter to drop
423                    impl Future for NotifyPoller {
424                        type Output = Result<u64, Errno>;
425                        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
426                            if self.non_blocking {
427                                Poll::Ready(self.inner.try_read().ok_or(Errno::Again))
428                            } else {
429                                self.inner.read(cx.waker()).map(Ok)
430                            }
431                        }
432                    }
433
434                    // Yield until the notifications are triggered
435                    let tasks_inner = env.tasks().clone();
436
437                    let res = __asyncify_light(env, None, poller)?.map_err(|err| match err {
438                        Errno::Timedout => Errno::Again,
439                        a => a,
440                    });
441                    let val = wasi_try_ok_ok!(res);
442
443                    let mut memory = unsafe { env.memory_view(ctx) };
444                    let reader = val.to_ne_bytes();
445                    let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
446                    let ret = wasi_try_ok_ok!(read_bytes(&reader[..], &memory, iovs_arr));
447                    (ret, false)
448                }
449                Kind::Symlink { .. } | Kind::Epoll { .. } => {
450                    return Ok(Err(Errno::Notsup));
451                }
452                Kind::Buffer { buffer } => {
453                    let memory = unsafe { env.memory_view(ctx) };
454                    let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
455                    let read = wasi_try_ok_ok!(read_bytes(&buffer[offset..], &memory, iovs_arr));
456                    (read, true)
457                }
458            }
459        };
460
461        if !is_stdio && should_update_cursor && can_update_cursor {
462            fd_entry
463                .inner
464                .offset
465                .fetch_add(bytes_read as u64, Ordering::AcqRel);
466        }
467
468        bytes_read
469    };
470
471    Ok(Ok(bytes_read))
472}