wasmer_wasix/os/task/
task_join_handle.rs1use std::{
2 pin::Pin,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use wasmer_wasix_types::wasi::{Errno, ExitCode};
8
9use crate::WasiRuntimeError;
10
11use super::signal::{DynSignalHandlerAbi, default_signal_handler};
12
13#[derive(Clone, Debug)]
14pub enum TaskStatus {
15 Pending,
16 Running,
17 Finished(Result<ExitCode, Arc<WasiRuntimeError>>),
18}
19
20impl TaskStatus {
21 #[must_use]
25 pub fn is_pending(&self) -> bool {
26 matches!(self, Self::Pending)
27 }
28
29 #[must_use]
33 pub fn is_running(&self) -> bool {
34 matches!(self, Self::Running)
35 }
36
37 pub fn into_finished(self) -> Option<Result<ExitCode, Arc<WasiRuntimeError>>> {
38 match self {
39 Self::Finished(res) => Some(res),
40 _ => None,
41 }
42 }
43
44 #[must_use]
48 pub fn is_finished(&self) -> bool {
49 matches!(self, Self::Finished(..))
50 }
51}
52
53#[derive(thiserror::Error, Debug)]
54#[error("Task already terminated")]
55pub struct TaskTerminatedError;
56
57pub trait VirtualTaskHandle: std::fmt::Debug + Send + Sync + 'static {
58 fn status(&self) -> TaskStatus;
59
60 fn poll_ready(
62 self: Pin<&mut Self>,
63 cx: &mut Context<'_>,
64 ) -> Poll<Result<(), TaskTerminatedError>>;
65
66 fn poll_finished(
67 self: Pin<&mut Self>,
68 cx: &mut Context<'_>,
69 ) -> Poll<Result<ExitCode, Arc<WasiRuntimeError>>>;
70}
71
72#[derive(Debug)]
74pub struct OwnedTaskStatus {
75 signal_handler: Arc<DynSignalHandlerAbi>,
77
78 watch_tx: tokio::sync::watch::Sender<TaskStatus>,
79 #[allow(dead_code)]
82 watch_rx: tokio::sync::watch::Receiver<TaskStatus>,
83}
84
85impl OwnedTaskStatus {
86 pub fn new(status: TaskStatus) -> Self {
87 let (tx, rx) = tokio::sync::watch::channel(status);
88 Self {
89 signal_handler: default_signal_handler(),
90 watch_tx: tx,
91 watch_rx: rx,
92 }
93 }
94
95 pub fn set_signal_handler(&mut self, handler: Arc<DynSignalHandlerAbi>) {
97 self.signal_handler = handler;
98 }
99
100 pub fn with_signal_handler(mut self, handler: Arc<DynSignalHandlerAbi>) -> Self {
102 self.set_signal_handler(handler);
103 self
104 }
105
106 pub fn new_finished_with_code(code: ExitCode) -> Self {
107 Self::new(TaskStatus::Finished(Ok(code)))
108 }
109
110 pub fn set_running(&self) {
112 self.watch_tx.send_modify(|value| {
113 if value.is_pending() {
115 *value = TaskStatus::Running;
116 }
117 })
118 }
119
120 pub(crate) fn set_finished(&self, res: Result<ExitCode, Arc<WasiRuntimeError>>) {
122 let inner = match res {
123 Ok(code) => Ok(code),
124 Err(err) => {
125 if let Some(code) = err.as_exit_code() {
126 Ok(code)
127 } else {
128 Err(err)
129 }
130 }
131 };
132 self.watch_tx.send_modify(move |old| {
133 if !old.is_finished() {
134 *old = TaskStatus::Finished(inner);
135 }
136 });
137 }
138
139 pub fn status(&self) -> TaskStatus {
140 self.watch_tx.borrow().clone()
141 }
142
143 pub async fn await_termination(&self) -> Result<ExitCode, Arc<WasiRuntimeError>> {
144 let mut receiver = self.watch_tx.subscribe();
145 loop {
146 let status = receiver.borrow_and_update().clone();
147 match status {
148 TaskStatus::Pending | TaskStatus::Running => {}
149 TaskStatus::Finished(res) => {
150 return res;
151 }
152 }
153 receiver.changed().await.unwrap();
155 }
156 }
157
158 pub async fn await_termination_anyhow(&self) -> anyhow::Result<ExitCode> {
159 Ok(self.await_termination().await?)
160 }
161
162 pub fn handle(&self) -> TaskJoinHandle {
163 TaskJoinHandle {
164 signal_handler: self.signal_handler.clone(),
165 watch: self.watch_tx.subscribe(),
166 }
167 }
168}
169
170impl Default for OwnedTaskStatus {
171 fn default() -> Self {
172 Self::new(TaskStatus::Pending)
173 }
174}
175
176#[derive(Clone, Debug)]
178pub struct TaskJoinHandle {
179 #[allow(unused)]
180 signal_handler: Arc<DynSignalHandlerAbi>,
181 watch: tokio::sync::watch::Receiver<TaskStatus>,
182}
183
184impl TaskJoinHandle {
185 pub fn status(&self) -> TaskStatus {
187 self.watch.borrow().clone()
188 }
189
190 #[cfg(feature = "ctrlc")]
191 pub fn install_ctrlc_handler(&self) {
192 use wasmer::FromToNativeWasmType;
193 use wasmer_wasix_types::wasi::Signal;
194
195 let signal_handler = self.signal_handler.clone();
196
197 tokio::spawn(async move {
198 while tokio::signal::ctrl_c().await.is_ok() {
200 if let Err(err) = signal_handler.signal(Signal::Sigint.to_native() as u8) {
201 tracing::error!("failed to process signal - {}", err);
202 std::process::exit(1);
203 }
204 }
205 });
206 }
207
208 pub async fn wait_finished(&mut self) -> Result<ExitCode, Arc<WasiRuntimeError>> {
210 loop {
211 let status = self.watch.borrow_and_update().clone();
212 match status {
213 TaskStatus::Pending | TaskStatus::Running => {}
214 TaskStatus::Finished(res) => {
215 return res;
216 }
217 }
218 if self.watch.changed().await.is_err() {
219 return Ok(Errno::Noent.into());
220 }
221 }
222 }
223}