wasmer_wasix/syscalls/wasi/
fd_write.rs1use std::task::Waker;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::{
6 journal::{JournalEffector, JournalEntry},
7 utils::map_snapshot_err,
8};
9use crate::{net::socket::TimeType, syscalls::*};
10
11#[instrument(level = "trace", skip_all, fields(%fd, nwritten = field::Empty), ret)]
26pub fn fd_write<M: MemorySize>(
27 mut ctx: FunctionEnvMut<'_, WasiEnv>,
28 fd: WasiFd,
29 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
30 iovs_len: M::Offset,
31 nwritten: WasmPtr<M::Offset, M>,
32) -> Result<Errno, WasiError> {
33 WasiEnv::do_pending_operations(&mut ctx)?;
34
35 let env = ctx.data();
36 let enable_journal = env.enable_journal;
37 let offset = {
38 let state = env.state.clone();
39 let inodes = state.inodes.clone();
40
41 let fd_entry = wasi_try_ok!(state.fs.get_fd(fd));
42 fd_entry.inner.offset.load(Ordering::Acquire) as usize
43 };
44
45 let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
46 &mut ctx,
47 fd,
48 FdWriteSource::Iovs { iovs, iovs_len },
49 offset as u64,
50 true,
51 enable_journal,
52 )?);
53
54 Span::current().record("nwritten", bytes_written);
55
56 let mut env = ctx.data();
57 let memory = unsafe { env.memory_view(&ctx) };
58 let nwritten_ref = nwritten.deref(&memory);
59 let bytes_written: M::Offset =
60 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
61 wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
62
63 Ok(Errno::Success)
64}
65
66#[instrument(level = "trace", skip_all, fields(%fd, %offset, nwritten = field::Empty), ret)]
81pub fn fd_pwrite<M: MemorySize>(
82 mut ctx: FunctionEnvMut<'_, WasiEnv>,
83 fd: WasiFd,
84 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
85 iovs_len: M::Offset,
86 offset: Filesize,
87 nwritten: WasmPtr<M::Offset, M>,
88) -> Result<Errno, WasiError> {
89 WasiEnv::do_pending_operations(&mut ctx)?;
90
91 let enable_snapshot_capture = ctx.data().enable_journal;
92
93 let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
94 &mut ctx,
95 fd,
96 FdWriteSource::Iovs { iovs, iovs_len },
97 offset,
98 false,
99 enable_snapshot_capture,
100 )?);
101
102 Span::current().record("nwritten", bytes_written);
103
104 let mut env = ctx.data();
105 let memory = unsafe { env.memory_view(&ctx) };
106 let nwritten_ref = nwritten.deref(&memory);
107 let bytes_written: M::Offset =
108 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
109 wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
110
111 Ok(Errno::Success)
112}
113
114pub(crate) enum FdWriteSource<'a, M: MemorySize> {
115 Iovs {
116 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
117 iovs_len: M::Offset,
118 },
119 Buffer(Cow<'a, [u8]>),
120}
121
122#[allow(clippy::await_holding_lock)]
123pub(crate) fn fd_write_internal<M: MemorySize>(
124 mut ctx: &mut FunctionEnvMut<'_, WasiEnv>,
125 fd: WasiFd,
126 data: FdWriteSource<'_, M>,
127 offset: u64,
128 should_update_cursor: bool,
129 should_snapshot: bool,
130) -> Result<Result<usize, Errno>, WasiError> {
131 let mut offset = offset;
132 let mut env = ctx.data();
133 let state = env.state.clone();
134
135 let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(fd));
136 let is_stdio = fd_entry.is_stdio;
137
138 let bytes_written = {
139 if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_WRITE) {
140 return Ok(Err(Errno::Access));
141 }
142
143 let fd_flags = fd_entry.inner.flags;
144 let mut memory = unsafe { env.memory_view(&ctx) };
145
146 let (bytes_written, is_file, can_snapshot) = {
147 let (mut memory, _) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
148 let mut guard = fd_entry.inode.write();
149 match guard.deref_mut() {
150 Kind::File { handle, .. } => {
151 if let Some(handle) = handle {
152 let handle = handle.clone();
153 drop(guard);
154
155 let res = __asyncify_light(
156 env,
157 if fd_entry.inner.flags.contains(Fdflags::NONBLOCK) {
158 Some(Duration::ZERO)
159 } else {
160 None
161 },
162 async {
163 let mut handle = handle.write().unwrap();
164 if !is_stdio {
165 if fd_entry.inner.flags.contains(Fdflags::APPEND) {
166 offset = fd_entry.inode.stat.read().unwrap().st_size;
168 fd_entry.inner.offset.store(offset, Ordering::Release);
169 }
170
171 handle
172 .seek(std::io::SeekFrom::Start(offset))
173 .await
174 .map_err(map_io_err)?;
175 }
176
177 let mut written = 0usize;
178
179 match &data {
180 FdWriteSource::Iovs { iovs, iovs_len } => {
181 let iovs_arr = iovs
182 .slice(&memory, *iovs_len)
183 .map_err(mem_error_to_wasi)?;
184 let iovs_arr =
185 iovs_arr.access().map_err(mem_error_to_wasi)?;
186 for iovs in iovs_arr.iter() {
187 let buf = WasmPtr::<u8, M>::new(iovs.buf)
188 .slice(&memory, iovs.buf_len)
189 .map_err(mem_error_to_wasi)?
190 .access()
191 .map_err(mem_error_to_wasi)?;
192 let local_written =
193 match handle.write(buf.as_ref()).await {
194 Ok(s) => s,
195 Err(_) if written > 0 => break,
196 Err(err) => return Err(map_io_err(err)),
197 };
198 written += local_written;
199 if local_written != buf.len() {
200 break;
201 }
202 }
203 }
204 FdWriteSource::Buffer(data) => {
205 handle.write_all(data).await?;
206 written += data.len();
207 }
208 }
209
210 if is_stdio {
211 handle.flush().await.map_err(map_io_err)?;
212 }
213 Ok(written)
214 },
215 );
216 let written = wasi_try_ok_ok!(res?.map_err(|err| match err {
217 Errno::Timedout => Errno::Again,
218 a => a,
219 }));
220
221 (written, true, true)
222 } else {
223 return Ok(Err(Errno::Inval));
224 }
225 }
226 Kind::Socket { socket } => {
227 let socket = socket.clone();
228 drop(guard);
229
230 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
231 let timeout = socket
232 .opt_time(TimeType::WriteTimeout)
233 .ok()
234 .flatten()
235 .unwrap_or(Duration::from_secs(30));
236
237 let tasks = env.tasks().clone();
238
239 let res = __asyncify_light(env, None, async {
240 let mut sent = 0usize;
241
242 match &data {
243 FdWriteSource::Iovs { iovs, iovs_len } => {
244 let iovs_arr =
245 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)?;
246 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
247 for iovs in iovs_arr.iter() {
248 let buf = WasmPtr::<u8, M>::new(iovs.buf)
249 .slice(&memory, iovs.buf_len)
250 .map_err(mem_error_to_wasi)?
251 .access()
252 .map_err(mem_error_to_wasi)?;
253 let local_sent = socket
254 .send(
255 tasks.deref(),
256 buf.as_ref(),
257 Some(timeout),
258 nonblocking,
259 )
260 .await?;
261 sent += local_sent;
262 if local_sent != buf.len() {
263 break;
264 }
265 }
266 }
267 FdWriteSource::Buffer(data) => {
268 sent += socket
269 .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
270 .await?;
271 }
272 }
273 Ok(sent)
274 });
275 let written = wasi_try_ok_ok!(res?);
276 (written, false, false)
277 }
278 Kind::PipeRx { .. } => {
279 return Ok(Err(Errno::Badf));
280 }
281 Kind::PipeTx { tx } => {
282 let mut written = 0usize;
283
284 match &data {
285 FdWriteSource::Iovs { iovs, iovs_len } => {
286 let mut raise_sigpipe = false;
287 let iovs_arr = wasi_try_ok_ok!(
288 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
289 );
290 let iovs_arr =
291 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
292 for iovs in iovs_arr.iter() {
293 let buf = wasi_try_ok_ok!(
294 WasmPtr::<u8, M>::new(iovs.buf)
295 .slice(&memory, iovs.buf_len)
296 .map_err(mem_error_to_wasi)
297 );
298 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
299 let write_result = std::io::Write::write(tx, buf.as_ref());
300 let local_written = match write_result {
301 Ok(w) => w,
302 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
303 raise_sigpipe = true;
305 break;
306 }
307 Err(e) => return Ok(Err(map_io_err(e))),
308 };
309
310 written += local_written;
311 if local_written != buf.len() {
312 break;
313 }
314 }
315
316 drop(iovs_arr);
317
318 if raise_sigpipe {
319 env.process.signal_process(Signal::Sigpipe);
320 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
321 return Ok(Err(Errno::Pipe));
322 }
323 }
324 FdWriteSource::Buffer(data) => {
325 match std::io::Write::write_all(tx, data) {
326 Ok(()) => (),
327 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
328 env.process.signal_process(Signal::Sigpipe);
329 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
330 return Ok(Err(Errno::Pipe));
331 }
332 Err(e) => return Ok(Err(map_io_err(e))),
333 };
334 written += data.len();
335 }
336 }
337
338 (written, false, true)
339 }
340 Kind::DuplexPipe { pipe } => {
341 let mut written = 0usize;
342
343 match &data {
344 FdWriteSource::Iovs { iovs, iovs_len } => {
345 let mut raise_sigpipe = false;
346 let iovs_arr = wasi_try_ok_ok!(
347 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
348 );
349 let iovs_arr =
350 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
351 for iovs in iovs_arr.iter() {
352 let buf = wasi_try_ok_ok!(
353 WasmPtr::<u8, M>::new(iovs.buf)
354 .slice(&memory, iovs.buf_len)
355 .map_err(mem_error_to_wasi)
356 );
357 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
358 let write_result = std::io::Write::write(pipe, buf.as_ref());
359 let local_written = match write_result {
360 Ok(w) => w,
361 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
362 raise_sigpipe = true;
364 break;
365 }
366 Err(e) => return Ok(Err(map_io_err(e))),
367 };
368
369 written += local_written;
370 if local_written != buf.len() {
371 break;
372 }
373 }
374
375 drop(iovs_arr);
376
377 if raise_sigpipe {
378 env.process.signal_process(Signal::Sigpipe);
379 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
380 return Ok(Err(Errno::Pipe));
381 }
382 }
383 FdWriteSource::Buffer(data) => {
384 match std::io::Write::write_all(pipe, data) {
385 Ok(()) => (),
386 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
387 env.process.signal_process(Signal::Sigpipe);
388 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
389 return Ok(Err(Errno::Pipe));
390 }
391 Err(e) => return Ok(Err(map_io_err(e))),
392 };
393 written += data.len();
394 }
395 }
396
397 (written, false, true)
398 }
399 Kind::Dir { .. } | Kind::Root { .. } => {
400 return Ok(Err(Errno::Isdir));
402 }
403 Kind::EventNotifications { inner } => {
404 let mut written = 0usize;
405
406 match &data {
407 FdWriteSource::Iovs { iovs, iovs_len } => {
408 let iovs_arr = wasi_try_ok_ok!(
409 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
410 );
411 let iovs_arr =
412 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
413 for iovs in iovs_arr.iter() {
414 let buf_len: usize = wasi_try_ok_ok!(
415 iovs.buf_len.try_into().map_err(|_| Errno::Inval)
416 );
417 let will_be_written = buf_len;
418
419 let val_cnt = buf_len / std::mem::size_of::<u64>();
420 let val_cnt: M::Offset =
421 wasi_try_ok_ok!(val_cnt.try_into().map_err(|_| Errno::Inval));
422
423 let vals = wasi_try_ok_ok!(
424 WasmPtr::<u64, M>::new(iovs.buf)
425 .slice(&memory, val_cnt as M::Offset)
426 .map_err(mem_error_to_wasi)
427 );
428 let vals =
429 wasi_try_ok_ok!(vals.access().map_err(mem_error_to_wasi));
430 for val in vals.iter() {
431 inner.write(*val);
432 }
433
434 written += will_be_written;
435 }
436 }
437 FdWriteSource::Buffer(data) => {
438 let cnt = data.len() / std::mem::size_of::<u64>();
439 for n in 0..cnt {
440 let start = n * std::mem::size_of::<u64>();
441 let data = [
442 data[start],
443 data[start + 1],
444 data[start + 2],
445 data[start + 3],
446 data[start + 4],
447 data[start + 5],
448 data[start + 6],
449 data[start + 7],
450 ];
451 inner.write(u64::from_ne_bytes(data));
452 }
453 }
454 }
455
456 (written, false, true)
457 }
458 Kind::Symlink { .. } | Kind::Epoll { .. } => return Ok(Err(Errno::Inval)),
459 Kind::Buffer { buffer } => {
460 let mut written = 0usize;
461
462 match &data {
463 FdWriteSource::Iovs { iovs, iovs_len } => {
464 let iovs_arr = wasi_try_ok_ok!(
465 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
466 );
467 let iovs_arr =
468 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
469 for iovs in iovs_arr.iter() {
470 let buf = wasi_try_ok_ok!(
471 WasmPtr::<u8, M>::new(iovs.buf)
472 .slice(&memory, iovs.buf_len)
473 .map_err(mem_error_to_wasi)
474 );
475 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
476 let local_written = wasi_try_ok_ok!(
477 std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
478 );
479 written += local_written;
480 if local_written != buf.len() {
481 break;
482 }
483 }
484 }
485 FdWriteSource::Buffer(data) => {
486 wasi_try_ok_ok!(
487 std::io::Write::write_all(buffer, data).map_err(map_io_err)
488 );
489 written += data.len();
490 }
491 }
492
493 (written, false, true)
494 }
495 }
496 };
497
498 #[cfg(feature = "journal")]
499 if should_snapshot
500 && can_snapshot
501 && bytes_written > 0
502 && let FdWriteSource::Iovs { iovs, iovs_len } = data
503 {
504 JournalEffector::save_fd_write(ctx, fd, offset, bytes_written, iovs, iovs_len)
505 .map_err(|err| {
506 tracing::error!("failed to save terminal data - {}", err);
507 WasiError::Exit(ExitCode::from(Errno::Fault))
508 })?;
509 }
510
511 env = ctx.data();
512 memory = unsafe { env.memory_view(&ctx) };
513
514 if !is_stdio {
516 let curr_offset = if is_file && should_update_cursor {
517 let bytes_written = bytes_written as u64;
518 let mut fd_map = state.fs.fd_map.write().unwrap();
519 let fd_entry = wasi_try_ok_ok!(fd_map.get_mut(fd).ok_or(Errno::Badf));
520 fd_entry
521 .offset
522 .fetch_add(bytes_written, Ordering::AcqRel)
523 + bytes_written
525 } else {
526 fd_entry.inner.offset.load(Ordering::Acquire)
527 };
528
529 let (mut memory, _, inodes) =
532 unsafe { env.get_memory_and_wasi_state_and_inodes(&ctx, 0) };
533 if is_file {
534 let mut stat = fd_entry.inode.stat.write().unwrap();
535 if should_update_cursor {
536 stat.st_size = stat.st_size.max(curr_offset);
540 } else {
541 stat.st_size = stat.st_size.max(offset + bytes_written as u64);
545 }
546 } else {
547 fd_entry.inode.stat.write().unwrap().st_size += bytes_written as u64;
549 }
550 }
551 bytes_written
552 };
553
554 Ok(Ok(bytes_written))
555}