1use std::task::Waker;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::{
6 journal::{JournalEffector, JournalEntry},
7 utils::map_snapshot_err,
8};
9use crate::{net::MAX_SOCKET_PAYLOAD, net::socket::TimeType, syscalls::*};
10
11#[instrument(level = "trace", skip_all, fields(%fd, nwritten = field::Empty), ret)]
26pub fn fd_write<M: MemorySize>(
27 mut ctx: FunctionEnvMut<'_, WasiEnv>,
28 fd: WasiFd,
29 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
30 iovs_len: M::Offset,
31 nwritten: WasmPtr<M::Offset, M>,
32) -> Result<Errno, WasiError> {
33 WasiEnv::do_pending_operations(&mut ctx)?;
34
35 let env = ctx.data();
36 let enable_journal = env.enable_journal;
37 let fd_entry = {
38 let state = env.state.clone();
39 wasi_try_ok!(state.fs.get_fd(fd))
40 };
41 let offset = fd_entry.inner.offset.load(Ordering::Acquire) as usize;
42
43 let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
44 &mut ctx,
45 fd,
46 fd_entry,
47 FdWriteSource::Iovs { iovs, iovs_len },
48 offset as u64,
49 true,
50 enable_journal,
51 )?);
52
53 Span::current().record("nwritten", bytes_written);
54
55 let mut env = ctx.data();
56 let memory = unsafe { env.memory_view(&ctx) };
57 let nwritten_ref = nwritten.deref(&memory);
58 let bytes_written: M::Offset =
59 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
60 wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
61
62 Ok(Errno::Success)
63}
64
65#[instrument(level = "trace", skip_all, fields(%fd, %offset, nwritten = field::Empty), ret)]
80pub fn fd_pwrite<M: MemorySize>(
81 mut ctx: FunctionEnvMut<'_, WasiEnv>,
82 fd: WasiFd,
83 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
84 iovs_len: M::Offset,
85 offset: Filesize,
86 nwritten: WasmPtr<M::Offset, M>,
87) -> Result<Errno, WasiError> {
88 WasiEnv::do_pending_operations(&mut ctx)?;
89
90 let enable_snapshot_capture = ctx.data().enable_journal;
91
92 let fd_entry = {
93 let env = ctx.data();
94 let state = env.state.clone();
95 wasi_try_ok!(state.fs.get_fd(fd))
96 };
97 let bytes_written = wasi_try_ok!(fd_write_internal::<M>(
98 &mut ctx,
99 fd,
100 fd_entry,
101 FdWriteSource::Iovs { iovs, iovs_len },
102 offset,
103 false,
104 enable_snapshot_capture,
105 )?);
106
107 Span::current().record("nwritten", bytes_written);
108
109 let mut env = ctx.data();
110 let memory = unsafe { env.memory_view(&ctx) };
111 let nwritten_ref = nwritten.deref(&memory);
112 let bytes_written: M::Offset =
113 wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
114 wasi_try_mem_ok!(nwritten_ref.write(bytes_written));
115
116 Ok(Errno::Success)
117}
118
119pub(crate) enum FdWriteSource<'a, M: MemorySize> {
120 Iovs {
121 iovs: WasmPtr<__wasi_ciovec_t<M>, M>,
122 iovs_len: M::Offset,
123 },
124 Buffer(Cow<'a, [u8]>),
125}
126
127impl<'a, M: MemorySize> FdWriteSource<'a, M> {
128 pub(crate) fn coalesce(
129 &self,
130 memory: &MemoryView,
131 max_len: usize,
132 ) -> Result<Cow<'a, [u8]>, Errno> {
133 match self {
134 FdWriteSource::Iovs { iovs, iovs_len } => {
135 let iovs_arr = iovs.slice(memory, *iovs_len).map_err(mem_error_to_wasi)?;
136 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
137
138 let mut total_len = 0usize;
139 for iov in iovs_arr.iter() {
140 let len = iov.buf_len.into() as usize;
141 total_len = total_len.checked_add(len).ok_or(Errno::Msgsize)?;
142 }
143 if total_len > max_len {
144 return Err(Errno::Msgsize);
145 }
146
147 let mut coalesced = Vec::with_capacity(total_len);
148
149 for iov in iovs_arr.iter() {
150 let buf = WasmPtr::<u8, M>::new(iov.buf)
151 .slice(memory, iov.buf_len)
152 .map_err(mem_error_to_wasi)?
153 .access()
154 .map_err(mem_error_to_wasi)?;
155 coalesced.extend_from_slice(buf.as_ref());
156 }
157
158 Ok(Cow::Owned(coalesced))
159 }
160 FdWriteSource::Buffer(cow) => {
161 if cow.len() > max_len {
162 return Err(Errno::Msgsize);
163 }
164 Ok(cow.clone())
165 }
166 }
167 }
168}
169
170#[allow(clippy::await_holding_lock)]
171pub(crate) fn fd_write_internal<M: MemorySize>(
172 mut ctx: &mut FunctionEnvMut<'_, WasiEnv>,
173 fd: WasiFd,
174 fd_entry: Fd,
175 data: FdWriteSource<'_, M>,
176 offset: u64,
177 should_update_cursor: bool,
178 should_snapshot: bool,
179) -> Result<Result<usize, Errno>, WasiError> {
180 let mut offset = offset;
181 let mut env = ctx.data();
182 let state = env.state.clone();
183 let is_stdio = fd_entry.is_stdio;
184
185 let bytes_written = {
186 if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_WRITE) {
187 return Ok(Err(Errno::Access));
188 }
189
190 let fd_flags = fd_entry.inner.flags;
191 let mut memory = unsafe { env.memory_view(&ctx) };
192
193 let (bytes_written, is_file, can_snapshot) = {
194 let (mut memory, _) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
195 let mut guard = fd_entry.inode.write();
196 match guard.deref_mut() {
197 Kind::File { handle, .. } => {
198 if let Some(handle) = handle {
199 let handle = handle.clone();
200 drop(guard);
201
202 let res = __asyncify_light(
203 env,
204 if fd_entry.inner.flags.contains(Fdflags::NONBLOCK) {
205 Some(Duration::ZERO)
206 } else {
207 None
208 },
209 async {
210 let mut handle = handle.write().unwrap();
211 if !is_stdio {
212 if fd_entry.inner.flags.contains(Fdflags::APPEND) {
213 offset = fd_entry.inode.stat.read().unwrap().st_size;
215 fd_entry.inner.offset.store(offset, Ordering::Release);
216 }
217
218 handle
219 .seek(std::io::SeekFrom::Start(offset))
220 .await
221 .map_err(map_io_err)?;
222 }
223
224 let mut written = 0usize;
225
226 match &data {
227 FdWriteSource::Iovs { iovs, iovs_len } => {
228 let iovs_arr = iovs
229 .slice(&memory, *iovs_len)
230 .map_err(mem_error_to_wasi)?;
231 let iovs_arr =
232 iovs_arr.access().map_err(mem_error_to_wasi)?;
233 for iovs in iovs_arr.iter() {
234 let buf = WasmPtr::<u8, M>::new(iovs.buf)
235 .slice(&memory, iovs.buf_len)
236 .map_err(mem_error_to_wasi)?
237 .access()
238 .map_err(mem_error_to_wasi)?;
239 let local_written =
240 match handle.write(buf.as_ref()).await {
241 Ok(s) => s,
242 Err(_) if written > 0 => break,
243 Err(err) => return Err(map_io_err(err)),
244 };
245 written += local_written;
246 if local_written != buf.len() {
247 break;
248 }
249 }
250 }
251 FdWriteSource::Buffer(data) => {
252 handle.write_all(data).await?;
253 written += data.len();
254 }
255 }
256
257 if is_stdio {
258 handle.flush().await.map_err(map_io_err)?;
259 }
260 Ok(written)
261 },
262 );
263 let written = wasi_try_ok_ok!(res?.map_err(|err| match err {
264 Errno::Timedout => Errno::Again,
265 a => a,
266 }));
267
268 (written, true, true)
269 } else {
270 return Ok(Err(Errno::Inval));
271 }
272 }
273 Kind::Socket { socket } => {
274 let socket = socket.clone();
275 drop(guard);
276
277 let nonblocking = fd_flags.contains(Fdflags::NONBLOCK);
278 let timeout = socket
279 .opt_time(TimeType::WriteTimeout)
280 .ok()
281 .flatten()
282 .unwrap_or(Duration::from_secs(30));
283
284 let tasks = env.tasks().clone();
285
286 let res = __asyncify_light(env, None, async {
287 let mut sent = 0usize;
288
289 if socket.is_dgram() {
290 let data = data.coalesce(&memory, MAX_SOCKET_PAYLOAD)?;
291 sent += socket
292 .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
293 .await?;
294 return Ok(sent);
295 }
296
297 match &data {
298 FdWriteSource::Iovs { iovs, iovs_len } => {
299 let iovs_arr =
300 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)?;
301 let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;
302 for iovs in iovs_arr.iter() {
303 let buf = WasmPtr::<u8, M>::new(iovs.buf)
304 .slice(&memory, iovs.buf_len)
305 .map_err(mem_error_to_wasi)?
306 .access()
307 .map_err(mem_error_to_wasi)?;
308 let local_sent = match socket
309 .send(
310 tasks.deref(),
311 buf.as_ref(),
312 Some(timeout),
313 nonblocking,
314 )
315 .await
316 {
317 Ok(sent) => sent,
318 Err(_) if sent > 0 => break,
319 Err(err) => return Err(err),
320 };
321 sent += local_sent;
322 if local_sent != buf.len() {
323 break;
324 }
325 }
326 }
327 FdWriteSource::Buffer(data) => {
328 sent += socket
329 .send(tasks.deref(), data.as_ref(), Some(timeout), nonblocking)
330 .await?;
331 }
332 }
333 Ok(sent)
334 });
335 let written = wasi_try_ok_ok!(res?);
336 (written, false, false)
337 }
338 Kind::PipeRx { .. } => {
339 return Ok(Err(Errno::Badf));
340 }
341 Kind::PipeTx { tx } => {
342 let mut written = 0usize;
343
344 match &data {
345 FdWriteSource::Iovs { iovs, iovs_len } => {
346 let mut raise_sigpipe = false;
347 let iovs_arr = wasi_try_ok_ok!(
348 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
349 );
350 let iovs_arr =
351 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
352 for iovs in iovs_arr.iter() {
353 let buf = wasi_try_ok_ok!(
354 WasmPtr::<u8, M>::new(iovs.buf)
355 .slice(&memory, iovs.buf_len)
356 .map_err(mem_error_to_wasi)
357 );
358 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
359 let write_result = std::io::Write::write(tx, buf.as_ref());
360 let local_written = match write_result {
361 Ok(w) => w,
362 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
363 raise_sigpipe = true;
365 break;
366 }
367 Err(e) => return Ok(Err(map_io_err(e))),
368 };
369
370 written += local_written;
371 if local_written != buf.len() {
372 break;
373 }
374 }
375
376 drop(iovs_arr);
377
378 if raise_sigpipe {
379 env.process.signal_process(Signal::Sigpipe);
380 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
381 return Ok(Err(Errno::Pipe));
382 }
383 }
384 FdWriteSource::Buffer(data) => {
385 match std::io::Write::write_all(tx, data) {
386 Ok(()) => (),
387 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
388 env.process.signal_process(Signal::Sigpipe);
389 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
390 return Ok(Err(Errno::Pipe));
391 }
392 Err(e) => return Ok(Err(map_io_err(e))),
393 };
394 written += data.len();
395 }
396 }
397
398 (written, false, true)
399 }
400 Kind::DuplexPipe { pipe } => {
401 let mut written = 0usize;
402
403 match &data {
404 FdWriteSource::Iovs { iovs, iovs_len } => {
405 let mut raise_sigpipe = false;
406 let iovs_arr = wasi_try_ok_ok!(
407 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
408 );
409 let iovs_arr =
410 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
411 for iovs in iovs_arr.iter() {
412 let buf = wasi_try_ok_ok!(
413 WasmPtr::<u8, M>::new(iovs.buf)
414 .slice(&memory, iovs.buf_len)
415 .map_err(mem_error_to_wasi)
416 );
417 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
418 let write_result = std::io::Write::write(pipe, buf.as_ref());
419 let local_written = match write_result {
420 Ok(w) => w,
421 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
422 raise_sigpipe = true;
424 break;
425 }
426 Err(e) => return Ok(Err(map_io_err(e))),
427 };
428
429 written += local_written;
430 if local_written != buf.len() {
431 break;
432 }
433 }
434
435 drop(iovs_arr);
436
437 if raise_sigpipe {
438 env.process.signal_process(Signal::Sigpipe);
439 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
440 return Ok(Err(Errno::Pipe));
441 }
442 }
443 FdWriteSource::Buffer(data) => {
444 match std::io::Write::write_all(pipe, data) {
445 Ok(()) => (),
446 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
447 env.process.signal_process(Signal::Sigpipe);
448 wasi_try_ok_ok!(WasiEnv::process_signals_and_exit(ctx)?);
449 return Ok(Err(Errno::Pipe));
450 }
451 Err(e) => return Ok(Err(map_io_err(e))),
452 };
453 written += data.len();
454 }
455 }
456
457 (written, false, true)
458 }
459 Kind::Dir { .. } | Kind::Root { .. } => {
460 return Ok(Err(Errno::Isdir));
462 }
463 Kind::EventNotifications { inner } => {
464 let mut written = 0usize;
465
466 match &data {
467 FdWriteSource::Iovs { iovs, iovs_len } => {
468 let iovs_arr = wasi_try_ok_ok!(
469 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
470 );
471 let iovs_arr =
472 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
473 for iovs in iovs_arr.iter() {
474 let buf_len: usize = wasi_try_ok_ok!(
475 iovs.buf_len.try_into().map_err(|_| Errno::Inval)
476 );
477 let will_be_written = buf_len;
478
479 let val_cnt = buf_len / std::mem::size_of::<u64>();
480 let val_cnt: M::Offset =
481 wasi_try_ok_ok!(val_cnt.try_into().map_err(|_| Errno::Inval));
482
483 let vals = wasi_try_ok_ok!(
484 WasmPtr::<u64, M>::new(iovs.buf)
485 .slice(&memory, val_cnt as M::Offset)
486 .map_err(mem_error_to_wasi)
487 );
488 let vals =
489 wasi_try_ok_ok!(vals.access().map_err(mem_error_to_wasi));
490 for val in vals.iter() {
491 inner.write(*val);
492 }
493
494 written += will_be_written;
495 }
496 }
497 FdWriteSource::Buffer(data) => {
498 let cnt = data.len() / std::mem::size_of::<u64>();
499 for n in 0..cnt {
500 let start = n * std::mem::size_of::<u64>();
501 let data = [
502 data[start],
503 data[start + 1],
504 data[start + 2],
505 data[start + 3],
506 data[start + 4],
507 data[start + 5],
508 data[start + 6],
509 data[start + 7],
510 ];
511 inner.write(u64::from_ne_bytes(data));
512 }
513 }
514 }
515
516 (written, false, true)
517 }
518 Kind::Symlink { .. } | Kind::Epoll { .. } => return Ok(Err(Errno::Inval)),
519 Kind::Buffer { buffer } => {
520 let mut written = 0usize;
521
522 match &data {
523 FdWriteSource::Iovs { iovs, iovs_len } => {
524 let iovs_arr = wasi_try_ok_ok!(
525 iovs.slice(&memory, *iovs_len).map_err(mem_error_to_wasi)
526 );
527 let iovs_arr =
528 wasi_try_ok_ok!(iovs_arr.access().map_err(mem_error_to_wasi));
529 for iovs in iovs_arr.iter() {
530 let buf = wasi_try_ok_ok!(
531 WasmPtr::<u8, M>::new(iovs.buf)
532 .slice(&memory, iovs.buf_len)
533 .map_err(mem_error_to_wasi)
534 );
535 let buf = wasi_try_ok_ok!(buf.access().map_err(mem_error_to_wasi));
536 let local_written = wasi_try_ok_ok!(
537 std::io::Write::write(buffer, buf.as_ref()).map_err(map_io_err)
538 );
539 written += local_written;
540 if local_written != buf.len() {
541 break;
542 }
543 }
544 }
545 FdWriteSource::Buffer(data) => {
546 wasi_try_ok_ok!(
547 std::io::Write::write_all(buffer, data).map_err(map_io_err)
548 );
549 written += data.len();
550 }
551 }
552
553 (written, false, true)
554 }
555 }
556 };
557
558 #[cfg(feature = "journal")]
559 if should_snapshot
560 && can_snapshot
561 && bytes_written > 0
562 && let FdWriteSource::Iovs { iovs, iovs_len } = data
563 {
564 JournalEffector::save_fd_write(ctx, fd, offset, bytes_written, iovs, iovs_len)
565 .map_err(|err| {
566 tracing::error!("failed to save terminal data - {}", err);
567 WasiError::Exit(ExitCode::from(Errno::Fault))
568 })?;
569 }
570
571 env = ctx.data();
572 memory = unsafe { env.memory_view(&ctx) };
573
574 if !is_stdio {
576 let curr_offset = if is_file && should_update_cursor {
577 let bytes_written = bytes_written as u64;
578 fd_entry
579 .inner
580 .offset
581 .fetch_add(bytes_written, Ordering::AcqRel)
582 + bytes_written
584 } else {
585 fd_entry.inner.offset.load(Ordering::Acquire)
586 };
587
588 let (mut memory, _, inodes) =
591 unsafe { env.get_memory_and_wasi_state_and_inodes(&ctx, 0) };
592 if is_file {
593 let mut stat = fd_entry.inode.stat.write().unwrap();
594 if should_update_cursor {
595 stat.st_size = stat.st_size.max(curr_offset);
599 } else {
600 stat.st_size = stat.st_size.max(offset + bytes_written as u64);
604 }
605 } else {
606 fd_entry.inode.stat.write().unwrap().st_size += bytes_written as u64;
608 }
609 }
610 bytes_written
611 };
612
613 Ok(Ok(bytes_written))
614}