1#[cfg(feature = "sys-thread")]
3pub mod tokio;
4
5use std::ops::Deref;
6use std::task::{Context, Poll};
7use std::{pin::Pin, time::Duration};
8
9use bytes::Bytes;
10use derive_more::Debug;
11use futures::future::BoxFuture;
12use futures::{Future, TryFutureExt};
13use wasmer::{AsStoreMut, Memory, MemoryType, Module, SharedMemory, Store, StoreMut};
14use wasmer_wasix_types::wasi::{Errno, ExitCode};
15
16use crate::os::task::thread::WasiThreadError;
17use crate::{StoreSnapshot, WasiEnv, WasiFunctionEnv, WasiThread, capture_store_snapshot};
18use crate::{state::PreparedInstanceGroupData, syscalls::AsyncifyFuture};
19
20pub use virtual_mio::waker::*;
21
22#[derive(Debug)]
23pub enum SpawnType {
24 CreateMemory,
25 CreateMemoryOfType(MemoryType),
26 AttachMemory(SharedMemory),
27 #[debug("NewLinkerInstanceGroup(..)")]
28 NewLinkerInstanceGroup(PreparedInstanceGroupData),
29}
30
31pub enum SpawnMemoryTypeOrStore {
39 New,
40 Type(wasmer::MemoryType),
41 StoreAndMemory(wasmer::Store, Memory),
42}
43
44pub type WasmResumeTask = dyn FnOnce(WasiFunctionEnv, Store, Bytes) + Send + 'static;
45
46pub type WasmResumeTrigger = dyn FnOnce() -> Pin<Box<dyn Future<Output = Result<Bytes, ExitCode>> + Send + 'static>>
47 + Send
48 + Sync;
49
50#[derive(derive_more::Debug)]
52pub struct TaskWasmRunProperties {
53 pub ctx: WasiFunctionEnv,
54 pub store: Store,
55 pub trigger_result: Option<Result<Bytes, ExitCode>>,
59 #[debug(ignore)]
61 pub recycle: Option<Box<TaskWasmRecycle>>,
62}
63
64pub type TaskWasmPreRun = dyn (for<'a> FnOnce(
65 &'a mut WasiFunctionEnv,
66 &'a mut Store,
67 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>)
68 + Send;
69
70pub type TaskWasmRun = dyn FnOnce(TaskWasmRunProperties) + Send + 'static;
72
73pub type TaskExecModule = dyn FnOnce(Module) + Send + 'static;
75
76#[derive(Debug)]
78pub struct TaskWasmRecycleProperties {
79 pub env: WasiEnv,
80 pub memory: Memory,
81 pub store: Store,
82}
83
84pub type TaskWasmRecycle = dyn FnOnce(TaskWasmRecycleProperties) + Send + 'static;
86
87pub struct TaskWasmCallbacks {
88 pub run: Box<TaskWasmRun>,
89 pub recycle: Option<Box<TaskWasmRecycle>>,
90 pub pre_run: Option<Box<TaskWasmPreRun>>,
91 pub trigger: Option<Box<WasmResumeTrigger>>,
92}
93
94pub struct TaskWasm {
96 pub callbacks: TaskWasmCallbacks,
97 pub env: WasiEnv,
98 pub module: Module,
99 pub globals: Option<StoreSnapshot>,
100 pub spawn_type: SpawnType,
101 pub update_layout: bool,
102 pub call_initialize: bool,
103}
104
105impl TaskWasm {
106 pub fn new(
107 run: Box<TaskWasmRun>,
108 env: WasiEnv,
109 module: Module,
110 update_layout: bool,
111 call_initialize: bool,
112 ) -> Self {
113 let shared_memory = module.imports().memories().next().map(|a| *a.ty());
114 Self {
115 callbacks: TaskWasmCallbacks {
116 run,
117 recycle: None,
118 pre_run: None,
119 trigger: None,
120 },
121 env,
122 module,
123 globals: None,
124 spawn_type: match shared_memory {
125 Some(ty) => SpawnType::CreateMemoryOfType(ty),
126 None => SpawnType::CreateMemory,
127 },
128 update_layout,
129 call_initialize,
130 }
131 }
132
133 pub fn with_memory(mut self, spawn_type: SpawnType) -> Self {
134 self.spawn_type = spawn_type;
135 self
136 }
137
138 pub fn with_optional_memory(mut self, spawn_type: Option<SpawnType>) -> Self {
139 if let Some(spawn_type) = spawn_type {
140 self.spawn_type = spawn_type;
141 }
142 self
143 }
144
145 pub fn with_globals(mut self, snapshot: StoreSnapshot) -> Self {
146 self.globals.replace(snapshot);
147 self
148 }
149
150 pub fn with_trigger(mut self, trigger: Box<WasmResumeTrigger>) -> Self {
151 self.callbacks.trigger.replace(trigger);
152 self
153 }
154
155 pub fn with_recycle(mut self, recycle: Box<TaskWasmRecycle>) -> Self {
156 self.callbacks.recycle.replace(recycle);
157 self
158 }
159
160 pub fn with_pre_run(mut self, pre_run: Box<TaskWasmPreRun>) -> Self {
161 self.callbacks.pre_run.replace(pre_run);
162 self
163 }
164}
165
166#[allow(unused_variables)]
182pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
183 fn build_memory(
187 &self,
188 mut store: &mut StoreMut,
189 spawn_type: SpawnType,
190 ) -> Result<Option<Memory>, WasiThreadError> {
191 match spawn_type {
192 SpawnType::CreateMemoryOfType(mut ty) => {
193 ty.shared = true;
194
195 let _ = ty.maximum.get_or_insert(wasmer_types::Pages::max_value());
198
199 let mem = Memory::new(&mut store, ty).map_err(|err| {
200 tracing::error!(
201 error = &err as &dyn std::error::Error,
202 memory_type=?ty,
203 "could not create memory",
204 );
205 WasiThreadError::MemoryCreateFailed(err)
206 })?;
207 Ok(Some(mem))
208 }
209 SpawnType::AttachMemory(mem) => Ok(Some(mem.attach(store))),
210 SpawnType::CreateMemory | SpawnType::NewLinkerInstanceGroup(..) => Ok(None),
211 }
212 }
213
214 fn sleep_now(
221 &self,
222 time: Duration,
223 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
224
225 fn task_shared(
232 &self,
233 task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
234 ) -> Result<(), WasiThreadError>;
235
236 fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError>;
241
242 fn task_dedicated(
247 &self,
248 task: Box<dyn FnOnce() + Send + 'static>,
249 ) -> Result<(), WasiThreadError>;
250
251 fn thread_parallelism(&self) -> Result<usize, WasiThreadError>;
253
254 fn spawn_with_module(
269 &self,
270 module: Module,
271 task: Box<dyn FnOnce(Module) + Send + 'static>,
272 ) -> Result<(), WasiThreadError> {
273 self.task_dedicated(Box::new(move || task(module)))
277 }
278}
279
280impl<D, T> VirtualTaskManager for D
281where
282 D: Deref<Target = T> + std::fmt::Debug + Send + Sync + 'static,
283 T: VirtualTaskManager + ?Sized,
284{
285 fn build_memory(
286 &self,
287 store: &mut StoreMut,
288 spawn_type: SpawnType,
289 ) -> Result<Option<Memory>, WasiThreadError> {
290 (**self).build_memory(store, spawn_type)
291 }
292
293 fn sleep_now(
294 &self,
295 time: Duration,
296 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
297 (**self).sleep_now(time)
298 }
299
300 fn task_shared(
301 &self,
302 task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
303 ) -> Result<(), WasiThreadError> {
304 (**self).task_shared(task)
305 }
306
307 fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError> {
308 (**self).task_wasm(task)
309 }
310
311 fn task_dedicated(
312 &self,
313 task: Box<dyn FnOnce() + Send + 'static>,
314 ) -> Result<(), WasiThreadError> {
315 (**self).task_dedicated(task)
316 }
317
318 fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
319 (**self).thread_parallelism()
320 }
321
322 fn spawn_with_module(
323 &self,
324 module: Module,
325 task: Box<dyn FnOnce(Module) + Send + 'static>,
326 ) -> Result<(), WasiThreadError> {
327 (**self).spawn_with_module(module, task)
328 }
329}
330
331impl dyn VirtualTaskManager {
332 #[doc(hidden)]
336 pub unsafe fn resume_wasm_after_poller(
337 &self,
338 task: Box<WasmResumeTask>,
339 ctx: WasiFunctionEnv,
340 mut store: Store,
341 trigger: Pin<Box<AsyncifyFuture>>,
342 ) -> Result<(), WasiThreadError> {
343 struct AsyncifyPollerOwned {
345 thread: WasiThread,
346 trigger: Pin<Box<AsyncifyFuture>>,
347 }
348 impl Future for AsyncifyPollerOwned {
349 type Output = Result<Bytes, ExitCode>;
350 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351 let work = self.trigger.as_mut();
352 Poll::Ready(if let Poll::Ready(res) = work.poll(cx) {
353 Ok(res)
354 } else if let Some(forced_exit) = self.thread.try_join() {
355 return Poll::Ready(Err(forced_exit.unwrap_or_else(|err| {
356 tracing::debug!("exit runtime error - {}", err);
357 Errno::Child.into()
358 })));
359 } else {
360 return Poll::Pending;
361 })
362 }
363 }
364
365 let snapshot = capture_store_snapshot(&mut store.as_store_mut());
366 let env = ctx.data(&store);
367 let env_inner = env.inner();
368 let handles = env_inner
369 .static_module_instance_handles()
370 .ok_or(WasiThreadError::Unsupported)?;
371 let module = handles.module_clone();
372 let memory = handles.memory_clone();
373 let thread = env.thread.clone();
374 let env = env.clone();
375
376 let thread_inner = thread.clone();
377 self.task_wasm(
378 TaskWasm::new(
379 Box::new(move |props| {
380 let result = props
381 .trigger_result
382 .expect("If there is no result then its likely the trigger did not run");
383 let result = match result {
384 Ok(r) => r,
385 Err(exit_code) => {
386 thread.set_status_finished(Ok(exit_code));
387 return;
388 }
389 };
390 task(props.ctx, props.store, result)
391 }),
392 env.clone(),
393 module,
394 false,
395 false,
396 )
397 .with_memory(SpawnType::AttachMemory(
398 memory.as_shared(&store).ok_or_else(|| {
399 tracing::error!("failed to get shared memory for asyncify poller");
400 WasiThreadError::Unsupported
401 })?,
402 ))
403 .with_globals(snapshot)
404 .with_trigger(Box::new(move || {
405 Box::pin(async move {
406 let mut poller = AsyncifyPollerOwned {
407 thread: thread_inner,
408 trigger,
409 };
410 let res = Pin::new(&mut poller).await;
411 let res = match res {
412 Ok(res) => res,
413 Err(exit_code) => {
414 env.thread.set_status_finished(Ok(exit_code));
415 return Err(exit_code);
416 }
417 };
418
419 tracing::trace!("deep sleep woken - res.len={}", res.len());
420 Ok(res)
421 })
422 })),
423 )
424 }
425}
426
427pub trait VirtualTaskManagerExt {
429 fn spawn_and_block_on<A>(
432 &self,
433 task: impl Future<Output = A> + Send + 'static,
434 ) -> Result<A, anyhow::Error>
435 where
436 A: Send + 'static;
437
438 fn spawn_await<O, F>(
439 &self,
440 f: F,
441 ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
442 where
443 O: Send + 'static,
444 F: FnOnce() -> O + Send + 'static;
445}
446
447impl<D, T> VirtualTaskManagerExt for D
448where
449 D: Deref<Target = T>,
450 T: VirtualTaskManager + ?Sized,
451{
452 fn spawn_and_block_on<A>(
455 &self,
456 task: impl Future<Output = A> + Send + 'static,
457 ) -> Result<A, anyhow::Error>
458 where
459 A: Send + 'static,
460 {
461 let (tx, rx) = ::tokio::sync::oneshot::channel();
462 let work = Box::pin(async move {
463 let ret = task.await;
464 tx.send(ret).ok();
465 });
466 self.task_shared(Box::new(move || work)).unwrap();
467 rx.blocking_recv()
468 .map_err(|_| anyhow::anyhow!("task execution failed - result channel dropped"))
469 }
470
471 fn spawn_await<O, F>(
472 &self,
473 f: F,
474 ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
475 where
476 O: Send + 'static,
477 F: FnOnce() -> O + Send + 'static,
478 {
479 let (sender, receiver) = ::tokio::sync::oneshot::channel();
480
481 self.task_dedicated(Box::new(move || {
482 let result = f();
483 let _ = sender.send(result);
484 }))
485 .unwrap();
486
487 Box::new(receiver.map_err(|e| Box::new(e).into()))
488 }
489}