1use crate::{WasiEnv, WasiRuntimeError, journal::SnapshotTrigger};
2#[cfg(feature = "journal")]
3use crate::{WasiResult, journal::JournalEffector, syscalls::do_checkpoint_from_outside, unwind};
4use serde::{Deserialize, Serialize};
5#[cfg(feature = "journal")]
6use std::collections::HashSet;
7use std::{
8 collections::HashMap,
9 convert::TryInto,
10 ops::Range,
11 sync::{
12 Arc, Condvar, Mutex, MutexGuard, RwLock, Weak,
13 atomic::{AtomicU32, Ordering},
14 },
15 task::Waker,
16 time::Duration,
17};
18use tracing::trace;
19use wasmer::{FunctionEnvMut, SharedMemory};
20use wasmer_types::ModuleHash;
21use wasmer_wasix_types::{
22 types::Signal,
23 wasi::{Errno, ExitCode, Snapshot0Clockid},
24 wasix::ThreadStartType,
25};
26
27use crate::{
28 WasiThread, WasiThreadHandle, WasiThreadId, os::task::signal::WasiSignalInterval,
29 syscalls::platform_clock_time_get,
30};
31
32use super::{
33 TaskStatus,
34 backoff::WasiProcessCpuBackoff,
35 control_plane::{ControlPlaneError, WasiControlPlaneHandle},
36 signal::{SignalDeliveryError, SignalHandlerAbi},
37 task_join_handle::OwnedTaskStatus,
38 thread::WasiMemoryLayout,
39};
40
41#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
43pub struct WasiProcessId(u32);
44
45impl WasiProcessId {
46 pub fn raw(&self) -> u32 {
47 self.0
48 }
49}
50
51impl From<i32> for WasiProcessId {
52 fn from(id: i32) -> Self {
53 Self(id as u32)
54 }
55}
56
57impl From<WasiProcessId> for i32 {
58 fn from(val: WasiProcessId) -> Self {
59 val.0 as i32
60 }
61}
62
63impl From<u32> for WasiProcessId {
64 fn from(id: u32) -> Self {
65 Self(id)
66 }
67}
68
69impl From<WasiProcessId> for u32 {
70 fn from(val: WasiProcessId) -> Self {
71 val.0
72 }
73}
74
75impl std::fmt::Display for WasiProcessId {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 write!(f, "{}", self.0)
78 }
79}
80
81impl std::fmt::Debug for WasiProcessId {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(f, "{}", self.0)
84 }
85}
86
/// Shared handle to the mutable state of a process: the `WasiProcessInner` behind a mutex,
/// paired with the condition variable that the checkpoint code notifies and waits on.
pub type LockableWasiProcessInner = Arc<(Mutex<WasiProcessInner>, Condvar)>;
88
/// Cloneable handle to a single WASI process.
///
/// Apart from `pid` and `module_hash`, the fields are reference-counted handles, so clones
/// refer to the same underlying state.
#[derive(Debug, Clone)]
pub struct WasiProcess {
    /// Identifier of this process.
    pub(crate) pid: WasiProcessId,
    /// Module hash supplied when the process was created.
    // NOTE(review): presumably the hash of the module this process runs - confirm with callers.
    pub(crate) module_hash: ModuleHash,
    /// Weak reference to the parent's state, if any; `ppid()` reads the parent's pid from it.
    pub(crate) parent: Option<Weak<RwLock<WasiProcessInner>>>,
    /// Mutex-protected process state together with its condition variable.
    pub(crate) inner: LockableWasiProcessInner,
    /// Handle to the control plane; used here to generate thread ids, register tasks and
    /// look up child processes.
    pub(crate) compute: WasiControlPlaneHandle,
    /// Task status of the process: handed to the main thread in `new_thread_with_id` and
    /// awaited by `join()`.
    pub(crate) finished: Arc<OwnedTaskStatus>,
    /// Counter of live `WasiProcessWait` guards created for this process.
    pub(crate) waiting: Arc<AtomicU32>,
    /// Counter initialised to zero in `new()`.
    // NOTE(review): its readers/writers are not visible in this part of the file.
    pub(crate) cpu_run_tokens: Arc<AtomicU32>,
}
115
/// Checkpoint state of a process, stored in `WasiProcessInner::checkpoint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum WasiProcessCheckpoint {
    /// No checkpoint is pending; this is the initial state and the state restored once a
    /// snapshot has been written.
    Execute,
    /// A snapshot has been requested for the given trigger and has not completed yet.
    Snapshot { trigger: SnapshotTrigger },
}
129
/// Half-open `start..end` region of memory, used as the key of
/// `WasiProcessInner::snapshot_memory_hash`.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MemorySnapshotRegion {
    /// First offset covered by the region.
    pub start: u64,
    /// Offset one past the last one covered by the region.
    pub end: u64,
}

impl From<Range<u64>> for MemorySnapshotRegion {
    /// Builds a region from the bounds of `value`.
    fn from(value: Range<u64>) -> Self {
        let Range { start, end } = value;
        Self { start, end }
    }
}

// Implemented as `From` (rather than a hand-written `Into`) so that both
// `Range::from(region)` and `region.into()` are available through the blanket impl.
impl From<MemorySnapshotRegion> for Range<u64> {
    /// Converts the region back into the equivalent `start..end` range.
    fn from(region: MemorySnapshotRegion) -> Self {
        region.start..region.end
    }
}
152
/// Mutable state of a process, kept behind the mutex of `LockableWasiProcessInner`.
#[derive(Debug)]
pub struct WasiProcessInner {
    /// Identifier of the process this state belongs to.
    pub pid: WasiProcessId,
    /// Shared counter of live `WasiProcessWait` guards; while it is non-zero and children
    /// exist, `signal_process_internal` forwards signals to the children.
    pub(crate) waiting: Arc<AtomicU32>,
    /// Threads registered with this process, keyed by thread id.
    pub threads: HashMap<WasiThreadId, WasiThread>,
    /// Thread counter: incremented for every thread registered through
    /// `WasiProcess::new_thread_with_id` and reported by `WasiProcess::active_threads`.
    pub thread_count: u32,
    /// Signal intervals configured through `WasiProcess::signal_interval`, keyed by signal.
    pub signal_intervals: HashMap<Signal, WasiSignalInterval>,
    /// Child processes; waited on by `join_children` / `join_any_child`.
    pub children: Vec<WasiProcess>,
    /// Current checkpoint state of the process.
    pub checkpoint: WasiProcessCheckpoint,
    /// When set, the thread that completes a snapshot turns off `enable_journal` on its
    /// environment.
    pub disable_journaling_after_checkpoint: bool,
    /// When set, threads leaving a checkpoint finish instead of rewinding and resuming.
    pub stop_running_after_checkpoint: bool,
    /// Triggers for which a snapshot should be taken; consulted when the process is signalled.
    #[cfg(feature = "journal")]
    pub snapshot_on: HashSet<SnapshotTrigger>,
    /// Wakers of tasks waiting for a checkpoint state change; drained and woken whenever
    /// the checkpoint code changes `checkpoint`.
    pub wakers: Vec<Waker>,
    /// Set by the first successful `WasiProcess::try_start_cleanup` call.
    pub cleanup_started: bool,
    /// Memory registered through `WasiProcess::register_memory`, if any.
    pub memory: Option<SharedMemory>,
    /// Hashes keyed by memory region.
    // NOTE(review): presumably used to detect which regions changed between snapshots -
    // the code that reads/writes it is not visible here.
    #[cfg(feature = "journal")]
    pub snapshot_memory_hash: HashMap<MemorySnapshotRegion, u64>,
    /// CPU backoff state, created in `WasiProcess::new`.
    pub(super) backoff: WasiProcessCpuBackoff,
}
195
/// Outcome of `WasiProcessInner::maybe_checkpoint`.
pub enum MaybeCheckpointResult<'a> {
    /// No checkpoint was pending; the function environment is handed back to the caller.
    NotThisTime(FunctionEnvMut<'a, WasiEnv>),
    /// A checkpoint was pending and the unwind of the calling thread has been started.
    Unwinding,
}
200
201impl WasiProcessInner {
/// Requests a checkpoint of kind `for_what` and immediately lets the calling thread take
/// part in it.
///
/// The requested state is stored under the process lock, every registered waker is woken
/// and the condition variable is notified. The call then continues in
/// `Self::maybe_checkpoint`, whose result is returned unchanged.
#[cfg(feature = "journal")]
pub fn checkpoint<M: wasmer_types::MemorySize>(
    inner: LockableWasiProcessInner,
    ctx: FunctionEnvMut<'_, WasiEnv>,
    for_what: WasiProcessCheckpoint,
) -> WasiResult<MaybeCheckpointResult<'_>> {
    {
        // Scope the lock so it is released before `maybe_checkpoint` locks again.
        let (state, changed) = &*inner;
        let mut process = state.lock().unwrap();
        process.checkpoint = for_what;
        process.wakers.drain(..).for_each(Waker::wake);
        changed.notify_all();
    }

    Self::maybe_checkpoint::<M>(inner, ctx)
}
222
/// Takes part in a pending checkpoint, if there is one.
///
/// When the process is in the `Execute` state the function environment is returned
/// untouched in `MaybeCheckpointResult::NotThisTime`. Otherwise the calling thread is
/// unwound; inside the unwind callback its state is saved to the journal and the thread
/// waits until the snapshot has been written, after which it either finishes or rewinds.
/// In that case `MaybeCheckpointResult::Unwinding` is returned.
#[cfg(feature = "journal")]
pub fn maybe_checkpoint<M: wasmer_types::MemorySize>(
    inner: LockableWasiProcessInner,
    ctx: FunctionEnvMut<'_, WasiEnv>,
) -> WasiResult<MaybeCheckpointResult<'_>> {
    use bytes::Bytes;
    use wasmer::AsStoreMut;
    use wasmer_types::OnCalledAction;

    use crate::{WasiError, os::task::thread::RewindResultType, rewind_ext};
    // Fast path: nothing to do unless a snapshot has been requested.
    let guard = inner.0.lock().unwrap();
    if guard.checkpoint == WasiProcessCheckpoint::Execute {
        return Ok(Ok(MaybeCheckpointResult::NotThisTime(ctx)));
    }
    trace!("checkpoint capture");
    // Release the lock before unwinding; the callback below re-acquires it.
    drop(guard);

    // Cloned up front because `ctx` is moved into `unwind`.
    let thread_layout = ctx.data().thread.memory_layout().clone();
    unwind::<M, _>(ctx, move |mut ctx, memory_stack, rewind_stack| {
        // Capture and serialize the store state alongside the two stacks.
        let store_data = crate::utils::store::capture_store_snapshot(&mut ctx.as_store_mut())
            .serialize()
            .unwrap();
        let memory_stack = memory_stack.freeze();
        let rewind_stack = rewind_stack.freeze();
        let store_data = Bytes::from(store_data);

        tracing::debug!(
            "stack snapshot unwind (memory_stack={}, rewind_stack={}, store_data={})",
            memory_stack.len(),
            rewind_stack.len(),
            store_data.len(),
        );

        // Persist this thread's state; the buffers are cloned because they are needed
        // again for the rewind at the end of the callback.
        let thread_start = ctx.data().thread.thread_start_type();
        let tid = ctx.data().thread.tid();
        if let Err(err) = JournalEffector::save_thread_state::<M>(
            &mut ctx,
            tid,
            memory_stack.clone(),
            rewind_stack.clone(),
            store_data.clone(),
            thread_start,
            thread_layout,
        ) {
            return wasmer_types::OnCalledAction::Trap(err.into());
        }

        let mut guard = inner.0.lock().unwrap();

        loop {
            // While a snapshot is pending, flag this thread as checkpointing and either
            // write the snapshot (last thread to arrive) or wait on the condvar.
            if let WasiProcessCheckpoint::Snapshot { trigger } = guard.checkpoint {
                ctx.data().thread.set_checkpointing(true);

                // "Last" means every registered thread is checkpointing or deep sleeping.
                let is_last_thread = guard
                    .threads
                    .values()
                    .all(|t| t.is_check_pointing() || t.is_deep_sleeping());
                if is_last_thread {
                    if let Err(err) =
                        JournalEffector::save_memory_and_snapshot(&mut ctx, &mut guard, trigger)
                    {
                        // NOTE(review): on this path the checkpointing flag set above is
                        // not cleared and `checkpoint` stays `Snapshot`.
                        inner.1.notify_all();
                        return wasmer_types::OnCalledAction::Trap(err.into());
                    }

                    ctx.data().thread.set_checkpointing(false);
                    trace!("checkpoint complete");
                    if guard.disable_journaling_after_checkpoint {
                        ctx.data_mut().enable_journal = false;
                    }
                    // Back to normal execution: wake async waiters and blocked threads.
                    guard.checkpoint = WasiProcessCheckpoint::Execute;
                    for waker in guard.wakers.drain(..) {
                        waker.wake();
                    }
                    inner.1.notify_all();
                } else {
                    guard = inner.1.wait(guard).unwrap();
                }
                // Re-evaluate the checkpoint state from the top of the loop.
                continue;
            }

            // The checkpoint state is `Execute` again.
            ctx.data().thread.set_checkpointing(false);
            trace!("checkpoint finished");

            if guard.stop_running_after_checkpoint {
                trace!("will stop running now");
                ctx.data_mut().enable_journal = false;
                return OnCalledAction::Finish;
            }

            // Restore the stacks and store captured above and re-invoke the function.
            return match rewind_ext::<M>(
                &mut ctx,
                Some(memory_stack),
                rewind_stack,
                store_data,
                RewindResultType::RewindWithoutResult,
            ) {
                Errno::Success => OnCalledAction::InvokeAgain,
                err => {
                    tracing::warn!(
                        "snapshot resumption failed - could not rewind the stack - errno={}",
                        err
                    );
                    OnCalledAction::Trap(Box::new(WasiError::Exit(err.into())))
                }
            };
        }
    })?;

    Ok(Ok(MaybeCheckpointResult::Unwinding))
}
349
/// Without the `journal` feature there is nothing to checkpoint; this is a no-op.
#[cfg(not(feature = "journal"))]
pub fn do_checkpoints_from_outside(_ctx: &mut FunctionEnvMut<'_, WasiEnv>) {}

/// Lets the calling thread take part in a pending snapshot without unwinding it.
///
/// While the process is in the `Snapshot` state the thread is flagged as checkpointing.
/// The last thread to arrive writes the memory and snapshot and resets the state to
/// `Execute`; every other thread blocks on the condition variable. If writing the
/// snapshot fails the error is logged and the function returns early.
#[cfg(feature = "journal")]
pub fn do_checkpoints_from_outside(ctx: &mut FunctionEnvMut<'_, WasiEnv>) {
    let inner = ctx.data().process.inner.clone();
    let mut guard = inner.0.lock().unwrap();

    while let WasiProcessCheckpoint::Snapshot { trigger } = guard.checkpoint {
        ctx.data().thread.set_checkpointing(true);

        // "Last" means every registered thread is checkpointing or deep sleeping.
        let is_last_thread = guard
            .threads
            .values()
            .all(|t| t.is_check_pointing() || t.is_deep_sleeping());
        if is_last_thread {
            if let Err(err) =
                JournalEffector::save_memory_and_snapshot(ctx, &mut guard, trigger)
            {
                // NOTE(review): this early return leaves the thread flagged as
                // checkpointing and the process in the `Snapshot` state.
                inner.1.notify_all();
                tracing::error!("failed to snapshot memory and threads - {}", err);
                return;
            }

            ctx.data().thread.set_checkpointing(false);
            trace!("checkpoint complete");
            if guard.disable_journaling_after_checkpoint {
                ctx.data_mut().enable_journal = false;
            }
            // Back to normal execution: wake async waiters and blocked threads.
            guard.checkpoint = WasiProcessCheckpoint::Execute;
            for waker in guard.wakers.drain(..) {
                waker.wake();
            }
            inner.1.notify_all();
        } else {
            guard = inner.1.wait(guard).unwrap();
        }
        continue;
    }

    ctx.data().thread.set_checkpointing(false);
    trace!("checkpoint finished");
}
399}
400
401pub(crate) struct WasiProcessWait {
403 waiting: Arc<AtomicU32>,
404}
405
406impl WasiProcessWait {
407 pub fn new(process: &WasiProcess) -> Self {
408 process.waiting.fetch_add(1, Ordering::AcqRel);
409 Self {
410 waiting: process.waiting.clone(),
411 }
412 }
413}
414
415impl Drop for WasiProcessWait {
416 fn drop(&mut self) {
417 self.waiting.fetch_sub(1, Ordering::AcqRel);
418 }
419}
420
421impl WasiProcess {
422 pub fn new(pid: WasiProcessId, module_hash: ModuleHash, plane: WasiControlPlaneHandle) -> Self {
423 let max_cpu_backoff_time = plane
424 .upgrade()
425 .and_then(|p| p.config().enable_exponential_cpu_backoff)
426 .unwrap_or(Duration::from_secs(30));
427 let max_cpu_cool_off_time = Duration::from_millis(500);
428
429 let waiting = Arc::new(AtomicU32::new(0));
430 let inner = Arc::new((
431 Mutex::new(WasiProcessInner {
432 pid,
433 threads: Default::default(),
434 thread_count: Default::default(),
435 signal_intervals: Default::default(),
436 children: Default::default(),
437 checkpoint: WasiProcessCheckpoint::Execute,
438 wakers: Default::default(),
439 cleanup_started: false,
440 memory: Default::default(),
441 waiting: waiting.clone(),
442 #[cfg(feature = "journal")]
443 snapshot_on: Default::default(),
444 #[cfg(feature = "journal")]
445 snapshot_memory_hash: Default::default(),
446 disable_journaling_after_checkpoint: false,
447 stop_running_after_checkpoint: false,
448 backoff: WasiProcessCpuBackoff::new(max_cpu_backoff_time, max_cpu_cool_off_time),
449 }),
450 Condvar::new(),
451 ));
452
453 #[derive(Debug)]
454 struct SignalHandler(LockableWasiProcessInner);
455 impl SignalHandlerAbi for SignalHandler {
456 fn signal(&self, signal: u8) -> Result<(), SignalDeliveryError> {
457 if let Ok(signal) = signal.try_into() {
458 signal_process_internal(&self.0, signal);
459 Ok(())
460 } else {
461 Err(SignalDeliveryError)
462 }
463 }
464 }
465
466 WasiProcess {
467 pid,
468 module_hash,
469 parent: None,
470 compute: plane,
471 inner: inner.clone(),
472 finished: Arc::new(
473 OwnedTaskStatus::new(TaskStatus::Pending)
474 .with_signal_handler(Arc::new(SignalHandler(inner))),
475 ),
476 waiting,
477 cpu_run_tokens: Arc::new(AtomicU32::new(0)),
478 }
479 }
480
481 pub fn try_start_cleanup(&self) -> bool {
484 let mut guard = self.inner.0.lock().unwrap();
485 if guard.cleanup_started {
486 false
487 } else {
488 guard.cleanup_started = true;
489 true
490 }
491 }
492
/// Overwrites the process id stored in this handle.
///
/// Only the handle's own `pid` field is changed; the `pid` kept in the shared inner state
/// is not touched here.
pub(super) fn set_pid(&mut self, pid: WasiProcessId) {
    self.pid = pid;
}
496
/// Returns the identifier of this process.
pub fn pid(&self) -> WasiProcessId {
    self.pid
}
501
502 pub fn ppid(&self) -> WasiProcessId {
504 self.parent
505 .iter()
506 .filter_map(|parent| parent.upgrade())
507 .map(|parent| parent.read().unwrap().pid)
508 .next()
509 .unwrap_or(WasiProcessId(0))
510 }
511
512 pub fn lock(&self) -> MutexGuard<'_, WasiProcessInner> {
515 self.inner.0.lock().unwrap()
516 }
517
518 pub fn new_thread(
520 &self,
521 layout: WasiMemoryLayout,
522 start: ThreadStartType,
523 ) -> Result<WasiThreadHandle, ControlPlaneError> {
524 let control_plane = self.compute.must_upgrade();
525
526 let is_main = matches!(start, ThreadStartType::MainThread);
528
529 let tid: WasiThreadId = if is_main {
532 self.pid().raw().into()
533 } else {
534 let tid: u32 = control_plane.generate_id()?.into();
535 tid.into()
536 };
537
538 self.new_thread_with_id(layout, start, tid)
539 }
540
541 pub fn new_thread_with_id(
543 &self,
544 layout: WasiMemoryLayout,
545 start: ThreadStartType,
546 tid: WasiThreadId,
547 ) -> Result<WasiThreadHandle, ControlPlaneError> {
548 let control_plane = self.compute.must_upgrade();
549 let task_count_guard = control_plane.register_task()?;
550
551 let is_main = matches!(start, ThreadStartType::MainThread);
552
553 let mut inner = self.inner.0.lock().unwrap();
555 let finished = if is_main {
556 self.finished.clone()
557 } else {
558 Arc::new(OwnedTaskStatus::default())
559 };
560
561 let ctrl = WasiThread::new(
563 self.pid(),
564 tid,
565 is_main,
566 finished,
567 task_count_guard,
568 layout,
569 start,
570 );
571 inner.threads.insert(tid, ctrl.clone());
572 inner.thread_count += 1;
573
574 Ok(WasiThreadHandle::new(ctrl, &self.inner))
575 }
576
577 pub fn all_threads(&self) -> Vec<WasiThreadId> {
578 let inner = self.inner.0.lock().unwrap();
579 inner.threads.keys().cloned().collect()
580 }
581
582 pub fn get_thread(&self, tid: &WasiThreadId) -> Option<WasiThread> {
584 let inner = self.inner.0.lock().unwrap();
585 inner.threads.get(tid).cloned()
586 }
587
588 pub fn signal_thread(&self, tid: &WasiThreadId, signal: Signal) {
590 let mut tid = tid.raw();
592 if tid == 1073741823 {
593 tid = self.pid().raw();
594 }
595 let tid: WasiThreadId = tid.into();
596
597 let pid = self.pid();
598 tracing::trace!(%pid, %tid, "signal-thread({:?})", signal);
599
600 let inner = self.inner.0.lock().unwrap();
601
602 wake_atomic_waiters(&inner, signal);
603 if let Some(thread) = inner.threads.get(&tid) {
604 thread.signal(signal);
605 } else {
606 trace!(
607 "wasi[{}]::lost-signal(tid={}, sig={:?})",
608 self.pid(),
609 tid,
610 signal
611 );
612 }
613 }
614
/// Delivers `signal` to the process as a whole by delegating to
/// `signal_process_internal` with this process' shared state.
pub fn signal_process(&self, signal: Signal) {
    signal_process_internal(&self.inner, signal);
}
619
620 pub fn register_memory(&self, memory: SharedMemory) {
622 let mut inner = self.inner.0.lock().unwrap();
623 inner.memory = Some(memory);
624 }
625
626 pub fn snapshot_and_disable_journaling(
632 &self,
633 trigger: SnapshotTrigger,
634 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
635 let mut guard = self.inner.0.lock().unwrap();
636 guard.disable_journaling_after_checkpoint = true;
637 guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
638 self.wait_for_checkpoint_finish()
639 }
640
641 pub fn snapshot_and_stop(
647 &self,
648 trigger: SnapshotTrigger,
649 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
650 let mut guard = self.inner.0.lock().unwrap();
651 guard.stop_running_after_checkpoint = true;
652 guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
653 self.wait_for_checkpoint_finish()
654 }
655
656 pub fn snapshot(
661 &self,
662 trigger: SnapshotTrigger,
663 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
664 let mut guard = self.inner.0.lock().unwrap();
665 guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
666 self.wait_for_checkpoint_finish()
667 }
668
669 pub fn disable_journaling_after_checkpoint(&self) {
671 let mut guard = self.inner.0.lock().unwrap();
672 guard.disable_journaling_after_checkpoint = true;
673 }
674
675 pub fn stop_running_after_checkpoint(&self) {
677 let mut guard = self.inner.0.lock().unwrap();
678 guard.stop_running_after_checkpoint = true;
679 }
680
681 #[cfg(not(feature = "journal"))]
683 pub fn wait_for_checkpoint(
684 &self,
685 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
686 Box::pin(std::future::pending())
687 }
688
689 #[cfg(feature = "journal")]
691 pub fn wait_for_checkpoint(
692 &self,
693 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
694 use futures::Future;
695 use std::{
696 pin::Pin,
697 task::{Context, Poll},
698 };
699
700 struct Poller {
701 inner: LockableWasiProcessInner,
702 }
703 impl Future for Poller {
704 type Output = ();
705 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
706 let mut guard = self.inner.0.lock().unwrap();
707 if !matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) {
708 return Poll::Ready(());
709 }
710 if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) {
711 guard.wakers.push(cx.waker().clone());
712 }
713 Poll::Pending
714 }
715 }
716 Box::pin(Poller {
717 inner: self.inner.clone(),
718 })
719 }
720
721 #[cfg(not(feature = "journal"))]
723 pub fn wait_for_checkpoint_finish(
724 &self,
725 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
726 Box::pin(std::future::pending())
727 }
728
729 #[cfg(feature = "journal")]
731 pub fn wait_for_checkpoint_finish(
732 &self,
733 ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
734 use futures::Future;
735 use std::{
736 pin::Pin,
737 task::{Context, Poll},
738 };
739
740 struct Poller {
741 inner: LockableWasiProcessInner,
742 }
743 impl Future for Poller {
744 type Output = ();
745 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
746 let mut guard = self.inner.0.lock().unwrap();
747 if matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) {
748 return Poll::Ready(());
749 }
750 if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) {
751 guard.wakers.push(cx.waker().clone());
752 }
753 Poll::Pending
754 }
755 }
756 Box::pin(Poller {
757 inner: self.inner.clone(),
758 })
759 }
760
761 pub fn signal_interval(&self, signal: Signal, interval: Option<Duration>, repeat: bool) {
763 let mut inner = self.inner.0.lock().unwrap();
764
765 let interval = match interval {
766 None => {
767 inner.signal_intervals.remove(&signal);
768 return;
769 }
770 Some(a) => a,
771 };
772
773 let now = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap() as u128;
774 inner.signal_intervals.insert(
775 signal,
776 WasiSignalInterval {
777 signal,
778 interval,
779 last_signal: now,
780 repeat,
781 },
782 );
783 }
784
785 pub fn active_threads(&self) -> u32 {
787 let inner = self.inner.0.lock().unwrap();
788 inner.thread_count
789 }
790
791 pub async fn join(&self) -> Result<ExitCode, Arc<WasiRuntimeError>> {
793 let _guard = WasiProcessWait::new(self);
794 self.finished.await_termination().await
795 }
796
/// Non-blocking variant of `join`: converts the current task status with
/// `into_finished()` and returns the outcome without waiting.
// NOTE(review): presumably `None` while the process is still running - confirm against
// `TaskStatus::into_finished`.
pub fn try_join(&self) -> Option<Result<ExitCode, Arc<WasiRuntimeError>>> {
    self.finished.status().into_finished()
}
801
802 pub async fn join_children(&mut self) -> Option<Result<ExitCode, Arc<WasiRuntimeError>>> {
804 let _guard = WasiProcessWait::new(self);
805 let children: Vec<_> = {
806 let inner = self.inner.0.lock().unwrap();
807 inner.children.clone()
808 };
809 if children.is_empty() {
810 return None;
811 }
812 let mut waits = Vec::new();
813 for child in children {
814 if let Some(process) = self.compute.must_upgrade().get_process(child.pid) {
815 let inner = self.inner.clone();
816 waits.push(async move {
817 let join = process.join().await;
818 let mut inner = inner.0.lock().unwrap();
819 inner.children.retain(|a| a.pid != child.pid);
820 join
821 })
822 }
823 }
824 futures::future::join_all(waits).await.into_iter().next()
825 }
826
827 pub async fn join_any_child(&mut self) -> Result<Option<(WasiProcessId, ExitCode)>, Errno> {
829 let _guard = WasiProcessWait::new(self);
830 let children: Vec<_> = {
831 let inner = self.inner.0.lock().unwrap();
832 inner.children.clone()
833 };
834 if children.is_empty() {
835 return Err(Errno::Child);
836 }
837
838 let mut waits = Vec::new();
839 for child in children {
840 if let Some(process) = self.compute.must_upgrade().get_process(child.pid) {
841 let inner = self.inner.clone();
842 waits.push(async move {
843 let join = process.join().await;
844 let mut inner = inner.0.lock().unwrap();
845 inner.children.retain(|a| a.pid != child.pid);
846 (child, join)
847 })
848 }
849 }
850 let (child, res) = futures::future::select_all(waits.into_iter().map(Box::pin))
851 .await
852 .0;
853
854 let code =
855 res.unwrap_or_else(|e| e.as_exit_code().unwrap_or_else(|| Errno::Canceled.into()));
856
857 Ok(Some((child.pid, code)))
858 }
859
860 pub fn terminate(&self, exit_code: ExitCode) {
862 let pid = self.pid;
863 tracing::trace!(%pid, %exit_code, "process-terminate");
864 let guard = self.inner.0.lock().unwrap();
867 for thread in guard.threads.values() {
868 thread.set_status_finished(Ok(exit_code))
869 }
870 }
871}
872
873fn signal_process_internal(process: &LockableWasiProcessInner, signal: Signal) {
875 #[allow(unused_mut)]
876 let mut guard = process.0.lock().unwrap();
877 let pid = guard.pid;
878 tracing::trace!(%pid, "signal-process({:?})", signal);
879
880 #[cfg(feature = "journal")]
883 {
884 if signal == Signal::Sigint
885 && (guard.snapshot_on.contains(&SnapshotTrigger::Sigint)
886 || guard.snapshot_on.remove(&SnapshotTrigger::FirstSigint))
887 {
888 drop(guard);
889
890 tracing::debug!(%pid, "snapshot-on-interrupt-signal");
891
892 do_checkpoint_from_outside(
893 process,
894 WasiProcessCheckpoint::Snapshot {
895 trigger: SnapshotTrigger::Sigint,
896 },
897 );
898 return;
899 };
900 }
901
902 if guard.waiting.load(Ordering::Acquire) > 0 {
905 let mut triggered = false;
906 for child in guard.children.iter() {
907 child.signal_process(signal);
908 triggered = true;
909 }
910 if triggered {
911 return;
912 }
913 }
914
915 wake_atomic_waiters(&guard, signal);
917 for thread in guard.threads.values() {
918 thread.signal(signal);
919 }
920}
921
922fn wake_atomic_waiters(process: &WasiProcessInner, signal: Signal) {
923 let Some(memory) = &process.memory else {
924 return;
925 };
926
927 if signal == Signal::Sigkill {
928 if let Err(err) = memory.disable_atomics() {
931 tracing::trace!(
932 pid=%process.pid,
933 error = &err as &dyn std::error::Error,
934 "failed to wake atomic waiters"
935 );
936 }
937 }
938
939 }
959
960impl SignalHandlerAbi for WasiProcess {
961 fn signal(&self, sig: u8) -> Result<(), SignalDeliveryError> {
962 if let Ok(sig) = sig.try_into() {
963 self.signal_process(sig);
964 Ok(())
965 } else {
966 Err(SignalDeliveryError)
967 }
968 }
969}