wasmer_wasix/syscalls/wasi/
fd_write.rs1use std::task::Waker;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::{
6 journal::{JournalEffector, JournalEntry},
7 utils::map_snapshot_err,
8};
9use crate::{net::socket::TimeType, syscalls::*};
10
11#[instrument(level = "trace", skip_all, fields(%fd, nwritten = field::Empty), ret)]
26pub fn fd_write<M: MemorySize>(
27 mut ctx: FunctionEnvMut<'_, WasiEnv>,
28 fd: WasiFd,
29 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
30 iovs_len: M::Offset,
31 nwritten: WasmPtr<M::Offset, M>,
32) -> Result<Errno, WasiError> {
33 WasiEnv::do_pending_operations(&mut ctx)?;
34
35 let env = ctx.data();
36 let enable_journal = env.enable_journal;
37 let fd_entry = {
38 let state = env.state.clone();
39 wasi_try_ok!(state.fs.get_fd(fd))
40 };
41 let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
42
43 let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
44 &mut ctx,
45 fd,
46 fd_entry,
47 FdWriteSource::Iovs { iovs, iovs_len },
48 offset as u64,
49 true,
50 enable_journal,
51 )?);
52
53 Span::current().record("nwritten", bytes_written);
54
55 let mut env = ctx.data();
56 let memory = unsafe { env.memory_view(&ctx) };
57 let nwritten_ref = nwritten.deref(&memory);
58 let bytes_written: M::Offset =
59 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
60 wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
61
62 Ok(Errno::Success)
63}
64
65#[instrument(level = "trace", skip_all, fields(%fd, %offset, nwritten = field::Empty), ret)]
80pub fn fd_pwrite<M: MemorySize>(
81 mut ctx: FunctionEnvMut<'_, WasiEnv>,
82 fd: WasiFd,
83 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
84 iovs_len: M::Offset,
85 offset: Filesize,
86 nwritten: WasmPtr<M::Offset, M>,
87) -> Result<Errno, WasiError> {
88 WasiEnv::do_pending_operations(&mut ctx)?;
89
90 let enable_snapshot_capture = ctx.data().enable_journal;
91
92 let fd_entry = {
93 let env = ctx.data();
94 let state = env.state.clone();
95 wasi_try_ok!(state.fs.get_fd(fd))
96 };
97 let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
98 &mut ctx,
99 fd,
100 fd_entry,
101 FdWriteSource::Iovs { iovs, iovs_len },
102 offset,
103 false,
104 enable_snapshot_capture,
105 )?);
106
107 Span::current().record("nwritten", bytes_written);
108
109 let mut env = ctx.data();
110 let memory = unsafe { env.memory_view(&ctx) };
111 let nwritten_ref = nwritten.deref(&memory);
112 let bytes_written: M::Offset =
113 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
114 wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
115
116 Ok(Errno::Success)
117}
118
119pub(crate) enum FdWriteSource<'a, M: MemorySize> {
120 Iovs {
121 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
122 iovs_len: M::Offset,
123 },
124 Buffer(Cow<'a, [u8]>),
125}
126
127#[allow(clippy::await_holding_lock)]
128pub(crate) fn fd_write_internal<M: MemorySize>(
129 mut ctx: &mut FunctionEnvMut<'_, WasiEnv>,
130 fd: WasiFd,
131 fd_entry: Fd,
132 data: FdWriteSource<'_, M>,
133 offset: u64,
134 should_update_cursor: bool,
135 should_snapshot: bool,
136) -> Result<Result<usize, Errno>, WasiError> {
137 let mut offset = offset;
138 let mut env = ctx.data();
139 let state = env.state.clone();
140 let is_stdio = fd_entry.is_stdio;
141
142 let bytes_written = {
143 if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_WRITE) {
144 return Ok(Err(Errno::Access));
145 }
146
147 let fd_flags = fd_entry.inner.flags;
148 let mut memory = unsafe { env.memory_view(&ctx) };
149
150 let (bytes_written, is_file, can_snapshot) = {
151 let (mut memory, _) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
152 let mut guard = fd_entry.inode.write();
153 match guard.deref_mut() {
154 Kind::File { handle, .. } => {
155 if let Some(handle) = handle {
156 let handle = handle.clone();
157 drop(guard);
158
159 let res = __asyncify_light(
160 env,
161 if fd_entry.inner.flags.contains(Fdflags::NONBLOCK) {
162 Some(Duration::ZERO)
163 } else {
164 None
165 },
166 async {
167 let mut handle = handle.write().unwrap();
168 if !is_stdio {
169 if fd_entry.inner.flags.contains(Fdflags::APPEND) {
170 offset = fd_entry.inode.stat.read().unwrap().st_size;
172 fd_entry.inner.offset.store(offset, Ordering::Release);
173 }
174
175 handle
176 .seek(std::io::SeekFrom::Start(offset))
177 .await
178 .map_err(map_io_err)?;
179 }
180
181 let mut written = 0usize;
182
183 match &data {
184 FdWriteSource::Iovs { iovs, iovs_len } => {
185 let iovs_arr = iovs
186 .slice(&memory, *iovs_len)
187 .map_err(mem_error_to_wasi)?;
188 let iovs_arr =
189 iovs_arr.access().map_err(mem_error_to_wasi)?;
190 for iovs in iovs_arr.iter() {
191 let buf = WasmPtr::<u8, M>::new(iovs.buf)
192 .slice(&memory, iovs.buf_len)
193 .map_err(mem_error_to_wasi)?
194 .access()
195 .map_err(mem_error_to_wasi)?;
196 let local_written =
197 match handle.write(buf.as_ref()).await {
198 Ok(s) => s,
199 Err(_) if written > 0 => break,
200 Err(err) => return Err(map_io_err(err)),
201 };
202 written += local_written;
203 if local_written != buf.len() {
204 break;
205 }
206 }
207 }
208 FdWriteSource::Buffer(data) => {
209 handle.write_all(data).await?;
210 written += data.len();
211 }
212 }
213
214 if is_stdio {
215 handle.flush().await.map_err(map_io_err)?;
216 }
217 Ok(written)
218 },
219 );
220 let written = wasi_try_ok_ok!(res?.map_err(|err| match err {
221 Errno::Timedout => Errno::Again,
222 a => a,
223 }));
224
225 (written, true, true)
226 } else {
227 return Ok(Err(Errno::Inval));
228 }
229 }
230 Kind::Socket { socket } => {
231 let socket = socket.clone();
232 drop(guard);
233
234 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
235 let timeout = socket
236 .opt_time(TimeType::WriteTimeout)
237 .ok()
238 .flatten()
239 .unwrap_or(Duration::from_secs(30));
240
241 let tasks = env.tasks().clone();
242
243 let res = __asyncify_light(env, None, async {
244 let mut sent = 0usize;
245
246 match &data {
247 FdWriteSource::Iovs { iovs, iovs_len } => {
248 let iovs_arr =
249 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)?;
250 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
251 for iovs in iovs_arr.iter() {
252 let buf = WasmPtr::<u8, M>::new(iovs.buf)
253 .slice(&memory, iovs.buf_len)
254 .map_err(mem_error_to_wasi)?
255 .access()
256 .map_err(mem_error_to_wasi)?;
257 let local_sent = socket
258 .send(
259 tasks.deref(),
260 buf.as_ref(),
261 Some(timeout),
262 nonblocking,
263 )
264 .await?;
265 sent += local_sent;
266 if local_sent != buf.len() {
267 break;
268 }
269 }
270 }
271 FdWriteSource::Buffer(data) => {
272 sent += socket
273 .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
274 .await?;
275 }
276 }
277 Ok(sent)
278 });
279 let written = wasi_try_ok_ok!(res?);
280 (written, false, false)
281 }
282 Kind::PipeRx { .. } => {
283 return Ok(Err(Errno::Badf));
284 }
285 Kind::PipeTx { tx } => {
286 let mut written = 0usize;
287
288 match &data {
289 FdWriteSource::Iovs { iovs, iovs_len } => {
290 let mut raise_sigpipe = false;
291 let iovs_arr = wasi_try_ok_ok!(
292 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
293 );
294 let iovs_arr =
295 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
296 for iovs in iovs_arr.iter() {
297 let buf = wasi_try_ok_ok!(
298 WasmPtr::<u8, M>::new(iovs.buf)
299 .slice(&memory, iovs.buf_len)
300 .map_err(mem_error_to_wasi)
301 );
302 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
303 let write_result = std::io::Write::write(tx, buf.as_ref());
304 let local_written = match write_result {
305 Ok(w) => w,
306 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
307 raise_sigpipe = true;
309 break;
310 }
311 Err(e) => return Ok(Err(map_io_err(e))),
312 };
313
314 written += local_written;
315 if local_written != buf.len() {
316 break;
317 }
318 }
319
320 drop(iovs_arr);
321
322 if raise_sigpipe {
323 env.process.signal_process(Signal::Sigpipe);
324 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
325 return Ok(Err(Errno::Pipe));
326 }
327 }
328 FdWriteSource::Buffer(data) => {
329 match std::io::Write::write_all(tx, data) {
330 Ok(()) => (),
331 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
332 env.process.signal_process(Signal::Sigpipe);
333 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
334 return Ok(Err(Errno::Pipe));
335 }
336 Err(e) => return Ok(Err(map_io_err(e))),
337 };
338 written += data.len();
339 }
340 }
341
342 (written, false, true)
343 }
344 Kind::DuplexPipe { pipe } => {
345 let mut written = 0usize;
346
347 match &data {
348 FdWriteSource::Iovs { iovs, iovs_len } => {
349 let mut raise_sigpipe = false;
350 let iovs_arr = wasi_try_ok_ok!(
351 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
352 );
353 let iovs_arr =
354 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
355 for iovs in iovs_arr.iter() {
356 let buf = wasi_try_ok_ok!(
357 WasmPtr::<u8, M>::new(iovs.buf)
358 .slice(&memory, iovs.buf_len)
359 .map_err(mem_error_to_wasi)
360 );
361 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
362 let write_result = std::io::Write::write(pipe, buf.as_ref());
363 let local_written = match write_result {
364 Ok(w) => w,
365 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
366 raise_sigpipe = true;
368 break;
369 }
370 Err(e) => return Ok(Err(map_io_err(e))),
371 };
372
373 written += local_written;
374 if local_written != buf.len() {
375 break;
376 }
377 }
378
379 drop(iovs_arr);
380
381 if raise_sigpipe {
382 env.process.signal_process(Signal::Sigpipe);
383 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
384 return Ok(Err(Errno::Pipe));
385 }
386 }
387 FdWriteSource::Buffer(data) => {
388 match std::io::Write::write_all(pipe, data) {
389 Ok(()) => (),
390 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
391 env.process.signal_process(Signal::Sigpipe);
392 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
393 return Ok(Err(Errno::Pipe));
394 }
395 Err(e) => return Ok(Err(map_io_err(e))),
396 };
397 written += data.len();
398 }
399 }
400
401 (written, false, true)
402 }
403 Kind::Dir { .. } | Kind::Root { .. } => {
404 return Ok(Err(Errno::Isdir));
406 }
407 Kind::EventNotifications { inner } => {
408 let mut written = 0usize;
409
410 match &data {
411 FdWriteSource::Iovs { iovs, iovs_len } => {
412 let iovs_arr = wasi_try_ok_ok!(
413 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
414 );
415 let iovs_arr =
416 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
417 for iovs in iovs_arr.iter() {
418 let buf_len: usize = wasi_try_ok_ok!(
419 iovs.buf_len.try_into().map_err(|_| Errno::Inval)
420 );
421 let will_be_written = buf_len;
422
423 let val_cnt = buf_len / std::mem::size_of::<u64>();
424 let val_cnt: M::Offset =
425 wasi_try_ok_ok!(val_cnt.try_into().map_err(|_| Errno::Inval));
426
427 let vals = wasi_try_ok_ok!(
428 WasmPtr::<u64, M>::new(iovs.buf)
429 .slice(&memory, val_cnt as M::Offset)
430 .map_err(mem_error_to_wasi)
431 );
432 let vals =
433 wasi_try_ok_ok!(vals.access().map_err(mem_error_to_wasi));
434 for val in vals.iter() {
435 inner.write(*val);
436 }
437
438 written += will_be_written;
439 }
440 }
441 FdWriteSource::Buffer(data) => {
442 let cnt = data.len() / std::mem::size_of::<u64>();
443 for n in 0..cnt {
444 let start = n * std::mem::size_of::<u64>();
445 let data = [
446 data[start],
447 data[start + 1],
448 data[start + 2],
449 data[start + 3],
450 data[start + 4],
451 data[start + 5],
452 data[start + 6],
453 data[start + 7],
454 ];
455 inner.write(u64::from_ne_bytes(data));
456 }
457 }
458 }
459
460 (written, false, true)
461 }
462 Kind::Symlink { .. } | Kind::Epoll { .. } => return Ok(Err(Errno::Inval)),
463 Kind::Buffer { buffer } => {
464 let mut written = 0usize;
465
466 match &data {
467 FdWriteSource::Iovs { iovs, iovs_len } => {
468 let iovs_arr = wasi_try_ok_ok!(
469 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
470 );
471 let iovs_arr =
472 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
473 for iovs in iovs_arr.iter() {
474 let buf = wasi_try_ok_ok!(
475 WasmPtr::<u8, M>::new(iovs.buf)
476 .slice(&memory, iovs.buf_len)
477 .map_err(mem_error_to_wasi)
478 );
479 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
480 let local_written = wasi_try_ok_ok!(
481 std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
482 );
483 written += local_written;
484 if local_written != buf.len() {
485 break;
486 }
487 }
488 }
489 FdWriteSource::Buffer(data) => {
490 wasi_try_ok_ok!(
491 std::io::Write::write_all(buffer, data).map_err(map_io_err)
492 );
493 written += data.len();
494 }
495 }
496
497 (written, false, true)
498 }
499 }
500 };
501
502 #[cfg(feature = "journal")]
503 if should_snapshot
504 && can_snapshot
505 && bytes_written > 0
506 && let FdWriteSource::Iovs { iovs, iovs_len } = data
507 {
508 JournalEffector::save_fd_write(ctx, fd, offset, bytes_written, iovs, iovs_len)
509 .map_err(|err| {
510 tracing::error!("failed to save terminal data - {}", err);
511 WasiError::Exit(ExitCode::from(Errno::Fault))
512 })?;
513 }
514
515 env = ctx.data();
516 memory = unsafe { env.memory_view(&ctx) };
517
518 if !is_stdio {
520 let curr_offset = if is_file && should_update_cursor {
521 let bytes_written = bytes_written as u64;
522 fd_entry
523 .inner
524 .offset
525 .fetch_add(bytes_written, Ordering::AcqRel)
526 + bytes_written
528 } else {
529 fd_entry.inner.offset.load(Ordering::Acquire)
530 };
531
532 let (mut memory, _, inodes) =
535 unsafe { env.get_memory_and_wasi_state_and_inodes(&ctx, 0) };
536 if is_file {
537 let mut stat = fd_entry.inode.stat.write().unwrap();
538 if should_update_cursor {
539 stat.st_size = stat.st_size.max(curr_offset);
543 } else {
544 stat.st_size = stat.st_size.max(offset + bytes_written as u64);
548 }
549 } else {
550 fd_entry.inode.stat.write().unwrap().st_size += bytes_written as u64;
552 }
553 }
554 bytes_written
555 };
556
557 Ok(Ok(bytes_written))
558}