wasmer_wasix/syscalls/wasi/
fd_read.rs

1use std::{collections::VecDeque, task::Waker};
2
3use virtual_fs::{AsyncReadExt, DeviceFile, ReadBuf};
4
5use super::*;
6use crate::{
7    fs::NotificationInner,
8    journal::SnapshotTrigger,
9    net::socket::TimeType,
10    os::task::process::{MaybeCheckpointResult, WasiProcessCheckpoint, WasiProcessInner},
11    syscalls::*,
12};
13
14/// ### `fd_read()`
15/// Read data from file descriptor
16/// Inputs:
17/// - `Fd fd`
18///     File descriptor from which data will be read
19/// - `const __wasi_iovec_t *iovs`
20///     Vectors where data will be stored
21/// - `u32 iovs_len`
22///     Length of data in `iovs`
23/// Output:
24/// - `u32 *nread`
25///     Number of bytes read
26///
27#[instrument(level = "trace", skip_all, fields(%fd, nread = field::Empty), ret)]
28pub fn fd_read<M: MemorySize>(
29    mut ctx: FunctionEnvMut<'_, WasiEnv>,
30    fd: WasiFd,
31    iovs: WasmPtr<__wasi_iovec_t<M>, M>,
32    iovs_len: M::Offset,
33    nread: WasmPtr<M::Offset, M>,
34) -> Result<Errno, WasiError> {
35    WasiEnv::do_pending_operations(&mut ctx)?;
36
37    let pid = ctx.data().pid();
38    let tid = ctx.data().tid();
39
40    let offset = {
41        let mut env = ctx.data();
42        let state = env.state.clone();
43        let inodes = state.inodes.clone();
44
45        let fd_entry = wasi_try_ok!(state.fs.get_fd(fd));
46        fd_entry.inner.offset.load(Ordering::Acquire) as usize
47    };
48
49    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
50    if fd == DeviceFile::STDIN {
51        ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
52    }
53
54    let res = fd_read_internal::<M>(&mut ctx, fd, iovs, iovs_len, offset, nread, true)?;
55    fd_read_internal_handler(ctx, res, nread)
56}
57
58/// ### `fd_pread()`
59/// Read from the file at the given offset without updating the file cursor.
60/// This acts like a stateless version of Seek + Read
61/// Inputs:
62/// - `Fd fd`
63///     The file descriptor to read the data with
64/// - `const __wasi_iovec_t* iovs'
65///     Vectors where the data will be stored
66/// - `size_t iovs_len`
67///     The number of vectors to store the data into
68/// - `Filesize offset`
69///     The file cursor to use: the starting position from which data will be read
70/// Output:
71/// - `size_t nread`
72///     The number of bytes read
73#[instrument(level = "trace", skip_all, fields(%fd, %offset, ?nread), ret)]
74pub fn fd_pread<M: MemorySize>(
75    mut ctx: FunctionEnvMut<'_, WasiEnv>,
76    fd: WasiFd,
77    iovs: WasmPtr<__wasi_iovec_t<M>, M>,
78    iovs_len: M::Offset,
79    offset: Filesize,
80    nread: WasmPtr<M::Offset, M>,
81) -> Result<Errno, WasiError> {
82    let pid = ctx.data().pid();
83    let tid = ctx.data().tid();
84
85    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
86    if fd == DeviceFile::STDIN {
87        ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::FirstStdin)?);
88    }
89
90    let res = fd_read_internal::<M>(&mut ctx, fd, iovs, iovs_len, offset as usize, nread, false)?;
91    fd_read_internal_handler::<M>(ctx, res, nread)
92}
93
94pub(crate) fn fd_read_internal_handler<M: MemorySize>(
95    mut ctx: FunctionEnvMut<'_, WasiEnv>,
96    res: Result<usize, Errno>,
97    nread: WasmPtr<M::Offset, M>,
98) -> Result<Errno, WasiError> {
99    let mut ret = Errno::Success;
100    let bytes_read = match res {
101        Ok(bytes_read) => bytes_read,
102        Err(err) => {
103            ret = err;
104            0
105        }
106    };
107    Span::current().record("nread", bytes_read);
108
109    let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
110
111    let env = ctx.data();
112    let memory = unsafe { env.memory_view(&ctx) };
113
114    let env = ctx.data();
115    let memory = unsafe { env.memory_view(&ctx) };
116    let nread_ref = nread.deref(&memory);
117    wasi_try_mem_ok!(nread_ref.write(bytes_read));
118
119    Ok(ret)
120}
121
122#[allow(clippy::await_holding_lock)]
123pub(crate) fn fd_read_internal<M: MemorySize>(
124    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
125    fd: WasiFd,
126    iovs: WasmPtr<__wasi_iovec_t<M>, M>,
127    iovs_len: M::Offset,
128    offset: usize,
129    nread: WasmPtr<M::Offset, M>,
130    should_update_cursor: bool,
131) -> WasiResult<usize> {
132    let env = ctx.data();
133    let memory = unsafe { env.memory_view(&ctx) };
134    let state = env.state();
135
136    let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(fd));
137    let is_stdio = fd_entry.is_stdio;
138
139    let bytes_read = {
140        if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_READ) {
141            // TODO: figure out the error to return when lacking rights
142            return Ok(Err(Errno::Access));
143        }
144
145        let inode = fd_entry.inode;
146        let fd_flags = fd_entry.inner.flags;
147
148        let (bytes_read, can_update_cursor) = {
149            let mut guard = inode.write();
150            match guard.deref_mut() {
151                Kind::File { handle, .. } => {
152                    if let Some(handle) = handle {
153                        let handle = handle.clone();
154
155                        drop(guard);
156
157                        let res = __asyncify_light(
158                            env,
159                            if fd_flags.contains(Fdflags::NONBLOCK) {
160                                Some(Duration::ZERO)
161                            } else {
162                                None
163                            },
164                            async move {
165                                let mut handle = match handle.write() {
166                                    Ok(a) => a,
167                                    Err(_) => return Err(Errno::Fault),
168                                };
169                                if !is_stdio {
170                                    handle
171                                        .seek(std::io::SeekFrom::Start(offset as u64))
172                                        .await
173                                        .map_err(map_io_err)?;
174                                }
175
176                                let mut total_read = 0usize;
177
178                                let iovs_arr =
179                                    iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
180                                let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
181                                for iovs in iovs_arr.iter() {
182                                    let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
183                                        .slice(&memory, iovs.buf_len)
184                                        .map_err(mem_error_to_wasi)?
185                                        .access()
186                                        .map_err(mem_error_to_wasi)?;
187                                    let r = handle.read(buf.as_mut()).await.map_err(|err| {
188                                        let err = From::<std::io::Error>::from(err);
189                                        match err {
190                                            Errno::Again => {
191                                                if is_stdio {
192                                                    Errno::Badf
193                                                } else {
194                                                    Errno::Again
195                                                }
196                                            }
197                                            a => a,
198                                        }
199                                    });
200                                    let local_read = match r {
201                                        Ok(s) => s,
202                                        Err(_) if total_read > 0 => break,
203                                        Err(err) => return Err(err),
204                                    };
205                                    total_read += local_read;
206                                    if local_read != buf.len() {
207                                        break;
208                                    }
209                                }
210                                Ok(total_read)
211                            },
212                        );
213                        let read = wasi_try_ok_ok!(res?.map_err(|err| match err {
214                            Errno::Timedout => Errno::Again,
215                            a => a,
216                        }));
217                        (read, true)
218                    } else {
219                        return Ok(Err(Errno::Badf));
220                    }
221                }
222                Kind::Socket { socket } => {
223                    let socket = socket.clone();
224
225                    drop(guard);
226
227                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
228                    let timeout = socket
229                        .opt_time(TimeType::ReadTimeout)
230                        .ok()
231                        .flatten()
232                        .unwrap_or(Duration::from_secs(30));
233
234                    let tasks = env.tasks().clone();
235                    let res = __asyncify_light(
236                        env,
237                        if fd_flags.contains(Fdflags::NONBLOCK) {
238                            Some(Duration::ZERO)
239                        } else {
240                            None
241                        },
242                        async move {
243                            let mut total_read = 0usize;
244
245                            let iovs_arr =
246                                iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
247                            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
248                            for iovs in iovs_arr.iter() {
249                                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
250                                    .slice(&memory, iovs.buf_len)
251                                    .map_err(mem_error_to_wasi)?
252                                    .access()
253                                    .map_err(mem_error_to_wasi)?;
254
255                                let local_read = socket
256                                    .recv(
257                                        tasks.deref(),
258                                        buf.as_mut_uninit(),
259                                        Some(timeout),
260                                        nonblocking,
261                                        false,
262                                    )
263                                    .await?;
264                                total_read += local_read;
265                                if total_read != buf.len() {
266                                    break;
267                                }
268                            }
269                            Ok(total_read)
270                        },
271                    );
272                    let res = res?.map_err(|err| match err {
273                        Errno::Timedout => Errno::Again,
274                        a => a,
275                    });
276                    match res {
277                        Err(Errno::Connaborted) | Err(Errno::Connreset) => (0, false),
278                        res => {
279                            let bytes_read = wasi_try_ok_ok!(res);
280                            (bytes_read, false)
281                        }
282                    }
283                }
284                Kind::PipeTx { .. } => return Ok(Err(Errno::Badf)),
285                Kind::PipeRx { rx } => {
286                    let mut rx = rx.clone();
287                    drop(guard);
288
289                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
290
291                    let res = __asyncify_light(
292                        env,
293                        if fd_flags.contains(Fdflags::NONBLOCK) {
294                            Some(Duration::ZERO)
295                        } else {
296                            None
297                        },
298                        async move {
299                            let mut total_read = 0usize;
300
301                            let iovs_arr =
302                                iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
303                            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
304                            for iovs in iovs_arr.iter() {
305                                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
306                                    .slice(&memory, iovs.buf_len)
307                                    .map_err(mem_error_to_wasi)?
308                                    .access()
309                                    .map_err(mem_error_to_wasi)?;
310
311                                let local_read = match nonblocking {
312                                    true => match rx.try_read(buf.as_mut()) {
313                                        Some(amt) => amt,
314                                        None => {
315                                            return Err(Errno::Again);
316                                        }
317                                    },
318                                    false => {
319                                        virtual_fs::AsyncReadExt::read(&mut rx, buf.as_mut())
320                                            .await?
321                                    }
322                                };
323                                total_read += local_read;
324                                if local_read != buf.len() {
325                                    break;
326                                }
327                            }
328                            Ok(total_read)
329                        },
330                    );
331
332                    let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
333                        Errno::Timedout => Errno::Again,
334                        a => a,
335                    }));
336
337                    (bytes_read, false)
338                }
339                Kind::DuplexPipe { pipe } => {
340                    let mut pipe = pipe.clone();
341                    drop(guard);
342
343                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
344
345                    let res = __asyncify_light(
346                        env,
347                        if fd_flags.contains(Fdflags::NONBLOCK) {
348                            Some(Duration::ZERO)
349                        } else {
350                            None
351                        },
352                        async move {
353                            let mut total_read = 0usize;
354
355                            let iovs_arr =
356                                iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
357                            let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
358                            for iovs in iovs_arr.iter() {
359                                let mut buf = WasmPtr::<u8, M>::new(iovs.buf)
360                                    .slice(&memory, iovs.buf_len)
361                                    .map_err(mem_error_to_wasi)?
362                                    .access()
363                                    .map_err(mem_error_to_wasi)?;
364
365                                let local_read = match nonblocking {
366                                    true => match pipe.try_read(buf.as_mut()) {
367                                        Some(amt) => amt,
368                                        None => {
369                                            return Err(Errno::Again);
370                                        }
371                                    },
372                                    false => {
373                                        virtual_fs::AsyncReadExt::read(&mut pipe, buf.as_mut())
374                                            .await?
375                                    }
376                                };
377                                total_read += local_read;
378                                if local_read != buf.len() {
379                                    break;
380                                }
381                            }
382                            Ok(total_read)
383                        },
384                    );
385
386                    let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err {
387                        Errno::Timedout => Errno::Again,
388                        a => a,
389                    }));
390
391                    (bytes_read, false)
392                }
393                Kind::Dir { .. } | Kind::Root { .. } => {
394                    // TODO: verify
395                    return Ok(Err(Errno::Isdir));
396                }
397                Kind::EventNotifications { inner } => {
398                    // Create a poller
399                    struct NotifyPoller {
400                        inner: Arc<NotificationInner>,
401                        non_blocking: bool,
402                    }
403                    let poller = NotifyPoller {
404                        inner: inner.clone(),
405                        non_blocking: fd_flags.contains(Fdflags::NONBLOCK),
406                    };
407
408                    drop(guard);
409
410                    // The poller will register itself for notifications and wait for the
411                    // counter to drop
412                    impl Future for NotifyPoller {
413                        type Output = Result<u64, Errno>;
414                        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
415                            if self.non_blocking {
416                                Poll::Ready(self.inner.try_read().ok_or(Errno::Again))
417                            } else {
418                                self.inner.read(cx.waker()).map(Ok)
419                            }
420                        }
421                    }
422
423                    // Yield until the notifications are triggered
424                    let tasks_inner = env.tasks().clone();
425
426                    let res = __asyncify_light(env, None, poller)?.map_err(|err| match err {
427                        Errno::Timedout => Errno::Again,
428                        a => a,
429                    });
430                    let val = wasi_try_ok_ok!(res);
431
432                    let mut memory = unsafe { env.memory_view(ctx) };
433                    let reader = val.to_ne_bytes();
434                    let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
435                    let ret = wasi_try_ok_ok!(read_bytes(&reader[..], &memory, iovs_arr));
436                    (ret, false)
437                }
438                Kind::Symlink { .. } | Kind::Epoll { .. } => {
439                    return Ok(Err(Errno::Notsup));
440                }
441                Kind::Buffer { buffer } => {
442                    let memory = unsafe { env.memory_view(ctx) };
443                    let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len));
444                    let read = wasi_try_ok_ok!(read_bytes(&buffer[offset..], &memory, iovs_arr));
445                    (read, true)
446                }
447            }
448        };
449
450        if !is_stdio && should_update_cursor && can_update_cursor {
451            // reborrow
452            let mut fd_map = state.fs.fd_map.write().unwrap();
453            let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(fd).ok_or(Errno::Badf));
454            let old = fd_entry
455                .offset
456                .fetch_add(bytes_read as u64, Ordering::AcqRel);
457        }
458
459        bytes_read
460    };
461
462    Ok(Ok(bytes_read))
463}