wasmer_wasix/os/task/
task_join_handle.rs1use std::{
2    pin::Pin,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use wasmer_wasix_types::wasi::{Errno, ExitCode};
8
9use crate::WasiRuntimeError;
10
11use super::signal::{DynSignalHandlerAbi, default_signal_handler};
12
13#[derive(Clone, Debug)]
14pub enum TaskStatus {
15    Pending,
16    Running,
17    Finished(Result<ExitCode, Arc<WasiRuntimeError>>),
18}
19
20impl TaskStatus {
21    #[must_use]
25    pub fn is_pending(&self) -> bool {
26        matches!(self, Self::Pending)
27    }
28
29    #[must_use]
33    pub fn is_running(&self) -> bool {
34        matches!(self, Self::Running)
35    }
36
37    pub fn into_finished(self) -> Option<Result<ExitCode, Arc<WasiRuntimeError>>> {
38        match self {
39            Self::Finished(res) => Some(res),
40            _ => None,
41        }
42    }
43
44    #[must_use]
48    pub fn is_finished(&self) -> bool {
49        matches!(self, Self::Finished(..))
50    }
51}
52
53#[derive(thiserror::Error, Debug)]
54#[error("Task already terminated")]
55pub struct TaskTerminatedError;
56
57pub trait VirtualTaskHandle: std::fmt::Debug + Send + Sync + 'static {
58    fn status(&self) -> TaskStatus;
59
60    fn poll_ready(
62        self: Pin<&mut Self>,
63        cx: &mut Context<'_>,
64    ) -> Poll<Result<(), TaskTerminatedError>>;
65
66    fn poll_finished(
67        self: Pin<&mut Self>,
68        cx: &mut Context<'_>,
69    ) -> Poll<Result<ExitCode, Arc<WasiRuntimeError>>>;
70}
71
72#[derive(Debug)]
74pub struct OwnedTaskStatus {
75    signal_handler: Arc<DynSignalHandlerAbi>,
77
78    watch_tx: tokio::sync::watch::Sender<TaskStatus>,
79    #[allow(dead_code)]
82    watch_rx: tokio::sync::watch::Receiver<TaskStatus>,
83}
84
85impl OwnedTaskStatus {
86    pub fn new(status: TaskStatus) -> Self {
87        let (tx, rx) = tokio::sync::watch::channel(status);
88        Self {
89            signal_handler: default_signal_handler(),
90            watch_tx: tx,
91            watch_rx: rx,
92        }
93    }
94
95    pub fn set_signal_handler(&mut self, handler: Arc<DynSignalHandlerAbi>) {
97        self.signal_handler = handler;
98    }
99
100    pub fn with_signal_handler(mut self, handler: Arc<DynSignalHandlerAbi>) -> Self {
102        self.set_signal_handler(handler);
103        self
104    }
105
106    pub fn new_finished_with_code(code: ExitCode) -> Self {
107        Self::new(TaskStatus::Finished(Ok(code)))
108    }
109
110    pub fn set_running(&self) {
112        self.watch_tx.send_modify(|value| {
113            if value.is_pending() {
115                *value = TaskStatus::Running;
116            }
117        })
118    }
119
120    pub(crate) fn set_finished(&self, res: Result<ExitCode, Arc<WasiRuntimeError>>) {
122        let inner = match res {
123            Ok(code) => Ok(code),
124            Err(err) => {
125                if let Some(code) = err.as_exit_code() {
126                    Ok(code)
127                } else {
128                    Err(err)
129                }
130            }
131        };
132        self.watch_tx.send_modify(move |old| {
133            if !old.is_finished() {
134                *old = TaskStatus::Finished(inner);
135            }
136        });
137    }
138
139    pub fn status(&self) -> TaskStatus {
140        self.watch_tx.borrow().clone()
141    }
142
143    pub async fn await_termination(&self) -> Result<ExitCode, Arc<WasiRuntimeError>> {
144        let mut receiver = self.watch_tx.subscribe();
145        loop {
146            let status = receiver.borrow_and_update().clone();
147            match status {
148                TaskStatus::Pending | TaskStatus::Running => {}
149                TaskStatus::Finished(res) => {
150                    return res;
151                }
152            }
153            receiver.changed().await.unwrap();
155        }
156    }
157
158    pub async fn await_termination_anyhow(&self) -> anyhow::Result<ExitCode> {
159        Ok(self.await_termination().await?)
160    }
161
162    pub fn handle(&self) -> TaskJoinHandle {
163        TaskJoinHandle {
164            signal_handler: self.signal_handler.clone(),
165            watch: self.watch_tx.subscribe(),
166        }
167    }
168}
169
170impl Default for OwnedTaskStatus {
171    fn default() -> Self {
172        Self::new(TaskStatus::Pending)
173    }
174}
175
176#[derive(Clone, Debug)]
178pub struct TaskJoinHandle {
179    #[allow(unused)]
180    signal_handler: Arc<DynSignalHandlerAbi>,
181    watch: tokio::sync::watch::Receiver<TaskStatus>,
182}
183
184impl TaskJoinHandle {
185    pub fn status(&self) -> TaskStatus {
187        self.watch.borrow().clone()
188    }
189
190    #[cfg(feature = "ctrlc")]
191    pub fn install_ctrlc_handler(&self) {
192        use wasmer::FromToNativeWasmType;
193        use wasmer_wasix_types::wasi::Signal;
194
195        let signal_handler = self.signal_handler.clone();
196
197        tokio::spawn(async move {
198            while tokio::signal::ctrl_c().await.is_ok() {
200                if let Err(err) = signal_handler.signal(Signal::Sigint.to_native() as u8) {
201                    tracing::error!("failed to process signal - {}", err);
202                    std::process::exit(1);
203                }
204            }
205        });
206    }
207
208    pub async fn wait_finished(&mut self) -> Result<ExitCode, Arc<WasiRuntimeError>> {
210        loop {
211            let status = self.watch.borrow_and_update().clone();
212            match status {
213                TaskStatus::Pending | TaskStatus::Running => {}
214                TaskStatus::Finished(res) => {
215                    return res;
216                }
217            }
218            if self.watch.changed().await.is_err() {
219                return Ok(Errno::Noent.into());
220            }
221        }
222    }
223}