wasmer_wasix/syscalls/wasi/
fd_write.rs

1use std::task::Waker;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::{
6    journal::{JournalEffector, JournalEntry},
7    utils::map_snapshot_err,
8};
9use crate::{net::socket::TimeType, syscalls::*};
10
11/// ### `fd_write()`
12/// Write data to the file descriptor
13/// Inputs:
14/// - `Fd`
15///     File descriptor (opened with writing) to write to
16/// - `const __wasi_ciovec_t *iovs`
17///     List of vectors to read data from
18/// - `u32 iovs_len`
19///     Length of data in `iovs`
20/// Output:
21/// - `u32 *nwritten`
22///     Number of bytes written
23/// Errors:
24///
25#[instrument(level = "trace", skip_all, fields(%fd, nwritten = field::Empty), ret)]
26pub fn fd_write<M: MemorySize>(
27    mut ctx: FunctionEnvMut<'_, WasiEnv>,
28    fd: WasiFd,
29    iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
30    iovs_len: M::Offset,
31    nwritten: WasmPtr<M::Offset, M>,
32) -> Result<Errno, WasiError> {
33    WasiEnv::do_pending_operations(&mut ctx)?;
34
35    let env = ctx.data();
36    let enable_journal = env.enable_journal;
37    let offset = {
38        let state = env.state.clone();
39        let inodes = state.inodes.clone();
40
41        let fd_entry = wasi_try_ok!(state.fs.get_fd(fd));
42        fd_entry.inner.offset.load(Ordering::Acquire) as usize
43    };
44
45    let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
46        &mut ctx,
47        fd,
48        FdWriteSource::Iovs { iovs, iovs_len },
49        offset as u64,
50        true,
51        enable_journal,
52    )?);
53
54    Span::current().record("nwritten", bytes_written);
55
56    let mut env = ctx.data();
57    let memory = unsafe { env.memory_view(&ctx) };
58    let nwritten_ref = nwritten.deref(&memory);
59    let bytes_written: M::Offset =
60        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
61    wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
62
63    Ok(Errno::Success)
64}
65
66/// ### `fd_pwrite()`
67/// Write to a file without adjusting its offset
68/// Inputs:
69/// - `Fd`
70///     File descriptor (opened with writing) to write to
71/// - `const __wasi_ciovec_t *iovs`
72///     List of vectors to read data from
73/// - `u32 iovs_len`
74///     Length of data in `iovs`
75/// - `Filesize offset`
76///     The offset to write at
77/// Output:
78/// - `u32 *nwritten`
79///     Number of bytes written
80#[instrument(level = "trace", skip_all, fields(%fd, %offset, nwritten = field::Empty), ret)]
81pub fn fd_pwrite<M: MemorySize>(
82    mut ctx: FunctionEnvMut<'_, WasiEnv>,
83    fd: WasiFd,
84    iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
85    iovs_len: M::Offset,
86    offset: Filesize,
87    nwritten: WasmPtr<M::Offset, M>,
88) -> Result<Errno, WasiError> {
89    WasiEnv::do_pending_operations(&mut ctx)?;
90
91    let enable_snapshot_capture = ctx.data().enable_journal;
92
93    let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
94        &mut ctx,
95        fd,
96        FdWriteSource::Iovs { iovs, iovs_len },
97        offset,
98        false,
99        enable_snapshot_capture,
100    )?);
101
102    Span::current().record("nwritten", bytes_written);
103
104    let mut env = ctx.data();
105    let memory = unsafe { env.memory_view(&ctx) };
106    let nwritten_ref = nwritten.deref(&memory);
107    let bytes_written: M::Offset =
108        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
109    wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
110
111    Ok(Errno::Success)
112}
113
114pub(crate) enum FdWriteSource<'a, M: MemorySize> {
115    Iovs {
116        iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
117        iovs_len: M::Offset,
118    },
119    Buffer(Cow<'a, [u8]>),
120}
121
122#[allow(clippy::await_holding_lock)]
123pub(crate) fn fd_write_internal<M: MemorySize>(
124    mut ctx: &mut FunctionEnvMut<'_, WasiEnv>,
125    fd: WasiFd,
126    data: FdWriteSource<'_, M>,
127    offset: u64,
128    should_update_cursor: bool,
129    should_snapshot: bool,
130) -> Result<Result<usize, Errno>, WasiError> {
131    let mut offset = offset;
132    let mut env = ctx.data();
133    let state = env.state.clone();
134
135    let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(fd));
136    let is_stdio = fd_entry.is_stdio;
137
138    let bytes_written = {
139        if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_WRITE) {
140            return Ok(Err(Errno::Access));
141        }
142
143        let fd_flags = fd_entry.inner.flags;
144        let mut memory = unsafe { env.memory_view(&ctx) };
145
146        let (bytes_written, is_file, can_snapshot) = {
147            let (mut memory, _) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
148            let mut guard = fd_entry.inode.write();
149            match guard.deref_mut() {
150                Kind::File { handle, .. } => {
151                    if let Some(handle) = handle {
152                        let handle = handle.clone();
153                        drop(guard);
154
155                        let res = __asyncify_light(
156                            env,
157                            if fd_entry.inner.flags.contains(Fdflags::NONBLOCK) {
158                                Some(Duration::ZERO)
159                            } else {
160                                None
161                            },
162                            async {
163                                let mut handle = handle.write().unwrap();
164                                if !is_stdio {
165                                    if fd_entry.inner.flags.contains(Fdflags::APPEND) {
166                                        // `fdflags::append` means we need to seek to the end before writing.
167                                        offset = fd_entry.inode.stat.read().unwrap().st_size;
168                                        fd_entry.inner.offset.store(offset, Ordering::Release);
169                                    }
170
171                                    handle
172                                        .seek(std::io::SeekFrom::Start(offset))
173                                        .await
174                                        .map_err(map_io_err)?;
175                                }
176
177                                let mut written = 0usize;
178
179                                match &data {
180                                    FdWriteSource::Iovs { iovs, iovs_len } => {
181                                        let iovs_arr = iovs
182                                            .slice(&memory, *iovs_len)
183                                            .map_err(mem_error_to_wasi)?;
184                                        let iovs_arr =
185                                            iovs_arr.access().map_err(mem_error_to_wasi)?;
186                                        for iovs in iovs_arr.iter() {
187                                            let buf = WasmPtr::<u8, M>::new(iovs.buf)
188                                                .slice(&memory, iovs.buf_len)
189                                                .map_err(mem_error_to_wasi)?
190                                                .access()
191                                                .map_err(mem_error_to_wasi)?;
192                                            let local_written =
193                                                match handle.write(buf.as_ref()).await {
194                                                    Ok(s) => s,
195                                                    Err(_) if written > 0 => break,
196                                                    Err(err) => return Err(map_io_err(err)),
197                                                };
198                                            written += local_written;
199                                            if local_written != buf.len() {
200                                                break;
201                                            }
202                                        }
203                                    }
204                                    FdWriteSource::Buffer(data) => {
205                                        handle.write_all(data).await?;
206                                        written += data.len();
207                                    }
208                                }
209
210                                if is_stdio {
211                                    handle.flush().await.map_err(map_io_err)?;
212                                }
213                                Ok(written)
214                            },
215                        );
216                        let written = wasi_try_ok_ok!(res?.map_err(|err| match err {
217                            Errno::Timedout => Errno::Again,
218                            a => a,
219                        }));
220
221                        (written, true, true)
222                    } else {
223                        return Ok(Err(Errno::Inval));
224                    }
225                }
226                Kind::Socket { socket } => {
227                    let socket = socket.clone();
228                    drop(guard);
229
230                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
231                    let timeout = socket
232                        .opt_time(TimeType::WriteTimeout)
233                        .ok()
234                        .flatten()
235                        .unwrap_or(Duration::from_secs(30));
236
237                    let tasks = env.tasks().clone();
238
239                    let res = __asyncify_light(env, None, async {
240                        let mut sent = 0usize;
241
242                        match &data {
243                            FdWriteSource::Iovs { iovs, iovs_len } => {
244                                let iovs_arr =
245                                    iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)?;
246                                let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
247                                for iovs in iovs_arr.iter() {
248                                    let buf = WasmPtr::<u8, M>::new(iovs.buf)
249                                        .slice(&memory, iovs.buf_len)
250                                        .map_err(mem_error_to_wasi)?
251                                        .access()
252                                        .map_err(mem_error_to_wasi)?;
253                                    let local_sent = socket
254                                        .send(
255                                            tasks.deref(),
256                                            buf.as_ref(),
257                                            Some(timeout),
258                                            nonblocking,
259                                        )
260                                        .await?;
261                                    sent += local_sent;
262                                    if local_sent != buf.len() {
263                                        break;
264                                    }
265                                }
266                            }
267                            FdWriteSource::Buffer(data) => {
268                                sent += socket
269                                    .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
270                                    .await?;
271                            }
272                        }
273                        Ok(sent)
274                    });
275                    let written = wasi_try_ok_ok!(res?);
276                    (written, false, false)
277                }
278                Kind::PipeRx { .. } => {
279                    return Ok(Err(Errno::Badf));
280                }
281                Kind::PipeTx { tx } => {
282                    let mut written = 0usize;
283
284                    match &data {
285                        FdWriteSource::Iovs { iovs, iovs_len } => {
286                            let mut raise_sigpipe = false;
287                            let iovs_arr = wasi_try_ok_ok!(
288                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
289                            );
290                            let iovs_arr =
291                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
292                            for iovs in iovs_arr.iter() {
293                                let buf = wasi_try_ok_ok!(
294                                    WasmPtr::<u8, M>::new(iovs.buf)
295                                        .slice(&memory, iovs.buf_len)
296                                        .map_err(mem_error_to_wasi)
297                                );
298                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
299                                let write_result = std::io::Write::write(tx, buf.as_ref());
300                                let local_written = match write_result {
301                                    Ok(w) => w,
302                                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
303                                        // Need to do this to avoid double borrow on ctx with iovs_arr
304                                        raise_sigpipe = true;
305                                        break;
306                                    }
307                                    Err(e) => return Ok(Err(map_io_err(e))),
308                                };
309
310                                written += local_written;
311                                if local_written != buf.len() {
312                                    break;
313                                }
314                            }
315
316                            drop(iovs_arr);
317
318                            if raise_sigpipe {
319                                env.process.signal_process(Signal::Sigpipe);
320                                wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
321                                return Ok(Err(Errno::Pipe));
322                            }
323                        }
324                        FdWriteSource::Buffer(data) => {
325                            match std::io::Write::write_all(tx, data) {
326                                Ok(()) => (),
327                                Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
328                                    env.process.signal_process(Signal::Sigpipe);
329                                    wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
330                                    return Ok(Err(Errno::Pipe));
331                                }
332                                Err(e) => return Ok(Err(map_io_err(e))),
333                            };
334                            written += data.len();
335                        }
336                    }
337
338                    (written, false, true)
339                }
340                Kind::DuplexPipe { pipe } => {
341                    let mut written = 0usize;
342
343                    match &data {
344                        FdWriteSource::Iovs { iovs, iovs_len } => {
345                            let mut raise_sigpipe = false;
346                            let iovs_arr = wasi_try_ok_ok!(
347                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
348                            );
349                            let iovs_arr =
350                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
351                            for iovs in iovs_arr.iter() {
352                                let buf = wasi_try_ok_ok!(
353                                    WasmPtr::<u8, M>::new(iovs.buf)
354                                        .slice(&memory, iovs.buf_len)
355                                        .map_err(mem_error_to_wasi)
356                                );
357                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
358                                let write_result = std::io::Write::write(pipe, buf.as_ref());
359                                let local_written = match write_result {
360                                    Ok(w) => w,
361                                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
362                                        // Need to do this to avoid double borrow on ctx with iovs_arr
363                                        raise_sigpipe = true;
364                                        break;
365                                    }
366                                    Err(e) => return Ok(Err(map_io_err(e))),
367                                };
368
369                                written += local_written;
370                                if local_written != buf.len() {
371                                    break;
372                                }
373                            }
374
375                            drop(iovs_arr);
376
377                            if raise_sigpipe {
378                                env.process.signal_process(Signal::Sigpipe);
379                                wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
380                                return Ok(Err(Errno::Pipe));
381                            }
382                        }
383                        FdWriteSource::Buffer(data) => {
384                            match std::io::Write::write_all(pipe, data) {
385                                Ok(()) => (),
386                                Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
387                                    env.process.signal_process(Signal::Sigpipe);
388                                    wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
389                                    return Ok(Err(Errno::Pipe));
390                                }
391                                Err(e) => return Ok(Err(map_io_err(e))),
392                            };
393                            written += data.len();
394                        }
395                    }
396
397                    (written, false, true)
398                }
399                Kind::Dir { .. } | Kind::Root { .. } => {
400                    // TODO: verify
401                    return Ok(Err(Errno::Isdir));
402                }
403                Kind::EventNotifications { inner } => {
404                    let mut written = 0usize;
405
406                    match &data {
407                        FdWriteSource::Iovs { iovs, iovs_len } => {
408                            let iovs_arr = wasi_try_ok_ok!(
409                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
410                            );
411                            let iovs_arr =
412                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
413                            for iovs in iovs_arr.iter() {
414                                let buf_len: usize = wasi_try_ok_ok!(
415                                    iovs.buf_len.try_into().map_err(|_| Errno::Inval)
416                                );
417                                let will_be_written = buf_len;
418
419                                let val_cnt = buf_len / std::mem::size_of::<u64>();
420                                let val_cnt: M::Offset =
421                                    wasi_try_ok_ok!(val_cnt.try_into().map_err(|_| Errno::Inval));
422
423                                let vals = wasi_try_ok_ok!(
424                                    WasmPtr::<u64, M>::new(iovs.buf)
425                                        .slice(&memory, val_cnt as M::Offset)
426                                        .map_err(mem_error_to_wasi)
427                                );
428                                let vals =
429                                    wasi_try_ok_ok!(vals.access().map_err(mem_error_to_wasi));
430                                for val in vals.iter() {
431                                    inner.write(*val);
432                                }
433
434                                written += will_be_written;
435                            }
436                        }
437                        FdWriteSource::Buffer(data) => {
438                            let cnt = data.len() / std::mem::size_of::<u64>();
439                            for n in 0..cnt {
440                                let start = n * std::mem::size_of::<u64>();
441                                let data = [
442                                    data[start],
443                                    data[start + 1],
444                                    data[start + 2],
445                                    data[start + 3],
446                                    data[start + 4],
447                                    data[start + 5],
448                                    data[start + 6],
449                                    data[start + 7],
450                                ];
451                                inner.write(u64::from_ne_bytes(data));
452                            }
453                        }
454                    }
455
456                    (written, false, true)
457                }
458                Kind::Symlink { .. } | Kind::Epoll { .. } => return Ok(Err(Errno::Inval)),
459                Kind::Buffer { buffer } => {
460                    let mut written = 0usize;
461
462                    match &data {
463                        FdWriteSource::Iovs { iovs, iovs_len } => {
464                            let iovs_arr = wasi_try_ok_ok!(
465                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
466                            );
467                            let iovs_arr =
468                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
469                            for iovs in iovs_arr.iter() {
470                                let buf = wasi_try_ok_ok!(
471                                    WasmPtr::<u8, M>::new(iovs.buf)
472                                        .slice(&memory, iovs.buf_len)
473                                        .map_err(mem_error_to_wasi)
474                                );
475                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
476                                let local_written = wasi_try_ok_ok!(
477                                    std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
478                                );
479                                written += local_written;
480                                if local_written != buf.len() {
481                                    break;
482                                }
483                            }
484                        }
485                        FdWriteSource::Buffer(data) => {
486                            wasi_try_ok_ok!(
487                                std::io::Write::write_all(buffer, data).map_err(map_io_err)
488                            );
489                            written += data.len();
490                        }
491                    }
492
493                    (written, false, true)
494                }
495            }
496        };
497
498        #[cfg(feature = "journal")]
499        if should_snapshot
500            && can_snapshot
501            && bytes_written > 0
502            && let FdWriteSource::Iovs { iovs, iovs_len } = data
503        {
504            JournalEffector::save_fd_write(ctx, fd, offset, bytes_written, iovs, iovs_len)
505                .map_err(|err| {
506                    tracing::error!("failed to save terminal data - {}", err);
507                    WasiError::Exit(ExitCode::from(Errno::Fault))
508                })?;
509        }
510
511        env = ctx.data();
512        memory = unsafe { env.memory_view(&ctx) };
513
514        // reborrow and update the size
515        if !is_stdio {
516            let curr_offset = if is_file && should_update_cursor {
517                let bytes_written = bytes_written as u64;
518                let mut fd_map = state.fs.fd_map.write().unwrap();
519                let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(fd).ok_or(Errno::Badf));
520                fd_entry
521                    .offset
522                    .fetch_add(bytes_written, Ordering::AcqRel)
523                    // fetch_add returns the previous value, we have to add bytes_written again here
524                    + bytes_written
525            } else {
526                fd_entry.inner.offset.load(Ordering::Acquire)
527            };
528
529            // we set the size but we don't return any errors if it fails as
530            // pipes and sockets will not do anything with this
531            let (mut memory, _, inodes) =
532                unsafe { env.get_memory_and_wasi_state_and_inodes(&ctx, 0) };
533            if is_file {
534                let mut stat = fd_entry.inode.stat.write().unwrap();
535                if should_update_cursor {
536                    // If we wrote before the end, the current size is still correct.
537                    // Otherwise, we only got as far as the current cursor. So, the
538                    // max of the two is the correct new size.
539                    stat.st_size = stat.st_size.max(curr_offset);
540                } else {
541                    // pwrite does not update the cursor of the file so to calculate the final
542                    // size of the file we compute where the cursor would have been if it was updated,
543                    // and get the max value between it and the current size.
544                    stat.st_size = stat.st_size.max(offset + bytes_written as u64);
545                }
546            } else {
547                // Cast is valid because we don't support 128 bit systems...
548                fd_entry.inode.stat.write().unwrap().st_size += bytes_written as u64;
549            }
550        }
551        bytes_written
552    };
553
554    Ok(Ok(bytes_written))
555}