wasmer_wasix/os/task/
process.rs

1use crate::{WasiEnv, WasiRuntimeError, journal::SnapshotTrigger};
2#[cfg(feature = "journal")]
3use crate::{WasiResult, journal::JournalEffector, syscalls::do_checkpoint_from_outside, unwind};
4use serde::{Deserialize, Serialize};
5#[cfg(feature = "journal")]
6use std::collections::HashSet;
7use std::{
8    collections::HashMap,
9    convert::TryInto,
10    ops::Range,
11    sync::{
12        Arc, Condvar, Mutex, MutexGuard, RwLock, Weak,
13        atomic::{AtomicU32, Ordering},
14    },
15    task::Waker,
16    time::Duration,
17};
18use tracing::trace;
19use wasmer::{FunctionEnvMut, SharedMemory};
20use wasmer_types::ModuleHash;
21use wasmer_wasix_types::{
22    types::Signal,
23    wasi::{Errno, ExitCode, Snapshot0Clockid},
24    wasix::ThreadStartType,
25};
26
27use crate::{
28    WasiThread, WasiThreadHandle, WasiThreadId, os::task::signal::WasiSignalInterval,
29    syscalls::platform_clock_time_get,
30};
31
32use super::{
33    TaskStatus,
34    backoff::WasiProcessCpuBackoff,
35    control_plane::{ControlPlaneError, WasiControlPlaneHandle},
36    signal::{SignalDeliveryError, SignalHandlerAbi},
37    task_join_handle::OwnedTaskStatus,
38    thread::WasiMemoryLayout,
39};
40
/// Represents the ID of a sub-process
///
/// Process IDs and thread IDs share one address space: the main thread of a
/// process uses the process ID as its thread ID (see `WasiProcess::new_thread`).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct WasiProcessId(u32);
44
45impl WasiProcessId {
46    pub fn raw(&self) -> u32 {
47        self.0
48    }
49}
50
51impl From<i32> for WasiProcessId {
52    fn from(id: i32) -> Self {
53        Self(id as u32)
54    }
55}
56
57impl From<WasiProcessId> for i32 {
58    fn from(val: WasiProcessId) -> Self {
59        val.0 as i32
60    }
61}
62
63impl From<u32> for WasiProcessId {
64    fn from(id: u32) -> Self {
65        Self(id)
66    }
67}
68
69impl From<WasiProcessId> for u32 {
70    fn from(val: WasiProcessId) -> Self {
71        val.0
72    }
73}
74
75impl std::fmt::Display for WasiProcessId {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        write!(f, "{}", self.0)
78    }
79}
80
81impl std::fmt::Debug for WasiProcessId {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        write!(f, "{}", self.0)
84    }
85}
86
/// Shared process state: the mutex-protected [`WasiProcessInner`] paired with
/// a condition variable that threads wait on while coordinating (for example
/// while a checkpoint is in progress).
pub type LockableWasiProcessInner = Arc<(Mutex<WasiProcessInner>, Condvar)>;
88
/// Represents a process running within the compute state
///
/// Cloning a `WasiProcess` yields another handle to the same process: the
/// mutable state, task status and counters are all shared through `Arc`s.
/// TODO: fields should be private and only accessed via methods.
#[derive(Debug, Clone)]
pub struct WasiProcess {
    /// Unique ID of this process
    pub(crate) pid: WasiProcessId,
    /// Hash of the module that this process is using
    pub(crate) module_hash: ModuleHash,
    /// Weak reference to the state of the parent process, if any. `ppid()`
    /// reads the parent's ID through it; `WasiProcess::new` leaves it `None`.
    pub(crate) parent: Option<Weak<RwLock<WasiProcessInner>>>,
    /// The inner protected region of the process with a conditional
    /// variable that is used for coordination such as snapshots.
    pub(crate) inner: LockableWasiProcessInner,
    /// Reference back to the compute engine
    // TODO: remove this reference, access should happen via separate state instead
    // (we don't want cyclical references)
    pub(crate) compute: WasiControlPlaneHandle,
    /// Task status (exit code) of the process; it is also handed to the main
    /// thread as that thread's own task status
    pub(crate) finished: Arc<OwnedTaskStatus>,
    /// Number of threads waiting for children to exit
    pub(crate) waiting: Arc<AtomicU32>,
    /// Number of tokens that are currently active and thus
    /// the exponential backoff of CPU is halted (as in CPU
    /// is allowed to run freely)
    pub(crate) cpu_run_tokens: Arc<AtomicU32>,
}
115
/// Represents a freeze of all threads to perform some action
/// on the total state-machine. This is normally done for
/// things like snapshots which require the memory to remain
/// stable while it performs a diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum WasiProcessCheckpoint {
    /// No checkpoint will take place and the process
    /// should just execute as per normal
    Execute,
    /// The process needs to take a snapshot of the
    /// memory and state-machine
    Snapshot {
        /// What caused the snapshot; passed on to
        /// `JournalEffector::save_memory_and_snapshot`
        trigger: SnapshotTrigger,
    },
}
129
/// A region of process memory, described by its `start` and `end` offsets.
///
/// The region is half-open like [`Range<u64>`] (`start..end`) and converts
/// to and from it losslessly. It is used as the key of the per-region
/// snapshot hashes kept by the process.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MemorySnapshotRegion {
    /// Start offset of the region (inclusive)
    pub start: u64,
    /// End offset of the region (exclusive)
    pub end: u64,
}

impl From<Range<u64>> for MemorySnapshotRegion {
    fn from(value: Range<u64>) -> Self {
        Self {
            start: value.start,
            end: value.end,
        }
    }
}

// Implementing `From` (instead of hand-writing `Into`) is the idiomatic
// direction; the standard blanket impl still provides
// `Into<Range<u64>> for MemorySnapshotRegion`, so `.into()` call sites keep working.
impl From<MemorySnapshotRegion> for Range<u64> {
    fn from(value: MemorySnapshotRegion) -> Self {
        value.start..value.end
    }
}
152
// TODO: fields should be private and only accessed via methods.
/// Mutable state of a [`WasiProcess`], guarded by the mutex half of
/// [`LockableWasiProcessInner`].
#[derive(Debug)]
pub struct WasiProcessInner {
    /// Unique ID of this process
    pub pid: WasiProcessId,
    /// Number of threads waiting for children to exit (the same counter as
    /// `WasiProcess::waiting`)
    pub(crate) waiting: Arc<AtomicU32>,
    /// The threads that make up this process, keyed by thread ID
    pub threads: HashMap<WasiThreadId, WasiThread>,
    /// Number of threads running for this process (incremented whenever a
    /// thread is created through `WasiProcess::new_thread_with_id`)
    pub thread_count: u32,
    /// Signals that will be triggered at specific intervals
    pub signal_intervals: HashMap<Signal, WasiSignalInterval>,
    /// List of all the children spawned from this process
    pub children: Vec<WasiProcess>,
    /// Represents a checkpoint which blocks all the threads
    /// and then executes some maintenance action
    pub checkpoint: WasiProcessCheckpoint,
    /// If true then the journaling will be disabled after the
    /// next snapshot is taken
    pub disable_journaling_after_checkpoint: bool,
    /// If true then the process will stop running after the
    /// next snapshot is taken
    pub stop_running_after_checkpoint: bool,
    /// List of situations that the process will checkpoint on
    #[cfg(feature = "journal")]
    pub snapshot_on: HashSet<SnapshotTrigger>,
    /// Any wakers waiting on this process (for example for a checkpoint to
    /// start or finish); they are drained and woken when a checkpoint is
    /// requested or completed
    pub wakers: Vec<Waker>,
    /// If true then the process has started cleaning up
    pub cleanup_started: bool,
    /// Shared process memory.
    pub memory: Option<SharedMemory>,
    /// Per-region memory hashes from earlier snapshots (presumably content
    /// hashes), which significantly reduce the number of duplicate journal
    /// entries for memory that has not changed
    #[cfg(feature = "journal")]
    pub snapshot_memory_hash: HashMap<MemorySnapshotRegion, u64>,
    /// Represents all the backoff properties for this process
    /// which will be used to determine if the CPU should be
    /// throttled or not
    pub(super) backoff: WasiProcessCpuBackoff,
}
195
/// Outcome of [`WasiProcessInner::maybe_checkpoint`].
pub enum MaybeCheckpointResult<'a> {
    /// No checkpoint was pending; the function context is handed back so the
    /// caller can carry on normally.
    NotThisTime(FunctionEnvMut<'a, WasiEnv>),
    /// A checkpoint was pending and the stack is being unwound to take it;
    /// the context has been consumed.
    Unwinding,
}
200
impl WasiProcessInner {
    /// Checkpoints the process which will cause all other threads to
    /// pause and for the thread and memory state to be saved
    ///
    /// Stores `for_what` as the pending checkpoint, wakes every registered
    /// waker and condvar waiter so the other threads notice the request, and
    /// then runs [`Self::maybe_checkpoint`] on the calling thread.
    #[cfg(feature = "journal")]
    pub fn checkpoint<M: wasmer_types::MemorySize>(
        inner: LockableWasiProcessInner,
        ctx: FunctionEnvMut<'_, WasiEnv>,
        for_what: WasiProcessCheckpoint,
    ) -> WasiResult<MaybeCheckpointResult<'_>> {
        // Set the checkpoint flag and then enter the normal processing loop
        {
            let mut guard = inner.0.lock().unwrap();
            guard.checkpoint = for_what;
            for waker in guard.wakers.drain(..) {
                waker.wake();
            }
            inner.1.notify_all();
        }

        Self::maybe_checkpoint::<M>(inner, ctx)
    }

    /// If a checkpoint has been started this will block the current process
    /// until the checkpoint operation has completed
    ///
    /// * No checkpoint pending: returns `NotThisTime(ctx)` immediately.
    /// * Checkpoint pending: the WASM stack is unwound. Inside the unwind
    ///   callback this thread serializes its store globals, saves its own
    ///   thread state to the journal, marks itself as checkpointing and then
    ///   waits on the condvar. The thread that finds every thread either
    ///   checkpointing or deep sleeping saves the memory snapshot, resets the
    ///   checkpoint to `Execute` and wakes everybody. Afterwards each thread
    ///   either finishes (when `stop_running_after_checkpoint` is set) or
    ///   rewinds its stack and resumes. `Unwinding` is returned in this case.
    #[cfg(feature = "journal")]
    pub fn maybe_checkpoint<M: wasmer_types::MemorySize>(
        inner: LockableWasiProcessInner,
        ctx: FunctionEnvMut<'_, WasiEnv>,
    ) -> WasiResult<MaybeCheckpointResult<'_>> {
        use bytes::Bytes;
        use wasmer::AsStoreMut;
        use wasmer_types::OnCalledAction;

        use crate::{WasiError, os::task::thread::RewindResultType, rewind_ext};
        // Enter the lock which will determine if we are in a checkpoint or not
        let guard = inner.0.lock().unwrap();
        if guard.checkpoint == WasiProcessCheckpoint::Execute {
            // No checkpoint so just carry on
            return Ok(Ok(MaybeCheckpointResult::NotThisTime(ctx)));
        }
        trace!("checkpoint capture");
        // Release the lock before unwinding; it is re-acquired inside the callback
        drop(guard);

        // Perform the unwind action
        let thread_layout = ctx.data().thread.memory_layout().clone();
        unwind::<M, _>(ctx, move |mut ctx, memory_stack, rewind_stack| {
            // Grab all the globals and serialize them
            let store_data = crate::utils::store::capture_store_snapshot(&mut ctx.as_store_mut())
                .serialize()
                .unwrap();
            let memory_stack = memory_stack.freeze();
            let rewind_stack = rewind_stack.freeze();
            let store_data = Bytes::from(store_data);

            tracing::debug!(
                "stack snapshot unwind (memory_stack={}, rewind_stack={}, store_data={})",
                memory_stack.len(),
                rewind_stack.len(),
                store_data.len(),
            );

            // Write our thread state to the snapshot (the stacks are cloned
            // because they are needed again below to rewind this thread)
            let thread_start = ctx.data().thread.thread_start_type();
            let tid = ctx.data().thread.tid();
            if let Err(err) = JournalEffector::save_thread_state::<M>(
                &mut ctx,
                tid,
                memory_stack.clone(),
                rewind_stack.clone(),
                store_data.clone(),
                thread_start,
                thread_layout,
            ) {
                return wasmer_types::OnCalledAction::Trap(err.into());
            }

            let mut guard = inner.0.lock().unwrap();

            // Wait for the checkpoint to finish (or if we are the last thread
            // to freeze then we have to write the memory snapshot ourselves)
            loop {
                if let WasiProcessCheckpoint::Snapshot { trigger } = guard.checkpoint {
                    ctx.data().thread.set_checkpointing(true);

                    // Now if we are the last thread we also write the memory.
                    // "Last" means every thread is checkpointing or deep sleeping.
                    let is_last_thread = guard
                        .threads
                        .values()
                        .all(|t| t.is_check_pointing() || t.is_deep_sleeping());
                    if is_last_thread {
                        // The process lock stays held while memory is saved.
                        // NOTE(review): on failure the checkpoint stays set and this
                        // thread's checkpointing flag is not cleared - confirm intended.
                        if let Err(err) =
                            JournalEffector::save_memory_and_snapshot(&mut ctx, &mut guard, trigger)
                        {
                            inner.1.notify_all();
                            return wasmer_types::OnCalledAction::Trap(err.into());
                        }

                        // Clear the checkpointing flag and notify everyone to wake up
                        ctx.data().thread.set_checkpointing(false);
                        trace!("checkpoint complete");
                        if guard.disable_journaling_after_checkpoint {
                            ctx.data_mut().enable_journal = false;
                        }
                        guard.checkpoint = WasiProcessCheckpoint::Execute;
                        for waker in guard.wakers.drain(..) {
                            waker.wake();
                        }
                        inner.1.notify_all();
                    } else {
                        // Sleep (releasing the lock) until another thread makes progress
                        guard = inner.1.wait(guard).unwrap();
                    }
                    // Re-evaluate the checkpoint state after every change
                    continue;
                }

                // The checkpoint is over (state is back to `Execute`)
                ctx.data().thread.set_checkpointing(false);
                trace!("checkpoint finished");

                if guard.stop_running_after_checkpoint {
                    trace!("will stop running now");
                    // Need to stop recording journal events so we don't also record the
                    // thread and process exit events
                    ctx.data_mut().enable_journal = false;
                    return OnCalledAction::Finish;
                }

                // Rewind the stack and carry on
                return match rewind_ext::<M>(
                    &mut ctx,
                    Some(memory_stack),
                    rewind_stack,
                    store_data,
                    RewindResultType::RewindWithoutResult,
                ) {
                    Errno::Success => OnCalledAction::InvokeAgain,
                    err => {
                        tracing::warn!(
                            "snapshot resumption failed - could not rewind the stack - errno={}",
                            err
                        );
                        OnCalledAction::Trap(Box::new(WasiError::Exit(err.into())))
                    }
                };
            }
        })?;

        Ok(Ok(MaybeCheckpointResult::Unwinding))
    }

    /// Execute any checkpoints that can be executed while outside of the WASM process
    ///
    /// Without the `journal` feature there are no checkpoints, so this is a no-op.
    #[cfg(not(feature = "journal"))]
    pub fn do_checkpoints_from_outside(_ctx: &mut FunctionEnvMut<'_, WasiEnv>) {}

    /// Execute any checkpoints that can be executed while outside of the WASM process
    ///
    /// Unlike [`Self::maybe_checkpoint`] no stack unwind/rewind is performed:
    /// while a snapshot is pending this thread marks itself as checkpointing
    /// and waits on the condvar, or performs the memory snapshot itself when
    /// it is the last thread to freeze. Errors are logged, not returned.
    #[cfg(feature = "journal")]
    pub fn do_checkpoints_from_outside(ctx: &mut FunctionEnvMut<'_, WasiEnv>) {
        let inner = ctx.data().process.inner.clone();
        let mut guard = inner.0.lock().unwrap();

        // Wait for the checkpoint to finish (or if we are the last thread
        // to freeze then we have to write the memory snapshot ourselves)
        while let WasiProcessCheckpoint::Snapshot { trigger } = guard.checkpoint {
            ctx.data().thread.set_checkpointing(true);

            // Now if we are the last thread we also write the memory.
            // "Last" means every thread is checkpointing or deep sleeping.
            let is_last_thread = guard
                .threads
                .values()
                .all(|t| t.is_check_pointing() || t.is_deep_sleeping());
            if is_last_thread {
                // NOTE(review): on failure this returns with the checkpoint still
                // set and this thread still flagged as checkpointing - confirm intended.
                if let Err(err) =
                    JournalEffector::save_memory_and_snapshot(ctx, &mut guard, trigger)
                {
                    inner.1.notify_all();
                    tracing::error!("failed to snapshot memory and threads - {}", err);
                    return;
                }

                // Clear the checkpointing flag and notify everyone to wake up
                ctx.data().thread.set_checkpointing(false);
                trace!("checkpoint complete");
                if guard.disable_journaling_after_checkpoint {
                    ctx.data_mut().enable_journal = false;
                }
                guard.checkpoint = WasiProcessCheckpoint::Execute;
                for waker in guard.wakers.drain(..) {
                    waker.wake();
                }
                inner.1.notify_all();
            } else {
                // Sleep (releasing the lock) until another thread makes progress
                guard = inner.1.wait(guard).unwrap();
            }
            continue;
        }

        ctx.data().thread.set_checkpointing(false);
        trace!("checkpoint finished");
    }
}
400
401// TODO: why do we need this, how is it used?
402pub(crate) struct WasiProcessWait {
403    waiting: Arc<AtomicU32>,
404}
405
406impl WasiProcessWait {
407    pub fn new(process: &WasiProcess) -> Self {
408        process.waiting.fetch_add(1, Ordering::AcqRel);
409        Self {
410            waiting: process.waiting.clone(),
411        }
412    }
413}
414
415impl Drop for WasiProcessWait {
416    fn drop(&mut self) {
417        self.waiting.fetch_sub(1, Ordering::AcqRel);
418    }
419}
420
421impl WasiProcess {
422    pub fn new(pid: WasiProcessId, module_hash: ModuleHash, plane: WasiControlPlaneHandle) -> Self {
423        let max_cpu_backoff_time = plane
424            .upgrade()
425            .and_then(|p| p.config().enable_exponential_cpu_backoff)
426            .unwrap_or(Duration::from_secs(30));
427        let max_cpu_cool_off_time = Duration::from_millis(500);
428
429        let waiting = Arc::new(AtomicU32::new(0));
430        let inner = Arc::new((
431            Mutex::new(WasiProcessInner {
432                pid,
433                threads: Default::default(),
434                thread_count: Default::default(),
435                signal_intervals: Default::default(),
436                children: Default::default(),
437                checkpoint: WasiProcessCheckpoint::Execute,
438                wakers: Default::default(),
439                cleanup_started: false,
440                memory: Default::default(),
441                waiting: waiting.clone(),
442                #[cfg(feature = "journal")]
443                snapshot_on: Default::default(),
444                #[cfg(feature = "journal")]
445                snapshot_memory_hash: Default::default(),
446                disable_journaling_after_checkpoint: false,
447                stop_running_after_checkpoint: false,
448                backoff: WasiProcessCpuBackoff::new(max_cpu_backoff_time, max_cpu_cool_off_time),
449            }),
450            Condvar::new(),
451        ));
452
453        #[derive(Debug)]
454        struct SignalHandler(LockableWasiProcessInner);
455        impl SignalHandlerAbi for SignalHandler {
456            fn signal(&self, signal: u8) -> Result<(), SignalDeliveryError> {
457                if let Ok(signal) = signal.try_into() {
458                    signal_process_internal(&self.0, signal);
459                    Ok(())
460                } else {
461                    Err(SignalDeliveryError)
462                }
463            }
464        }
465
466        WasiProcess {
467            pid,
468            module_hash,
469            parent: None,
470            compute: plane,
471            inner: inner.clone(),
472            finished: Arc::new(
473                OwnedTaskStatus::new(TaskStatus::Pending)
474                    .with_signal_handler(Arc::new(SignalHandler(inner))),
475            ),
476            waiting,
477            cpu_run_tokens: Arc::new(AtomicU32::new(0)),
478        }
479    }
480
481    /// Tries to start the cleanup process, returns true if this is the first
482    /// thread to start the cleanup.
483    pub fn try_start_cleanup(&self) -> bool {
484        let mut guard = self.inner.0.lock().unwrap();
485        if guard.cleanup_started {
486            false
487        } else {
488            guard.cleanup_started = true;
489            true
490        }
491    }
492
493    pub(super) fn set_pid(&mut self, pid: WasiProcessId) {
494        self.pid = pid;
495    }
496
497    /// Gets the process ID of this process
498    pub fn pid(&self) -> WasiProcessId {
499        self.pid
500    }
501
502    /// Gets the process ID of the parent process
503    pub fn ppid(&self) -> WasiProcessId {
504        self.parent
505            .iter()
506            .filter_map(|parent| parent.upgrade())
507            .map(|parent| parent.read().unwrap().pid)
508            .next()
509            .unwrap_or(WasiProcessId(0))
510    }
511
512    /// Gains access to the process internals
513    // TODO: Make this private, all inner access should be exposed with methods.
514    pub fn lock(&self) -> MutexGuard<'_, WasiProcessInner> {
515        self.inner.0.lock().unwrap()
516    }
517
518    /// Creates a thread and returns it
519    pub fn new_thread(
520        &self,
521        layout: WasiMemoryLayout,
522        start: ThreadStartType,
523    ) -> Result<WasiThreadHandle, ControlPlaneError> {
524        let control_plane = self.compute.must_upgrade();
525
526        // Determine if its the main thread or not
527        let is_main = matches!(start, ThreadStartType::MainThread);
528
529        // Generate a new process ID (this is because the process ID and thread ID
530        // address space must not overlap in libc). For the main process the TID=PID
531        let tid: WasiThreadId = if is_main {
532            self.pid().raw().into()
533        } else {
534            let tid: u32 = control_plane.generate_id()?.into();
535            tid.into()
536        };
537
538        self.new_thread_with_id(layout, start, tid)
539    }
540
541    /// Creates a thread and returns it
542    pub fn new_thread_with_id(
543        &self,
544        layout: WasiMemoryLayout,
545        start: ThreadStartType,
546        tid: WasiThreadId,
547    ) -> Result<WasiThreadHandle, ControlPlaneError> {
548        let control_plane = self.compute.must_upgrade();
549        let task_count_guard = control_plane.register_task()?;
550
551        let is_main = matches!(start, ThreadStartType::MainThread);
552
553        // The wait finished should be the process version if its the main thread
554        let mut inner = self.inner.0.lock().unwrap();
555        let finished = if is_main {
556            self.finished.clone()
557        } else {
558            Arc::new(OwnedTaskStatus::default())
559        };
560
561        // Insert the thread into the pool
562        let ctrl = WasiThread::new(
563            self.pid(),
564            tid,
565            is_main,
566            finished,
567            task_count_guard,
568            layout,
569            start,
570        );
571        inner.threads.insert(tid, ctrl.clone());
572        inner.thread_count += 1;
573
574        Ok(WasiThreadHandle::new(ctrl, &self.inner))
575    }
576
577    pub fn all_threads(&self) -> Vec<WasiThreadId> {
578        let inner = self.inner.0.lock().unwrap();
579        inner.threads.keys().cloned().collect()
580    }
581
582    /// Gets a reference to a particular thread
583    pub fn get_thread(&self, tid: &WasiThreadId) -> Option<WasiThread> {
584        let inner = self.inner.0.lock().unwrap();
585        inner.threads.get(tid).cloned()
586    }
587
588    /// Signals a particular thread in the process
589    pub fn signal_thread(&self, tid: &WasiThreadId, signal: Signal) {
590        // Sometimes we will signal the process rather than the thread hence this libc hardcoded value
591        let mut tid = tid.raw();
592        if tid == 1073741823 {
593            tid = self.pid().raw();
594        }
595        let tid: WasiThreadId = tid.into();
596
597        let pid = self.pid();
598        tracing::trace!(%pid, %tid, "signal-thread({:?})", signal);
599
600        let inner = self.inner.0.lock().unwrap();
601
602        wake_atomic_waiters(&inner, signal);
603        if let Some(thread) = inner.threads.get(&tid) {
604            thread.signal(signal);
605        } else {
606            trace!(
607                "wasi[{}]::lost-signal(tid={}, sig={:?})",
608                self.pid(),
609                tid,
610                signal
611            );
612        }
613    }
614
615    /// Signals all the threads in this process
616    pub fn signal_process(&self, signal: Signal) {
617        signal_process_internal(&self.inner, signal);
618    }
619
620    /// Registers the shared memory used by this process.
621    pub fn register_memory(&self, memory: SharedMemory) {
622        let mut inner = self.inner.0.lock().unwrap();
623        inner.memory = Some(memory);
624    }
625
626    /// Takes a snapshot of the process and disables journaling returning
627    /// a future that can be waited on for the snapshot to complete
628    ///
629    /// Note: If you ignore the returned future the checkpoint will still
630    /// occur but it will execute asynchronously
631    pub fn snapshot_and_disable_journaling(
632        &self,
633        trigger: SnapshotTrigger,
634    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
635        let mut guard = self.inner.0.lock().unwrap();
636        guard.disable_journaling_after_checkpoint = true;
637        guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
638        self.wait_for_checkpoint_finish()
639    }
640
641    /// Takes a snapshot of the process and shuts it down after the snapshot
642    /// is taken.
643    ///
644    /// Note: If you ignore the returned future the checkpoint will still
645    /// occur but it will execute asynchronously
646    pub fn snapshot_and_stop(
647        &self,
648        trigger: SnapshotTrigger,
649    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
650        let mut guard = self.inner.0.lock().unwrap();
651        guard.stop_running_after_checkpoint = true;
652        guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
653        self.wait_for_checkpoint_finish()
654    }
655
656    /// Takes a snapshot of the process
657    ///
658    /// Note: If you ignore the returned future the checkpoint will still
659    /// occur but it will execute asynchronously
660    pub fn snapshot(
661        &self,
662        trigger: SnapshotTrigger,
663    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
664        let mut guard = self.inner.0.lock().unwrap();
665        guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
666        self.wait_for_checkpoint_finish()
667    }
668
669    /// Disables the journaling functionality
670    pub fn disable_journaling_after_checkpoint(&self) {
671        let mut guard = self.inner.0.lock().unwrap();
672        guard.disable_journaling_after_checkpoint = true;
673    }
674
675    /// Stop running once a checkpoint is taken
676    pub fn stop_running_after_checkpoint(&self) {
677        let mut guard = self.inner.0.lock().unwrap();
678        guard.stop_running_after_checkpoint = true;
679    }
680
681    /// Wait for the checkout process to finish
682    #[cfg(not(feature = "journal"))]
683    pub fn wait_for_checkpoint(
684        &self,
685    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
686        Box::pin(std::future::pending())
687    }
688
689    /// Wait for the checkout process to finish
690    #[cfg(feature = "journal")]
691    pub fn wait_for_checkpoint(
692        &self,
693    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
694        use futures::Future;
695        use std::{
696            pin::Pin,
697            task::{Context, Poll},
698        };
699
700        struct Poller {
701            inner: LockableWasiProcessInner,
702        }
703        impl Future for Poller {
704            type Output = ();
705            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
706                let mut guard = self.inner.0.lock().unwrap();
707                if !matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) {
708                    return Poll::Ready(());
709                }
710                if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) {
711                    guard.wakers.push(cx.waker().clone());
712                }
713                Poll::Pending
714            }
715        }
716        Box::pin(Poller {
717            inner: self.inner.clone(),
718        })
719    }
720
721    /// Wait for the checkout process to finish
722    #[cfg(not(feature = "journal"))]
723    pub fn wait_for_checkpoint_finish(
724        &self,
725    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
726        Box::pin(std::future::pending())
727    }
728
729    /// Wait for the checkout process to finish
730    #[cfg(feature = "journal")]
731    pub fn wait_for_checkpoint_finish(
732        &self,
733    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
734        use futures::Future;
735        use std::{
736            pin::Pin,
737            task::{Context, Poll},
738        };
739
740        struct Poller {
741            inner: LockableWasiProcessInner,
742        }
743        impl Future for Poller {
744            type Output = ();
745            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
746                let mut guard = self.inner.0.lock().unwrap();
747                if matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) {
748                    return Poll::Ready(());
749                }
750                if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) {
751                    guard.wakers.push(cx.waker().clone());
752                }
753                Poll::Pending
754            }
755        }
756        Box::pin(Poller {
757            inner: self.inner.clone(),
758        })
759    }
760
761    /// Signals one of the threads every interval
762    pub fn signal_interval(&self, signal: Signal, interval: Option<Duration>, repeat: bool) {
763        let mut inner = self.inner.0.lock().unwrap();
764
765        let interval = match interval {
766            None => {
767                inner.signal_intervals.remove(&signal);
768                return;
769            }
770            Some(a) => a,
771        };
772
773        let now = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap() as u128;
774        inner.signal_intervals.insert(
775            signal,
776            WasiSignalInterval {
777                signal,
778                interval,
779                last_signal: now,
780                repeat,
781            },
782        );
783    }
784
785    /// Returns the number of active threads for this process
786    pub fn active_threads(&self) -> u32 {
787        let inner = self.inner.0.lock().unwrap();
788        inner.thread_count
789    }
790
791    /// Waits until the process is finished.
792    pub async fn join(&self) -> Result<ExitCode, Arc<WasiRuntimeError>> {
793        let _guard = WasiProcessWait::new(self);
794        self.finished.await_termination().await
795    }
796
797    /// Attempts to join on the process
798    pub fn try_join(&self) -> Option<Result<ExitCode, Arc<WasiRuntimeError>>> {
799        self.finished.status().into_finished()
800    }
801
802    /// Waits for all the children to be finished
803    pub async fn join_children(&mut self) -> Option<Result<ExitCode, Arc<WasiRuntimeError>>> {
804        let _guard = WasiProcessWait::new(self);
805        let children: Vec<_> = {
806            let inner = self.inner.0.lock().unwrap();
807            inner.children.clone()
808        };
809        if children.is_empty() {
810            return None;
811        }
812        let mut waits = Vec::new();
813        for child in children {
814            if let Some(process) = self.compute.must_upgrade().get_process(child.pid) {
815                let inner = self.inner.clone();
816                waits.push(async move {
817                    let join = process.join().await;
818                    let mut inner = inner.0.lock().unwrap();
819                    inner.children.retain(|a| a.pid != child.pid);
820                    join
821                })
822            }
823        }
824        futures::future::join_all(waits).await.into_iter().next()
825    }
826
827    /// Waits for any of the children to finished
828    pub async fn join_any_child(&mut self) -> Result<Option<(WasiProcessId, ExitCode)>, Errno> {
829        let _guard = WasiProcessWait::new(self);
830        let children: Vec<_> = {
831            let inner = self.inner.0.lock().unwrap();
832            inner.children.clone()
833        };
834        if children.is_empty() {
835            return Err(Errno::Child);
836        }
837
838        let mut waits = Vec::new();
839        for child in children {
840            if let Some(process) = self.compute.must_upgrade().get_process(child.pid) {
841                let inner = self.inner.clone();
842                waits.push(async move {
843                    let join = process.join().await;
844                    let mut inner = inner.0.lock().unwrap();
845                    inner.children.retain(|a| a.pid != child.pid);
846                    (child, join)
847                })
848            }
849        }
850        let (child, res) = futures::future::select_all(waits.into_iter().map(Box::pin))
851            .await
852            .0;
853
854        let code =
855            res.unwrap_or_else(|e| e.as_exit_code().unwrap_or_else(|| Errno::Canceled.into()));
856
857        Ok(Some((child.pid, code)))
858    }
859
860    /// Terminate the process and all its threads
861    pub fn terminate(&self, exit_code: ExitCode) {
862        let pid = self.pid;
863        tracing::trace!(%pid, %exit_code, "process-terminate");
864        // FIXME: this is wrong, threads might still be running!
865        // Need special logic for the main thread.
866        let guard = self.inner.0.lock().unwrap();
867        for thread in guard.threads.values() {
868            thread.set_status_finished(Ok(exit_code))
869        }
870    }
871}
872
873/// Signals all the threads in this process
874fn signal_process_internal(process: &LockableWasiProcessInner, signal: Signal) {
875    #[allow(unused_mut)]
876    let mut guard = process.0.lock().unwrap();
877    let pid = guard.pid;
878    tracing::trace!(%pid, "signal-process({:?})", signal);
879
880    // If the snapshot on ctrl-c is currently registered then we need
881    // to take a snapshot and exit
882    #[cfg(feature = "journal")]
883    {
884        if signal == Signal::Sigint
885            && (guard.snapshot_on.contains(&SnapshotTrigger::Sigint)
886                || guard.snapshot_on.remove(&SnapshotTrigger::FirstSigint))
887        {
888            drop(guard);
889
890            tracing::debug!(%pid, "snapshot-on-interrupt-signal");
891
892            do_checkpoint_from_outside(
893                process,
894                WasiProcessCheckpoint::Snapshot {
895                    trigger: SnapshotTrigger::Sigint,
896                },
897            );
898            return;
899        };
900    }
901
902    // Check if there are subprocesses that will receive this signal
903    // instead of this process
904    if guard.waiting.load(Ordering::Acquire) > 0 {
905        let mut triggered = false;
906        for child in guard.children.iter() {
907            child.signal_process(signal);
908            triggered = true;
909        }
910        if triggered {
911            return;
912        }
913    }
914
915    // Otherwise just send the signal to all the threads
916    wake_atomic_waiters(&guard, signal);
917    for thread in guard.threads.values() {
918        thread.signal(signal);
919    }
920}
921
922fn wake_atomic_waiters(process: &WasiProcessInner, signal: Signal) {
923    let Some(memory) = &process.memory else {
924        return;
925    };
926
927    if signal == Signal::Sigkill {
928        // On kill, disable atomics to prevent threads from resuming.
929        // NOTE: disable_atomics also wakes all current waiters.
930        if let Err(err) = memory.disable_atomics() {
931            tracing::trace!(
932                pid=%process.pid,
933                error = &err as &dyn std::error::Error,
934                "failed to wake atomic waiters"
935            );
936        }
937    }
938
939    // TODO: Should other signals also wake up waiters?
940    // We have low confidence this is useful outside the kill path.
941    // SEE https://github.com/wasmerio/wasmer/pull/6536
942    //
943    // Atomic wait wakeups are memory-wide, so only use them for signals
944    // that should interrupt or terminate execution anyway.
945    // if matches!(
946    //     signal,
947    //     Signal::Sigkill
948    //         | Signal::Sigterm
949    //         | Signal::Sigabrt
950    //         | Signal::Sigquit
951    //         | Signal::Sigint
952    //         | Signal::Sigstop
953    //         | Signal::Sigpipe
954    //         | Signal::Sigwakeup
955    // ) {
956    //    memory.wake_all_atomic_waiters();
957    // }
958}
959
960impl SignalHandlerAbi for WasiProcess {
961    fn signal(&self, sig: u8) -> Result<(), SignalDeliveryError> {
962        if let Ok(sig) = sig.try_into() {
963            self.signal_process(sig);
964            Ok(())
965        } else {
966            Err(SignalDeliveryError)
967        }
968    }
969}