1#[cfg(feature = "sys-thread")]
3pub mod tokio;
4
5use std::ops::Deref;
6use std::task::{Context, Poll};
7use std::{pin::Pin, time::Duration};
8
9use bytes::Bytes;
10use derive_more::Debug;
11use futures::future::BoxFuture;
12use futures::{Future, TryFutureExt};
13use wasmer::{
14 AsStoreMut, AsStoreRef, FunctionEnv, Memory, MemoryType, Module, Store, StoreMut, StoreRef,
15};
16use wasmer_wasix_types::wasi::{Errno, ExitCode};
17
18use crate::syscalls::AsyncifyFuture;
19use crate::{StoreSnapshot, WasiEnv, WasiFunctionEnv, WasiThread, capture_store_snapshot};
20use crate::{os::task::thread::WasiThreadError, state::Linker};
21
22pub use virtual_mio::waker::*;
23
24#[derive(Debug)]
25pub enum SpawnType<'a> {
26 CreateMemory,
27 CreateMemoryOfType(MemoryType),
28 ShareMemory(Memory, StoreRef<'a>),
30 CopyMemory(Memory, StoreRef<'a>),
35 #[debug("NewLinkerInstanceGroup(..)")]
36 NewLinkerInstanceGroup(Linker, FunctionEnv<WasiEnv>, StoreMut<'a>),
37}
38
39pub enum SpawnMemoryTypeOrStore {
47 New,
48 Type(wasmer::MemoryType),
49 StoreAndMemory(wasmer::Store, Option<wasmer::Memory>),
50}
51
/// One-shot callback that resumes WebAssembly execution.
///
/// `resume_wasm_after_poller` invokes it with the function environment, the
/// store and the bytes produced by the awaited poller.
pub type WasmResumeTask = dyn FnOnce(WasiFunctionEnv, Store, Bytes) + Send + 'static;
53
54pub type WasmResumeTrigger = dyn FnOnce() -> Pin<Box<dyn Future<Output = Result<Bytes, ExitCode>> + Send + 'static>>
55 + Send
56 + Sync;
57
58#[derive(derive_more::Debug)]
60pub struct TaskWasmRunProperties {
61 pub ctx: WasiFunctionEnv,
62 pub store: Store,
63 pub trigger_result: Option<Result<Bytes, ExitCode>>,
67 #[debug(ignore)]
69 pub recycle: Option<Box<TaskWasmRecycle>>,
70}
71
72pub type TaskWasmPreRun = dyn (for<'a> FnOnce(
73 &'a mut WasiFunctionEnv,
74 &'a mut Store,
75 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>)
76 + Send;
77
/// One-shot callback that executes a WebAssembly task; it receives the
/// task's [`TaskWasmRunProperties`].
pub type TaskWasmRun = dyn FnOnce(TaskWasmRunProperties) + Send + 'static;
80
/// One-shot callback that is handed a [`Module`].
///
/// Same shape as the `task` argument of `VirtualTaskManager::spawn_with_module`.
pub type TaskExecModule = dyn FnOnce(Module) + Send + 'static;
83
84#[derive(Debug)]
86pub struct TaskWasmRecycleProperties {
87 pub env: WasiEnv,
88 pub memory: Memory,
89 pub store: Store,
90}
91
/// One-shot callback receiving a task's [`TaskWasmRecycleProperties`].
///
/// NOTE(review): when exactly it is invoked is decided by the task manager
/// implementation, which is not visible here.
pub type TaskWasmRecycle = dyn FnOnce(TaskWasmRecycleProperties) + Send + 'static;
94
/// Description of a WebAssembly task to be scheduled through
/// `VirtualTaskManager::task_wasm`.
///
/// Create one with `TaskWasm::new` and refine it with the `with_*` builder
/// methods.
pub struct TaskWasm<'a> {
    /// Callback that executes the task.
    pub run: Box<TaskWasmRun>,
    /// Optional recycle callback (`None` unless set via `with_recycle`).
    pub recycle: Option<Box<TaskWasmRecycle>>,
    /// Environment the task is created with.
    pub env: WasiEnv,
    /// Module the task is created with.
    pub module: Module,
    /// Optional store snapshot (`None` unless set via `with_globals`).
    pub globals: Option<StoreSnapshot>,
    /// How the task's memory is obtained; see [`SpawnType`].
    pub spawn_type: SpawnType<'a>,
    /// Optional trigger (`None` unless set via `with_trigger`).
    pub trigger: Option<Box<WasmResumeTrigger>>,
    /// Flag passed through from `new`.
    /// NOTE(review): its effect is implemented by the task manager — confirm there.
    pub update_layout: bool,
    /// Flag passed through from `new`.
    /// NOTE(review): its effect is implemented by the task manager — confirm there.
    pub call_initialize: bool,
    /// Optional asynchronous hook (`None` unless set via `with_pre_run`).
    pub pre_run: Option<Box<TaskWasmPreRun>>,
}
108
109impl<'a> TaskWasm<'a> {
110 pub fn new(
111 run: Box<TaskWasmRun>,
112 env: WasiEnv,
113 module: Module,
114 update_layout: bool,
115 call_initialize: bool,
116 ) -> Self {
117 let shared_memory = module.imports().memories().next().map(|a| *a.ty());
118 Self {
119 run,
120 env,
121 module,
122 globals: None,
123 spawn_type: match shared_memory {
124 Some(ty) => SpawnType::CreateMemoryOfType(ty),
125 None => SpawnType::CreateMemory,
126 },
127 trigger: None,
128 update_layout,
129 call_initialize,
130 recycle: None,
131 pre_run: None,
132 }
133 }
134
135 pub fn with_memory(mut self, spawn_type: SpawnType<'a>) -> Self {
136 self.spawn_type = spawn_type;
137 self
138 }
139
140 pub fn with_optional_memory(mut self, spawn_type: Option<SpawnType<'a>>) -> Self {
141 if let Some(spawn_type) = spawn_type {
142 self.spawn_type = spawn_type;
143 }
144 self
145 }
146
147 pub fn with_globals(mut self, snapshot: StoreSnapshot) -> Self {
148 self.globals.replace(snapshot);
149 self
150 }
151
152 pub fn with_trigger(mut self, trigger: Box<WasmResumeTrigger>) -> Self {
153 self.trigger.replace(trigger);
154 self
155 }
156
157 pub fn with_recycle(mut self, recycle: Box<TaskWasmRecycle>) -> Self {
158 self.recycle.replace(recycle);
159 self
160 }
161
162 pub fn with_pre_run(mut self, pre_run: Box<TaskWasmPreRun>) -> Self {
163 self.pre_run.replace(pre_run);
164 self
165 }
166}
167
168#[allow(unused_variables)]
184pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
185 fn build_memory(
189 &self,
190 mut store: &mut StoreMut,
191 spawn_type: &SpawnType,
192 ) -> Result<Option<Memory>, WasiThreadError> {
193 match spawn_type {
194 SpawnType::CreateMemoryOfType(ty) => {
195 let mut ty = *ty;
196 ty.shared = true;
197
198 let _ = ty.maximum.get_or_insert(wasmer_types::Pages::max_value());
201
202 let mem = Memory::new(&mut store, ty).map_err(|err| {
203 tracing::error!(
204 error = &err as &dyn std::error::Error,
205 memory_type=?ty,
206 "could not create memory",
207 );
208 WasiThreadError::MemoryCreateFailed(err)
209 })?;
210 Ok(Some(mem))
211 }
212 SpawnType::ShareMemory(mem, old_store) => {
213 let mem = mem.share_in_store(&old_store, store).map_err(|err| {
214 tracing::warn!(
215 error = &err as &dyn std::error::Error,
216 "could not clone memory",
217 );
218 WasiThreadError::MemoryCreateFailed(err)
219 })?;
220 Ok(Some(mem))
221 }
222 SpawnType::CopyMemory(mem, old_store) => {
223 let mem = mem.copy_to_store(&old_store, store).map_err(|err| {
224 tracing::warn!(
225 error = &err as &dyn std::error::Error,
226 "could not copy memory",
227 );
228 WasiThreadError::MemoryCreateFailed(err)
229 })?;
230 Ok(Some(mem))
231 }
232 SpawnType::CreateMemory | SpawnType::NewLinkerInstanceGroup(..) => Ok(None),
233 }
234 }
235
236 fn sleep_now(
243 &self,
244 time: Duration,
245 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
246
247 fn task_shared(
254 &self,
255 task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
256 ) -> Result<(), WasiThreadError>;
257
258 fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError>;
263
264 fn task_dedicated(
269 &self,
270 task: Box<dyn FnOnce() + Send + 'static>,
271 ) -> Result<(), WasiThreadError>;
272
273 fn thread_parallelism(&self) -> Result<usize, WasiThreadError>;
275
276 fn spawn_with_module(
291 &self,
292 module: Module,
293 task: Box<dyn FnOnce(Module) + Send + 'static>,
294 ) -> Result<(), WasiThreadError> {
295 self.task_dedicated(Box::new(move || task(module)))
299 }
300}
301
302impl<D, T> VirtualTaskManager for D
303where
304 D: Deref<Target = T> + std::fmt::Debug + Send + Sync + 'static,
305 T: VirtualTaskManager + ?Sized,
306{
307 fn build_memory(
308 &self,
309 store: &mut StoreMut,
310 spawn_type: &SpawnType,
311 ) -> Result<Option<Memory>, WasiThreadError> {
312 (**self).build_memory(store, spawn_type)
313 }
314
315 fn sleep_now(
316 &self,
317 time: Duration,
318 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
319 (**self).sleep_now(time)
320 }
321
322 fn task_shared(
323 &self,
324 task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
325 ) -> Result<(), WasiThreadError> {
326 (**self).task_shared(task)
327 }
328
329 fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError> {
330 (**self).task_wasm(task)
331 }
332
333 fn task_dedicated(
334 &self,
335 task: Box<dyn FnOnce() + Send + 'static>,
336 ) -> Result<(), WasiThreadError> {
337 (**self).task_dedicated(task)
338 }
339
340 fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
341 (**self).thread_parallelism()
342 }
343
344 fn spawn_with_module(
345 &self,
346 module: Module,
347 task: Box<dyn FnOnce(Module) + Send + 'static>,
348 ) -> Result<(), WasiThreadError> {
349 (**self).spawn_with_module(module, task)
350 }
351}
352
353impl dyn VirtualTaskManager {
354 #[doc(hidden)]
358 pub unsafe fn resume_wasm_after_poller(
359 &self,
360 task: Box<WasmResumeTask>,
361 ctx: WasiFunctionEnv,
362 mut store: Store,
363 trigger: Pin<Box<AsyncifyFuture>>,
364 ) -> Result<(), WasiThreadError> {
365 struct AsyncifyPollerOwned {
367 thread: WasiThread,
368 trigger: Pin<Box<AsyncifyFuture>>,
369 }
370 impl Future for AsyncifyPollerOwned {
371 type Output = Result<Bytes, ExitCode>;
372 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
373 let work = self.trigger.as_mut();
374 Poll::Ready(if let Poll::Ready(res) = work.poll(cx) {
375 Ok(res)
376 } else if let Some(forced_exit) = self.thread.try_join() {
377 return Poll::Ready(Err(forced_exit.unwrap_or_else(|err| {
378 tracing::debug!("exit runtime error - {}", err);
379 Errno::Child.into()
380 })));
381 } else {
382 return Poll::Pending;
383 })
384 }
385 }
386
387 let snapshot = capture_store_snapshot(&mut store.as_store_mut());
388 let env = ctx.data(&store);
389 let env_inner = env.inner();
390 let handles = env_inner
391 .static_module_instance_handles()
392 .ok_or(WasiThreadError::Unsupported)?;
393 let module = handles.module_clone();
394 let memory = handles.memory_clone();
395 let thread = env.thread.clone();
396 let env = env.clone();
397
398 let thread_inner = thread.clone();
399 self.task_wasm(
400 TaskWasm::new(
401 Box::new(move |props| {
402 let result = props
403 .trigger_result
404 .expect("If there is no result then its likely the trigger did not run");
405 let result = match result {
406 Ok(r) => r,
407 Err(exit_code) => {
408 thread.set_status_finished(Ok(exit_code));
409 return;
410 }
411 };
412 task(props.ctx, props.store, result)
413 }),
414 env.clone(),
415 module,
416 false,
417 false,
418 )
419 .with_memory(SpawnType::ShareMemory(memory, store.as_store_ref()))
420 .with_globals(snapshot)
421 .with_trigger(Box::new(move || {
422 Box::pin(async move {
423 let mut poller = AsyncifyPollerOwned {
424 thread: thread_inner,
425 trigger,
426 };
427 let res = Pin::new(&mut poller).await;
428 let res = match res {
429 Ok(res) => res,
430 Err(exit_code) => {
431 env.thread.set_status_finished(Ok(exit_code));
432 return Err(exit_code);
433 }
434 };
435
436 tracing::trace!("deep sleep woken - res.len={}", res.len());
437 Ok(res)
438 })
439 })),
440 )
441 }
442}
443
/// Convenience helpers built on top of [`VirtualTaskManager`].
pub trait VirtualTaskManagerExt {
    /// Runs `task` to completion and returns its output, blocking the
    /// calling thread while waiting.
    ///
    /// The blanket implementation in this file submits the future through
    /// `task_shared` and waits for the result with a blocking receive on a
    /// oneshot channel; an error is returned when that channel is dropped
    /// before a result was sent.
    fn spawn_and_block_on<A>(
        &self,
        task: impl Future<Output = A> + Send + 'static,
    ) -> Result<A, anyhow::Error>
    where
        A: Send + 'static;

    /// Runs the synchronous closure `f` as a task and returns a future that
    /// resolves to its result.
    ///
    /// The blanket implementation in this file submits the closure through
    /// `task_dedicated` and delivers the result over a oneshot channel; the
    /// future resolves to an error when that channel is dropped before a
    /// result was sent.
    fn spawn_await<O, F>(
        &self,
        f: F,
    ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
    where
        O: Send + 'static,
        F: FnOnce() -> O + Send + 'static;
}
463
464impl<D, T> VirtualTaskManagerExt for D
465where
466 D: Deref<Target = T>,
467 T: VirtualTaskManager + ?Sized,
468{
469 fn spawn_and_block_on<A>(
472 &self,
473 task: impl Future<Output = A> + Send + 'static,
474 ) -> Result<A, anyhow::Error>
475 where
476 A: Send + 'static,
477 {
478 let (tx, rx) = ::tokio::sync::oneshot::channel();
479 let work = Box::pin(async move {
480 let ret = task.await;
481 tx.send(ret).ok();
482 });
483 self.task_shared(Box::new(move || work)).unwrap();
484 rx.blocking_recv()
485 .map_err(|_| anyhow::anyhow!("task execution failed - result channel dropped"))
486 }
487
488 fn spawn_await<O, F>(
489 &self,
490 f: F,
491 ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
492 where
493 O: Send + 'static,
494 F: FnOnce() -> O + Send + 'static,
495 {
496 let (sender, receiver) = ::tokio::sync::oneshot::channel();
497
498 self.task_dedicated(Box::new(move || {
499 let result = f();
500 let _ = sender.send(result);
501 }))
502 .unwrap();
503
504 Box::new(receiver.map_err(|e| Box::new(e).into()))
505 }
506}