wasmer_wasix/syscalls/wasi/
fd_write.rs

1use std::task::Waker;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::{
6    journal::{JournalEffector, JournalEntry},
7    utils::map_snapshot_err,
8};
9use crate::{net::socket::TimeType, syscalls::*};
10
11/// ### `fd_write()`
12/// Write data to the file descriptor
13/// Inputs:
14/// - `Fd`
15///     File descriptor (opened with writing) to write to
16/// - `const __wasi_ciovec_t *iovs`
17///     List of vectors to read data from
18/// - `u32 iovs_len`
19///     Length of data in `iovs`
20/// Output:
21/// - `u32 *nwritten`
22///     Number of bytes written
23/// Errors:
24///
25#[instrument(level = "trace", skip_all, fields(%fd, nwritten = field::Empty), ret)]
26pub fn fd_write<M: MemorySize>(
27    mut ctx: FunctionEnvMut<'_, WasiEnv>,
28    fd: WasiFd,
29    iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
30    iovs_len: M::Offset,
31    nwritten: WasmPtr<M::Offset, M>,
32) -> Result<Errno, WasiError> {
33    WasiEnv::do_pending_operations(&mut ctx)?;
34
35    let env = ctx.data();
36    let enable_journal = env.enable_journal;
37    let fd_entry = {
38        let state = env.state.clone();
39        wasi_try_ok!(state.fs.get_fd(fd))
40    };
41    let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
42
43    let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
44        &mut ctx,
45        fd,
46        fd_entry,
47        FdWriteSource::Iovs { iovs, iovs_len },
48        offset as u64,
49        true,
50        enable_journal,
51    )?);
52
53    Span::current().record("nwritten", bytes_written);
54
55    let mut env = ctx.data();
56    let memory = unsafe { env.memory_view(&ctx) };
57    let nwritten_ref = nwritten.deref(&memory);
58    let bytes_written: M::Offset =
59        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
60    wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
61
62    Ok(Errno::Success)
63}
64
65/// ### `fd_pwrite()`
66/// Write to a file without adjusting its offset
67/// Inputs:
68/// - `Fd`
69///     File descriptor (opened with writing) to write to
70/// - `const __wasi_ciovec_t *iovs`
71///     List of vectors to read data from
72/// - `u32 iovs_len`
73///     Length of data in `iovs`
74/// - `Filesize offset`
75///     The offset to write at
76/// Output:
77/// - `u32 *nwritten`
78///     Number of bytes written
79#[instrument(level = "trace", skip_all, fields(%fd, %offset, nwritten = field::Empty), ret)]
80pub fn fd_pwrite<M: MemorySize>(
81    mut ctx: FunctionEnvMut<'_, WasiEnv>,
82    fd: WasiFd,
83    iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
84    iovs_len: M::Offset,
85    offset: Filesize,
86    nwritten: WasmPtr<M::Offset, M>,
87) -> Result<Errno, WasiError> {
88    WasiEnv::do_pending_operations(&mut ctx)?;
89
90    let enable_snapshot_capture = ctx.data().enable_journal;
91
92    let fd_entry = {
93        let env = ctx.data();
94        let state = env.state.clone();
95        wasi_try_ok!(state.fs.get_fd(fd))
96    };
97    let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
98        &mut ctx,
99        fd,
100        fd_entry,
101        FdWriteSource::Iovs { iovs, iovs_len },
102        offset,
103        false,
104        enable_snapshot_capture,
105    )?);
106
107    Span::current().record("nwritten", bytes_written);
108
109    let mut env = ctx.data();
110    let memory = unsafe { env.memory_view(&ctx) };
111    let nwritten_ref = nwritten.deref(&memory);
112    let bytes_written: M::Offset =
113        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
114    wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
115
116    Ok(Errno::Success)
117}
118
119pub(crate) enum FdWriteSource<'a, M: MemorySize> {
120    Iovs {
121        iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
122        iovs_len: M::Offset,
123    },
124    Buffer(Cow<'a, [u8]>),
125}
126
127#[allow(clippy::await_holding_lock)]
128pub(crate) fn fd_write_internal<M: MemorySize>(
129    mut ctx: &mut FunctionEnvMut<'_, WasiEnv>,
130    fd: WasiFd,
131    fd_entry: Fd,
132    data: FdWriteSource<'_, M>,
133    offset: u64,
134    should_update_cursor: bool,
135    should_snapshot: bool,
136) -> Result<Result<usize, Errno>, WasiError> {
137    let mut offset = offset;
138    let mut env = ctx.data();
139    let state = env.state.clone();
140    let is_stdio = fd_entry.is_stdio;
141
142    let bytes_written = {
143        if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_WRITE) {
144            return Ok(Err(Errno::Access));
145        }
146
147        let fd_flags = fd_entry.inner.flags;
148        let mut memory = unsafe { env.memory_view(&ctx) };
149
150        let (bytes_written, is_file, can_snapshot) = {
151            let (mut memory, _) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
152            let mut guard = fd_entry.inode.write();
153            match guard.deref_mut() {
154                Kind::File { handle, .. } => {
155                    if let Some(handle) = handle {
156                        let handle = handle.clone();
157                        drop(guard);
158
159                        let res = __asyncify_light(
160                            env,
161                            if fd_entry.inner.flags.contains(Fdflags::NONBLOCK) {
162                                Some(Duration::ZERO)
163                            } else {
164                                None
165                            },
166                            async {
167                                let mut handle = handle.write().unwrap();
168                                if !is_stdio {
169                                    if fd_entry.inner.flags.contains(Fdflags::APPEND) {
170                                        // `fdflags::append` means we need to seek to the end before writing.
171                                        offset = fd_entry.inode.stat.read().unwrap().st_size;
172                                        fd_entry.inner.offset.store(offset, Ordering::Release);
173                                    }
174
175                                    handle
176                                        .seek(std::io::SeekFrom::Start(offset))
177                                        .await
178                                        .map_err(map_io_err)?;
179                                }
180
181                                let mut written = 0usize;
182
183                                match &data {
184                                    FdWriteSource::Iovs { iovs, iovs_len } => {
185                                        let iovs_arr = iovs
186                                            .slice(&memory, *iovs_len)
187                                            .map_err(mem_error_to_wasi)?;
188                                        let iovs_arr =
189                                            iovs_arr.access().map_err(mem_error_to_wasi)?;
190                                        for iovs in iovs_arr.iter() {
191                                            let buf = WasmPtr::<u8, M>::new(iovs.buf)
192                                                .slice(&memory, iovs.buf_len)
193                                                .map_err(mem_error_to_wasi)?
194                                                .access()
195                                                .map_err(mem_error_to_wasi)?;
196                                            let local_written =
197                                                match handle.write(buf.as_ref()).await {
198                                                    Ok(s) => s,
199                                                    Err(_) if written > 0 => break,
200                                                    Err(err) => return Err(map_io_err(err)),
201                                                };
202                                            written += local_written;
203                                            if local_written != buf.len() {
204                                                break;
205                                            }
206                                        }
207                                    }
208                                    FdWriteSource::Buffer(data) => {
209                                        handle.write_all(data).await?;
210                                        written += data.len();
211                                    }
212                                }
213
214                                if is_stdio {
215                                    handle.flush().await.map_err(map_io_err)?;
216                                }
217                                Ok(written)
218                            },
219                        );
220                        let written = wasi_try_ok_ok!(res?.map_err(|err| match err {
221                            Errno::Timedout => Errno::Again,
222                            a => a,
223                        }));
224
225                        (written, true, true)
226                    } else {
227                        return Ok(Err(Errno::Inval));
228                    }
229                }
230                Kind::Socket { socket } => {
231                    let socket = socket.clone();
232                    drop(guard);
233
234                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
235                    let timeout = socket
236                        .opt_time(TimeType::WriteTimeout)
237                        .ok()
238                        .flatten()
239                        .unwrap_or(Duration::from_secs(30));
240
241                    let tasks = env.tasks().clone();
242
243                    let res = __asyncify_light(env, None, async {
244                        let mut sent = 0usize;
245
246                        match &data {
247                            FdWriteSource::Iovs { iovs, iovs_len } => {
248                                let iovs_arr =
249                                    iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)?;
250                                let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
251                                for iovs in iovs_arr.iter() {
252                                    let buf = WasmPtr::<u8, M>::new(iovs.buf)
253                                        .slice(&memory, iovs.buf_len)
254                                        .map_err(mem_error_to_wasi)?
255                                        .access()
256                                        .map_err(mem_error_to_wasi)?;
257                                    let local_sent = socket
258                                        .send(
259                                            tasks.deref(),
260                                            buf.as_ref(),
261                                            Some(timeout),
262                                            nonblocking,
263                                        )
264                                        .await?;
265                                    sent += local_sent;
266                                    if local_sent != buf.len() {
267                                        break;
268                                    }
269                                }
270                            }
271                            FdWriteSource::Buffer(data) => {
272                                sent += socket
273                                    .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
274                                    .await?;
275                            }
276                        }
277                        Ok(sent)
278                    });
279                    let written = wasi_try_ok_ok!(res?);
280                    (written, false, false)
281                }
282                Kind::PipeRx { .. } => {
283                    return Ok(Err(Errno::Badf));
284                }
285                Kind::PipeTx { tx } => {
286                    let mut written = 0usize;
287
288                    match &data {
289                        FdWriteSource::Iovs { iovs, iovs_len } => {
290                            let mut raise_sigpipe = false;
291                            let iovs_arr = wasi_try_ok_ok!(
292                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
293                            );
294                            let iovs_arr =
295                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
296                            for iovs in iovs_arr.iter() {
297                                let buf = wasi_try_ok_ok!(
298                                    WasmPtr::<u8, M>::new(iovs.buf)
299                                        .slice(&memory, iovs.buf_len)
300                                        .map_err(mem_error_to_wasi)
301                                );
302                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
303                                let write_result = std::io::Write::write(tx, buf.as_ref());
304                                let local_written = match write_result {
305                                    Ok(w) => w,
306                                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
307                                        // Need to do this to avoid double borrow on ctx with iovs_arr
308                                        raise_sigpipe = true;
309                                        break;
310                                    }
311                                    Err(e) => return Ok(Err(map_io_err(e))),
312                                };
313
314                                written += local_written;
315                                if local_written != buf.len() {
316                                    break;
317                                }
318                            }
319
320                            drop(iovs_arr);
321
322                            if raise_sigpipe {
323                                env.process.signal_process(Signal::Sigpipe);
324                                wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
325                                return Ok(Err(Errno::Pipe));
326                            }
327                        }
328                        FdWriteSource::Buffer(data) => {
329                            match std::io::Write::write_all(tx, data) {
330                                Ok(()) => (),
331                                Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
332                                    env.process.signal_process(Signal::Sigpipe);
333                                    wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
334                                    return Ok(Err(Errno::Pipe));
335                                }
336                                Err(e) => return Ok(Err(map_io_err(e))),
337                            };
338                            written += data.len();
339                        }
340                    }
341
342                    (written, false, true)
343                }
344                Kind::DuplexPipe { pipe } => {
345                    let mut written = 0usize;
346
347                    match &data {
348                        FdWriteSource::Iovs { iovs, iovs_len } => {
349                            let mut raise_sigpipe = false;
350                            let iovs_arr = wasi_try_ok_ok!(
351                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
352                            );
353                            let iovs_arr =
354                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
355                            for iovs in iovs_arr.iter() {
356                                let buf = wasi_try_ok_ok!(
357                                    WasmPtr::<u8, M>::new(iovs.buf)
358                                        .slice(&memory, iovs.buf_len)
359                                        .map_err(mem_error_to_wasi)
360                                );
361                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
362                                let write_result = std::io::Write::write(pipe, buf.as_ref());
363                                let local_written = match write_result {
364                                    Ok(w) => w,
365                                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
366                                        // Need to do this to avoid double borrow on ctx with iovs_arr
367                                        raise_sigpipe = true;
368                                        break;
369                                    }
370                                    Err(e) => return Ok(Err(map_io_err(e))),
371                                };
372
373                                written += local_written;
374                                if local_written != buf.len() {
375                                    break;
376                                }
377                            }
378
379                            drop(iovs_arr);
380
381                            if raise_sigpipe {
382                                env.process.signal_process(Signal::Sigpipe);
383                                wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
384                                return Ok(Err(Errno::Pipe));
385                            }
386                        }
387                        FdWriteSource::Buffer(data) => {
388                            match std::io::Write::write_all(pipe, data) {
389                                Ok(()) => (),
390                                Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
391                                    env.process.signal_process(Signal::Sigpipe);
392                                    wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
393                                    return Ok(Err(Errno::Pipe));
394                                }
395                                Err(e) => return Ok(Err(map_io_err(e))),
396                            };
397                            written += data.len();
398                        }
399                    }
400
401                    (written, false, true)
402                }
403                Kind::Dir { .. } | Kind::Root { .. } => {
404                    // TODO: verify
405                    return Ok(Err(Errno::Isdir));
406                }
407                Kind::EventNotifications { inner } => {
408                    let mut written = 0usize;
409
410                    match &data {
411                        FdWriteSource::Iovs { iovs, iovs_len } => {
412                            let iovs_arr = wasi_try_ok_ok!(
413                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
414                            );
415                            let iovs_arr =
416                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
417                            for iovs in iovs_arr.iter() {
418                                let buf_len: usize = wasi_try_ok_ok!(
419                                    iovs.buf_len.try_into().map_err(|_| Errno::Inval)
420                                );
421                                let will_be_written = buf_len;
422
423                                let val_cnt = buf_len / std::mem::size_of::<u64>();
424                                let val_cnt: M::Offset =
425                                    wasi_try_ok_ok!(val_cnt.try_into().map_err(|_| Errno::Inval));
426
427                                let vals = wasi_try_ok_ok!(
428                                    WasmPtr::<u64, M>::new(iovs.buf)
429                                        .slice(&memory, val_cnt as M::Offset)
430                                        .map_err(mem_error_to_wasi)
431                                );
432                                let vals =
433                                    wasi_try_ok_ok!(vals.access().map_err(mem_error_to_wasi));
434                                for val in vals.iter() {
435                                    inner.write(*val);
436                                }
437
438                                written += will_be_written;
439                            }
440                        }
441                        FdWriteSource::Buffer(data) => {
442                            let cnt = data.len() / std::mem::size_of::<u64>();
443                            for n in 0..cnt {
444                                let start = n * std::mem::size_of::<u64>();
445                                let data = [
446                                    data[start],
447                                    data[start + 1],
448                                    data[start + 2],
449                                    data[start + 3],
450                                    data[start + 4],
451                                    data[start + 5],
452                                    data[start + 6],
453                                    data[start + 7],
454                                ];
455                                inner.write(u64::from_ne_bytes(data));
456                            }
457                        }
458                    }
459
460                    (written, false, true)
461                }
462                Kind::Symlink { .. } | Kind::Epoll { .. } => return Ok(Err(Errno::Inval)),
463                Kind::Buffer { buffer } => {
464                    let mut written = 0usize;
465
466                    match &data {
467                        FdWriteSource::Iovs { iovs, iovs_len } => {
468                            let iovs_arr = wasi_try_ok_ok!(
469                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
470                            );
471                            let iovs_arr =
472                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
473                            for iovs in iovs_arr.iter() {
474                                let buf = wasi_try_ok_ok!(
475                                    WasmPtr::<u8, M>::new(iovs.buf)
476                                        .slice(&memory, iovs.buf_len)
477                                        .map_err(mem_error_to_wasi)
478                                );
479                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
480                                let local_written = wasi_try_ok_ok!(
481                                    std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
482                                );
483                                written += local_written;
484                                if local_written != buf.len() {
485                                    break;
486                                }
487                            }
488                        }
489                        FdWriteSource::Buffer(data) => {
490                            wasi_try_ok_ok!(
491                                std::io::Write::write_all(buffer, data).map_err(map_io_err)
492                            );
493                            written += data.len();
494                        }
495                    }
496
497                    (written, false, true)
498                }
499            }
500        };
501
502        #[cfg(feature = "journal")]
503        if should_snapshot
504            && can_snapshot
505            && bytes_written > 0
506            && let FdWriteSource::Iovs { iovs, iovs_len } = data
507        {
508            JournalEffector::save_fd_write(ctx, fd, offset, bytes_written, iovs, iovs_len)
509                .map_err(|err| {
510                    tracing::error!("failed to save terminal data - {}", err);
511                    WasiError::Exit(ExitCode::from(Errno::Fault))
512                })?;
513        }
514
515        env = ctx.data();
516        memory = unsafe { env.memory_view(&ctx) };
517
518        // reborrow and update the size
519        if !is_stdio {
520            let curr_offset = if is_file && should_update_cursor {
521                let bytes_written = bytes_written as u64;
522                fd_entry
523                    .inner
524                    .offset
525                    .fetch_add(bytes_written, Ordering::AcqRel)
526                    // fetch_add returns the previous value, we have to add bytes_written again here
527                    + bytes_written
528            } else {
529                fd_entry.inner.offset.load(Ordering::Acquire)
530            };
531
532            // we set the size but we don't return any errors if it fails as
533            // pipes and sockets will not do anything with this
534            let (mut memory, _, inodes) =
535                unsafe { env.get_memory_and_wasi_state_and_inodes(&ctx, 0) };
536            if is_file {
537                let mut stat = fd_entry.inode.stat.write().unwrap();
538                if should_update_cursor {
539                    // If we wrote before the end, the current size is still correct.
540                    // Otherwise, we only got as far as the current cursor. So, the
541                    // max of the two is the correct new size.
542                    stat.st_size = stat.st_size.max(curr_offset);
543                } else {
544                    // pwrite does not update the cursor of the file so to calculate the final
545                    // size of the file we compute where the cursor would have been if it was updated,
546                    // and get the max value between it and the current size.
547                    stat.st_size = stat.st_size.max(offset + bytes_written as u64);
548                }
549            } else {
550                // Cast is valid because we don't support 128 bit systems...
551                fd_entry.inode.stat.write().unwrap().st_size += bytes_written as u64;
552            }
553        }
554        bytes_written
555    };
556
557    Ok(Ok(bytes_written))
558}