wasmer_wasix/syscalls/wasi/
fd_write.rs

1use std::task::Waker;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::{
6    journal::{JournalEffector, JournalEntry},
7    utils::map_snapshot_err,
8};
9use crate::{net::MAX_SOCKET_PAYLOAD, net::socket::TimeType, syscalls::*};
10
11/// ### `fd_write()`
12/// Write data to the file descriptor
13/// Inputs:
14/// - `Fd`
15///     File descriptor (opened with writing) to write to
16/// - `const __wasi_ciovec_t *iovs`
17///     List of vectors to read data from
18/// - `u32 iovs_len`
19///     Length of data in `iovs`
20/// Output:
21/// - `u32 *nwritten`
22///     Number of bytes written
23/// Errors:
24///
25#[instrument(level = "trace", skip_all, fields(%fd, nwritten = field::Empty), ret)]
26pub fn fd_write<M: MemorySize>(
27    mut ctx: FunctionEnvMut<'_, WasiEnv>,
28    fd: WasiFd,
29    iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
30    iovs_len: M::Offset,
31    nwritten: WasmPtr<M::Offset, M>,
32) -> Result<Errno, WasiError> {
33    WasiEnv::do_pending_operations(&mut ctx)?;
34
35    let env = ctx.data();
36    let enable_journal = env.enable_journal;
37    let fd_entry = {
38        let state = env.state.clone();
39        wasi_try_ok!(state.fs.get_fd(fd))
40    };
41    let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
42
43    let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
44        &mut ctx,
45        fd,
46        fd_entry,
47        FdWriteSource::Iovs { iovs, iovs_len },
48        offset as u64,
49        true,
50        enable_journal,
51    )?);
52
53    Span::current().record("nwritten", bytes_written);
54
55    let mut env = ctx.data();
56    let memory = unsafe { env.memory_view(&ctx) };
57    let nwritten_ref = nwritten.deref(&memory);
58    let bytes_written: M::Offset =
59        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
60    wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
61
62    Ok(Errno::Success)
63}
64
65/// ### `fd_pwrite()`
66/// Write to a file without adjusting its offset
67/// Inputs:
68/// - `Fd`
69///     File descriptor (opened with writing) to write to
70/// - `const __wasi_ciovec_t *iovs`
71///     List of vectors to read data from
72/// - `u32 iovs_len`
73///     Length of data in `iovs`
74/// - `Filesize offset`
75///     The offset to write at
76/// Output:
77/// - `u32 *nwritten`
78///     Number of bytes written
79#[instrument(level = "trace", skip_all, fields(%fd, %offset, nwritten = field::Empty), ret)]
80pub fn fd_pwrite<M: MemorySize>(
81    mut ctx: FunctionEnvMut<'_, WasiEnv>,
82    fd: WasiFd,
83    iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
84    iovs_len: M::Offset,
85    offset: Filesize,
86    nwritten: WasmPtr<M::Offset, M>,
87) -> Result<Errno, WasiError> {
88    WasiEnv::do_pending_operations(&mut ctx)?;
89
90    let enable_snapshot_capture = ctx.data().enable_journal;
91
92    let fd_entry = {
93        let env = ctx.data();
94        let state = env.state.clone();
95        wasi_try_ok!(state.fs.get_fd(fd))
96    };
97    let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
98        &mut ctx,
99        fd,
100        fd_entry,
101        FdWriteSource::Iovs { iovs, iovs_len },
102        offset,
103        false,
104        enable_snapshot_capture,
105    )?);
106
107    Span::current().record("nwritten", bytes_written);
108
109    let mut env = ctx.data();
110    let memory = unsafe { env.memory_view(&ctx) };
111    let nwritten_ref = nwritten.deref(&memory);
112    let bytes_written: M::Offset =
113        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
114    wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
115
116    Ok(Errno::Success)
117}
118
119pub(crate) enum FdWriteSource<'a, M: MemorySize> {
120    Iovs {
121        iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
122        iovs_len: M::Offset,
123    },
124    Buffer(Cow<'a, [u8]>),
125}
126
127impl<'a, M: MemorySize> FdWriteSource<'a, M> {
128    pub(crate) fn coalesce(
129        &self,
130        memory: &MemoryView,
131        max_len: usize,
132    ) -> Result<Cow<'a, [u8]>, Errno> {
133        match self {
134            FdWriteSource::Iovs { iovs, iovs_len } => {
135                let iovs_arr = iovs.slice(memory, *iovs_len).map_err(mem_error_to_wasi)?;
136                let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
137
138                let mut total_len = 0usize;
139                for iov in iovs_arr.iter() {
140                    let len = iov.buf_len.into() as usize;
141                    total_len = total_len.checked_add(len).ok_or(Errno::Msgsize)?;
142                }
143                if total_len > max_len {
144                    return Err(Errno::Msgsize);
145                }
146
147                let mut coalesced = Vec::with_capacity(total_len);
148
149                for iov in iovs_arr.iter() {
150                    let buf = WasmPtr::<u8, M>::new(iov.buf)
151                        .slice(memory, iov.buf_len)
152                        .map_err(mem_error_to_wasi)?
153                        .access()
154                        .map_err(mem_error_to_wasi)?;
155                    coalesced.extend_from_slice(buf.as_ref());
156                }
157
158                Ok(Cow::Owned(coalesced))
159            }
160            FdWriteSource::Buffer(cow) => {
161                if cow.len() > max_len {
162                    return Err(Errno::Msgsize);
163                }
164                Ok(cow.clone())
165            }
166        }
167    }
168}
169
170#[allow(clippy::await_holding_lock)]
171pub(crate) fn fd_write_internal<M: MemorySize>(
172    mut ctx: &mut FunctionEnvMut<'_, WasiEnv>,
173    fd: WasiFd,
174    fd_entry: Fd,
175    data: FdWriteSource<'_, M>,
176    offset: u64,
177    should_update_cursor: bool,
178    should_snapshot: bool,
179) -> Result<Result<usize, Errno>, WasiError> {
180    let mut offset = offset;
181    let mut env = ctx.data();
182    let state = env.state.clone();
183    let is_stdio = fd_entry.is_stdio;
184
185    let bytes_written = {
186        if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_WRITE) {
187            return Ok(Err(Errno::Access));
188        }
189
190        let fd_flags = fd_entry.inner.flags;
191        let mut memory = unsafe { env.memory_view(&ctx) };
192
193        let (bytes_written, is_file, can_snapshot) = {
194            let (mut memory, _) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
195            let mut guard = fd_entry.inode.write();
196            match guard.deref_mut() {
197                Kind::File { handle, .. } => {
198                    if let Some(handle) = handle {
199                        let handle = handle.clone();
200                        drop(guard);
201
202                        let res = __asyncify_light(
203                            env,
204                            if fd_entry.inner.flags.contains(Fdflags::NONBLOCK) {
205                                Some(Duration::ZERO)
206                            } else {
207                                None
208                            },
209                            async {
210                                let mut handle = handle.write().unwrap();
211                                if !is_stdio {
212                                    if fd_entry.inner.flags.contains(Fdflags::APPEND) {
213                                        // `fdflags::append` means we need to seek to the end before writing.
214                                        offset = fd_entry.inode.stat.read().unwrap().st_size;
215                                        fd_entry.inner.offset.store(offset, Ordering::Release);
216                                    }
217
218                                    handle
219                                        .seek(std::io::SeekFrom::Start(offset))
220                                        .await
221                                        .map_err(map_io_err)?;
222                                }
223
224                                let mut written = 0usize;
225
226                                match &data {
227                                    FdWriteSource::Iovs { iovs, iovs_len } => {
228                                        let iovs_arr = iovs
229                                            .slice(&memory, *iovs_len)
230                                            .map_err(mem_error_to_wasi)?;
231                                        let iovs_arr =
232                                            iovs_arr.access().map_err(mem_error_to_wasi)?;
233                                        for iovs in iovs_arr.iter() {
234                                            let buf = WasmPtr::<u8, M>::new(iovs.buf)
235                                                .slice(&memory, iovs.buf_len)
236                                                .map_err(mem_error_to_wasi)?
237                                                .access()
238                                                .map_err(mem_error_to_wasi)?;
239                                            let local_written =
240                                                match handle.write(buf.as_ref()).await {
241                                                    Ok(s) => s,
242                                                    Err(_) if written > 0 => break,
243                                                    Err(err) => return Err(map_io_err(err)),
244                                                };
245                                            written += local_written;
246                                            if local_written != buf.len() {
247                                                break;
248                                            }
249                                        }
250                                    }
251                                    FdWriteSource::Buffer(data) => {
252                                        handle.write_all(data).await?;
253                                        written += data.len();
254                                    }
255                                }
256
257                                if is_stdio {
258                                    handle.flush().await.map_err(map_io_err)?;
259                                }
260                                Ok(written)
261                            },
262                        );
263                        let written = wasi_try_ok_ok!(res?.map_err(|err| match err {
264                            Errno::Timedout => Errno::Again,
265                            a => a,
266                        }));
267
268                        (written, true, true)
269                    } else {
270                        return Ok(Err(Errno::Inval));
271                    }
272                }
273                Kind::Socket { socket } => {
274                    let socket = socket.clone();
275                    drop(guard);
276
277                    let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
278                    let timeout = socket
279                        .opt_time(TimeType::WriteTimeout)
280                        .ok()
281                        .flatten()
282                        .unwrap_or(Duration::from_secs(30));
283
284                    let tasks = env.tasks().clone();
285
286                    let res = __asyncify_light(env, None, async {
287                        let mut sent = 0usize;
288
289                        if socket.is_dgram() {
290                            let data = data.coalesce(&memory, MAX_SOCKET_PAYLOAD)?;
291                            sent += socket
292                                .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
293                                .await?;
294                            return Ok(sent);
295                        }
296
297                        match &data {
298                            FdWriteSource::Iovs { iovs, iovs_len } => {
299                                let iovs_arr =
300                                    iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)?;
301                                let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
302                                for iovs in iovs_arr.iter() {
303                                    let buf = WasmPtr::<u8, M>::new(iovs.buf)
304                                        .slice(&memory, iovs.buf_len)
305                                        .map_err(mem_error_to_wasi)?
306                                        .access()
307                                        .map_err(mem_error_to_wasi)?;
308                                    let local_sent = match socket
309                                        .send(
310                                            tasks.deref(),
311                                            buf.as_ref(),
312                                            Some(timeout),
313                                            nonblocking,
314                                        )
315                                        .await
316                                    {
317                                        Ok(sent) => sent,
318                                        Err(_) if sent > 0 => break,
319                                        Err(err) => return Err(err),
320                                    };
321                                    sent += local_sent;
322                                    if local_sent != buf.len() {
323                                        break;
324                                    }
325                                }
326                            }
327                            FdWriteSource::Buffer(data) => {
328                                sent += socket
329                                    .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
330                                    .await?;
331                            }
332                        }
333                        Ok(sent)
334                    });
335                    let written = wasi_try_ok_ok!(res?);
336                    (written, false, false)
337                }
338                Kind::PipeRx { .. } => {
339                    return Ok(Err(Errno::Badf));
340                }
341                Kind::PipeTx { tx } => {
342                    let mut written = 0usize;
343
344                    match &data {
345                        FdWriteSource::Iovs { iovs, iovs_len } => {
346                            let mut raise_sigpipe = false;
347                            let iovs_arr = wasi_try_ok_ok!(
348                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
349                            );
350                            let iovs_arr =
351                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
352                            for iovs in iovs_arr.iter() {
353                                let buf = wasi_try_ok_ok!(
354                                    WasmPtr::<u8, M>::new(iovs.buf)
355                                        .slice(&memory, iovs.buf_len)
356                                        .map_err(mem_error_to_wasi)
357                                );
358                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
359                                let write_result = std::io::Write::write(tx, buf.as_ref());
360                                let local_written = match write_result {
361                                    Ok(w) => w,
362                                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
363                                        // Need to do this to avoid double borrow on ctx with iovs_arr
364                                        raise_sigpipe = true;
365                                        break;
366                                    }
367                                    Err(e) => return Ok(Err(map_io_err(e))),
368                                };
369
370                                written += local_written;
371                                if local_written != buf.len() {
372                                    break;
373                                }
374                            }
375
376                            drop(iovs_arr);
377
378                            if raise_sigpipe {
379                                env.process.signal_process(Signal::Sigpipe);
380                                wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
381                                return Ok(Err(Errno::Pipe));
382                            }
383                        }
384                        FdWriteSource::Buffer(data) => {
385                            match std::io::Write::write_all(tx, data) {
386                                Ok(()) => (),
387                                Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
388                                    env.process.signal_process(Signal::Sigpipe);
389                                    wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
390                                    return Ok(Err(Errno::Pipe));
391                                }
392                                Err(e) => return Ok(Err(map_io_err(e))),
393                            };
394                            written += data.len();
395                        }
396                    }
397
398                    (written, false, true)
399                }
400                Kind::DuplexPipe { pipe } => {
401                    let mut written = 0usize;
402
403                    match &data {
404                        FdWriteSource::Iovs { iovs, iovs_len } => {
405                            let mut raise_sigpipe = false;
406                            let iovs_arr = wasi_try_ok_ok!(
407                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
408                            );
409                            let iovs_arr =
410                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
411                            for iovs in iovs_arr.iter() {
412                                let buf = wasi_try_ok_ok!(
413                                    WasmPtr::<u8, M>::new(iovs.buf)
414                                        .slice(&memory, iovs.buf_len)
415                                        .map_err(mem_error_to_wasi)
416                                );
417                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
418                                let write_result = std::io::Write::write(pipe, buf.as_ref());
419                                let local_written = match write_result {
420                                    Ok(w) => w,
421                                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
422                                        // Need to do this to avoid double borrow on ctx with iovs_arr
423                                        raise_sigpipe = true;
424                                        break;
425                                    }
426                                    Err(e) => return Ok(Err(map_io_err(e))),
427                                };
428
429                                written += local_written;
430                                if local_written != buf.len() {
431                                    break;
432                                }
433                            }
434
435                            drop(iovs_arr);
436
437                            if raise_sigpipe {
438                                env.process.signal_process(Signal::Sigpipe);
439                                wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
440                                return Ok(Err(Errno::Pipe));
441                            }
442                        }
443                        FdWriteSource::Buffer(data) => {
444                            match std::io::Write::write_all(pipe, data) {
445                                Ok(()) => (),
446                                Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
447                                    env.process.signal_process(Signal::Sigpipe);
448                                    wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
449                                    return Ok(Err(Errno::Pipe));
450                                }
451                                Err(e) => return Ok(Err(map_io_err(e))),
452                            };
453                            written += data.len();
454                        }
455                    }
456
457                    (written, false, true)
458                }
459                Kind::Dir { .. } | Kind::Root { .. } => {
460                    // TODO: verify
461                    return Ok(Err(Errno::Isdir));
462                }
463                Kind::EventNotifications { inner } => {
464                    let mut written = 0usize;
465
466                    match &data {
467                        FdWriteSource::Iovs { iovs, iovs_len } => {
468                            let iovs_arr = wasi_try_ok_ok!(
469                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
470                            );
471                            let iovs_arr =
472                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
473                            for iovs in iovs_arr.iter() {
474                                let buf_len: usize = wasi_try_ok_ok!(
475                                    iovs.buf_len.try_into().map_err(|_| Errno::Inval)
476                                );
477                                let will_be_written = buf_len;
478
479                                let val_cnt = buf_len / std::mem::size_of::<u64>();
480                                let val_cnt: M::Offset =
481                                    wasi_try_ok_ok!(val_cnt.try_into().map_err(|_| Errno::Inval));
482
483                                let vals = wasi_try_ok_ok!(
484                                    WasmPtr::<u64, M>::new(iovs.buf)
485                                        .slice(&memory, val_cnt as M::Offset)
486                                        .map_err(mem_error_to_wasi)
487                                );
488                                let vals =
489                                    wasi_try_ok_ok!(vals.access().map_err(mem_error_to_wasi));
490                                for val in vals.iter() {
491                                    inner.write(*val);
492                                }
493
494                                written += will_be_written;
495                            }
496                        }
497                        FdWriteSource::Buffer(data) => {
498                            let cnt = data.len() / std::mem::size_of::<u64>();
499                            for n in 0..cnt {
500                                let start = n * std::mem::size_of::<u64>();
501                                let data = [
502                                    data[start],
503                                    data[start + 1],
504                                    data[start + 2],
505                                    data[start + 3],
506                                    data[start + 4],
507                                    data[start + 5],
508                                    data[start + 6],
509                                    data[start + 7],
510                                ];
511                                inner.write(u64::from_ne_bytes(data));
512                            }
513                        }
514                    }
515
516                    (written, false, true)
517                }
518                Kind::Symlink { .. } | Kind::Epoll { .. } => return Ok(Err(Errno::Inval)),
519                Kind::Buffer { buffer } => {
520                    let mut written = 0usize;
521
522                    match &data {
523                        FdWriteSource::Iovs { iovs, iovs_len } => {
524                            let iovs_arr = wasi_try_ok_ok!(
525                                iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
526                            );
527                            let iovs_arr =
528                                wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
529                            for iovs in iovs_arr.iter() {
530                                let buf = wasi_try_ok_ok!(
531                                    WasmPtr::<u8, M>::new(iovs.buf)
532                                        .slice(&memory, iovs.buf_len)
533                                        .map_err(mem_error_to_wasi)
534                                );
535                                let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
536                                let local_written = wasi_try_ok_ok!(
537                                    std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
538                                );
539                                written += local_written;
540                                if local_written != buf.len() {
541                                    break;
542                                }
543                            }
544                        }
545                        FdWriteSource::Buffer(data) => {
546                            wasi_try_ok_ok!(
547                                std::io::Write::write_all(buffer, data).map_err(map_io_err)
548                            );
549                            written += data.len();
550                        }
551                    }
552
553                    (written, false, true)
554                }
555            }
556        };
557
558        #[cfg(feature = "journal")]
559        if should_snapshot
560            && can_snapshot
561            && bytes_written > 0
562            && let FdWriteSource::Iovs { iovs, iovs_len } = data
563        {
564            JournalEffector::save_fd_write(ctx, fd, offset, bytes_written, iovs, iovs_len)
565                .map_err(|err| {
566                    tracing::error!("failed to save terminal data - {}", err);
567                    WasiError::Exit(ExitCode::from(Errno::Fault))
568                })?;
569        }
570
571        env = ctx.data();
572        memory = unsafe { env.memory_view(&ctx) };
573
574        // reborrow and update the size
575        if !is_stdio {
576            let curr_offset = if is_file && should_update_cursor {
577                let bytes_written = bytes_written as u64;
578                fd_entry
579                    .inner
580                    .offset
581                    .fetch_add(bytes_written, Ordering::AcqRel)
582                    // fetch_add returns the previous value, we have to add bytes_written again here
583                    + bytes_written
584            } else {
585                fd_entry.inner.offset.load(Ordering::Acquire)
586            };
587
588            // we set the size but we don't return any errors if it fails as
589            // pipes and sockets will not do anything with this
590            let (mut memory, _, inodes) =
591                unsafe { env.get_memory_and_wasi_state_and_inodes(&ctx, 0) };
592            if is_file {
593                let mut stat = fd_entry.inode.stat.write().unwrap();
594                if should_update_cursor {
595                    // If we wrote before the end, the current size is still correct.
596                    // Otherwise, we only got as far as the current cursor. So, the
597                    // max of the two is the correct new size.
598                    stat.st_size = stat.st_size.max(curr_offset);
599                } else {
600                    // pwrite does not update the cursor of the file so to calculate the final
601                    // size of the file we compute where the cursor would have been if it was updated,
602                    // and get the max value between it and the current size.
603                    stat.st_size = stat.st_size.max(offset + bytes_written as u64);
604                }
605            } else {
606                // Cast is valid because we don't support 128 bit systems...
607                fd_entry.inode.stat.write().unwrap().st_size += bytes_written as u64;
608            }
609        }
610        bytes_written
611    };
612
613    Ok(Ok(bytes_written))
614}