wasmer_wasix/runtime/task_manager/
mod.rs

1// TODO: should be behind a different , tokio specific feature flag.
2#[cfg(feature = "sys-thread")]
3pub mod tokio;
4
5use std::ops::Deref;
6use std::task::{Context, Poll};
7use std::{pin::Pin, time::Duration};
8
9use bytes::Bytes;
10use derive_more::Debug;
11use futures::future::BoxFuture;
12use futures::{Future, TryFutureExt};
13use wasmer::{
14    AsStoreMut, AsStoreRef, FunctionEnv, Memory, MemoryType, Module, Store, StoreMut, StoreRef,
15};
16use wasmer_wasix_types::wasi::{Errno, ExitCode};
17
18use crate::syscalls::AsyncifyFuture;
19use crate::{StoreSnapshot, WasiEnv, WasiFunctionEnv, WasiThread, capture_store_snapshot};
20use crate::{os::task::thread::WasiThreadError, state::Linker};
21
22pub use virtual_mio::waker::*;
23
24#[derive(Debug)]
25pub enum SpawnType<'a> {
26    CreateMemory,
27    CreateMemoryOfType(MemoryType),
28    // TODO: is there a way to get rid of the memory reference
29    ShareMemory(Memory, StoreRef<'a>),
30    // TODO: is there a way to get rid of the memory reference
31    // Note: The message sender is triggered once the memory
32    // has been copied, this makes sure its not modified until
33    // its been properly copied
34    CopyMemory(Memory, StoreRef<'a>),
35    #[debug("NewLinkerInstanceGroup(..)")]
36    NewLinkerInstanceGroup(Linker, FunctionEnv<WasiEnv>, StoreMut<'a>),
37}
38
39/// Describes whether a new memory should be created (and, in case, its type) or if it was already
40/// created and the store it belongs to.
41///
42/// # Note
43///
44/// This type is necessary for now because we can't pass a [`wasmer::StoreRef`] between threads, so this
45/// conceptually is a Send-able [`SpawnMemoryTypeOrStore`].
46pub enum SpawnMemoryTypeOrStore {
47    New,
48    Type(wasmer::MemoryType),
49    StoreAndMemory(wasmer::Store, Option<wasmer::Memory>),
50}
51
52pub type WasmResumeTask = dyn FnOnce(WasiFunctionEnv, Store, Bytes) + Send + 'static;
53
54pub type WasmResumeTrigger = dyn FnOnce() -> Pin<Box<dyn Future<Output = Result<Bytes, ExitCode>> + Send + 'static>>
55    + Send
56    + Sync;
57
58/// The properties passed to the task
59#[derive(derive_more::Debug)]
60pub struct TaskWasmRunProperties {
61    pub ctx: WasiFunctionEnv,
62    pub store: Store,
63    /// The result of the asynchronous trigger serialized into bytes using the bincode serializer
64    /// When no trigger is associated with the run operation (i.e. spawning threads) then this will be None.
65    /// (if the trigger returns an ExitCode then the WASM process will be terminated without resuming)
66    pub trigger_result: Option<Result<Bytes, ExitCode>>,
67    /// The instance will be recycled back to this function when the WASM run has finished
68    #[debug(ignore)]
69    pub recycle: Option<Box<TaskWasmRecycle>>,
70}
71
72pub type TaskWasmPreRun = dyn (for<'a> FnOnce(
73        &'a mut WasiFunctionEnv,
74        &'a mut Store,
75    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>)
76    + Send;
77
78/// Callback that will be invoked
79pub type TaskWasmRun = dyn FnOnce(TaskWasmRunProperties) + Send + 'static;
80
81/// Callback that will be invoked
82pub type TaskExecModule = dyn FnOnce(Module) + Send + 'static;
83
84/// The properties passed to the task
85#[derive(Debug)]
86pub struct TaskWasmRecycleProperties {
87    pub env: WasiEnv,
88    pub memory: Memory,
89    pub store: Store,
90}
91
92/// Callback that will be invoked
93pub type TaskWasmRecycle = dyn FnOnce(TaskWasmRecycleProperties) + Send + 'static;
94
95/// Represents a WASM task that will be executed on a dedicated thread
96pub struct TaskWasm<'a> {
97    pub run: Box<TaskWasmRun>,
98    pub recycle: Option<Box<TaskWasmRecycle>>,
99    pub env: WasiEnv,
100    pub module: Module,
101    pub globals: Option<StoreSnapshot>,
102    pub spawn_type: SpawnType<'a>,
103    pub trigger: Option<Box<WasmResumeTrigger>>,
104    pub update_layout: bool,
105    pub call_initialize: bool,
106    pub pre_run: Option<Box<TaskWasmPreRun>>,
107}
108
109impl<'a> TaskWasm<'a> {
110    pub fn new(
111        run: Box<TaskWasmRun>,
112        env: WasiEnv,
113        module: Module,
114        update_layout: bool,
115        call_initialize: bool,
116    ) -> Self {
117        let shared_memory = module.imports().memories().next().map(|a| *a.ty());
118        Self {
119            run,
120            env,
121            module,
122            globals: None,
123            spawn_type: match shared_memory {
124                Some(ty) => SpawnType::CreateMemoryOfType(ty),
125                None => SpawnType::CreateMemory,
126            },
127            trigger: None,
128            update_layout,
129            call_initialize,
130            recycle: None,
131            pre_run: None,
132        }
133    }
134
135    pub fn with_memory(mut self, spawn_type: SpawnType<'a>) -> Self {
136        self.spawn_type = spawn_type;
137        self
138    }
139
140    pub fn with_optional_memory(mut self, spawn_type: Option<SpawnType<'a>>) -> Self {
141        if let Some(spawn_type) = spawn_type {
142            self.spawn_type = spawn_type;
143        }
144        self
145    }
146
147    pub fn with_globals(mut self, snapshot: StoreSnapshot) -> Self {
148        self.globals.replace(snapshot);
149        self
150    }
151
152    pub fn with_trigger(mut self, trigger: Box<WasmResumeTrigger>) -> Self {
153        self.trigger.replace(trigger);
154        self
155    }
156
157    pub fn with_recycle(mut self, recycle: Box<TaskWasmRecycle>) -> Self {
158        self.recycle.replace(recycle);
159        self
160    }
161
162    pub fn with_pre_run(mut self, pre_run: Box<TaskWasmPreRun>) -> Self {
163        self.pre_run.replace(pre_run);
164        self
165    }
166}
167
168/// A task executor backed by a thread pool.
169///
170/// ## Thread Safety
171///
172/// Due to [#4158], it is possible to pass non-thread safe objects across
173/// threads by capturing them in the task passed to
174/// [`VirtualTaskManager::task_shared()`] or
175/// [`VirtualTaskManager::task_dedicated()`].
176///
177/// If your task needs access to a [`wasmer::Module`], [`wasmer::Memory`], or
178/// [`wasmer::Instance`], it should explicitly transfer the objects using
179/// either [`VirtualTaskManager::task_wasm()`] when in syscall context or
180/// [`VirtualTaskManager::spawn_with_module()`] for higher level code.
181///
182/// [#4158]: https://github.com/wasmerio/wasmer/issues/4158
183#[allow(unused_variables)]
184pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
185    /// Build a new Webassembly memory.
186    ///
187    /// May return `None` if the memory can just be auto-constructed.
188    fn build_memory(
189        &self,
190        mut store: &mut StoreMut,
191        spawn_type: &SpawnType,
192    ) -> Result<Option<Memory>, WasiThreadError> {
193        match spawn_type {
194            SpawnType::CreateMemoryOfType(ty) => {
195                let mut ty = *ty;
196                ty.shared = true;
197
198                // Note: If memory is shared, maximum needs to be set in the
199                // browser otherwise creation will fail.
200                let _ = ty.maximum.get_or_insert(wasmer_types::Pages::max_value());
201
202                let mem = Memory::new(&mut store, ty).map_err(|err| {
203                    tracing::error!(
204                        error = &err as &dyn std::error::Error,
205                        memory_type=?ty,
206                        "could not create memory",
207                    );
208                    WasiThreadError::MemoryCreateFailed(err)
209                })?;
210                Ok(Some(mem))
211            }
212            SpawnType::ShareMemory(mem, old_store) => {
213                let mem = mem.share_in_store(&old_store, store).map_err(|err| {
214                    tracing::warn!(
215                        error = &err as &dyn std::error::Error,
216                        "could not clone memory",
217                    );
218                    WasiThreadError::MemoryCreateFailed(err)
219                })?;
220                Ok(Some(mem))
221            }
222            SpawnType::CopyMemory(mem, old_store) => {
223                let mem = mem.copy_to_store(&old_store, store).map_err(|err| {
224                    tracing::warn!(
225                        error = &err as &dyn std::error::Error,
226                        "could not copy memory",
227                    );
228                    WasiThreadError::MemoryCreateFailed(err)
229                })?;
230                Ok(Some(mem))
231            }
232            SpawnType::CreateMemory | SpawnType::NewLinkerInstanceGroup(..) => Ok(None),
233        }
234    }
235
236    /// Pause the current thread of execution.
237    ///
238    /// This is typically invoked whenever a WASM thread goes idle. Besides
239    /// acting as a platform-agnostic [`std::thread::sleep()`], this also gives
240    /// the runtime a chance to do asynchronous work like pumping an event
241    /// loop.
242    fn sleep_now(
243        &self,
244        time: Duration,
245    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
246
247    /// Run an asynchronous operation on the thread pool.
248    ///
249    /// This task must not block execution or it could cause deadlocks.
250    ///
251    /// See the "Thread Safety" documentation on [`VirtualTaskManager`] for
252    /// limitations on what a `task` can and can't contain.
253    fn task_shared(
254        &self,
255        task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
256    ) -> Result<(), WasiThreadError>;
257
258    /// Run a blocking WebAssembly operation on the thread pool.
259    ///
260    /// This is primarily used inside the context of a syscall and allows
261    /// the transfer of things like [`wasmer::Module`] across threads.
262    fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError>;
263
264    /// Run a blocking operation on the thread pool.
265    ///
266    /// It is okay for this task to block execution and any async futures within
267    /// its scope.
268    fn task_dedicated(
269        &self,
270        task: Box<dyn FnOnce() + Send + 'static>,
271    ) -> Result<(), WasiThreadError>;
272
273    /// Returns the amount of parallelism that is possible on this platform.
274    fn thread_parallelism(&self) -> Result<usize, WasiThreadError>;
275
276    /// Schedule a blocking task to run on the threadpool, explicitly
277    /// transferring a [`Module`] to the task.
278    ///
279    /// This should be preferred over [`VirtualTaskManager::task_dedicated()`]
280    /// where possible because [`wasmer::Module`] is actually `!Send` in the
281    /// browser and can only be transferred to background threads via
282    /// an explicit `postMessage()`. See [#4158] for more details.
283    ///
284    /// This is very similar to [`VirtualTaskManager::task_wasm()`], but
285    /// intended for use outside of a syscall context. For example, when you are
286    /// running in the browser and want to run a WebAssembly module in the
287    /// background.
288    ///
289    /// [#4158]: https://github.com/wasmerio/wasmer/issues/4158
290    fn spawn_with_module(
291        &self,
292        module: Module,
293        task: Box<dyn FnOnce(Module) + Send + 'static>,
294    ) -> Result<(), WasiThreadError> {
295        // Note: Ideally, this function and task_wasm() would be superseded by
296        // a more general mechanism for transferring non-thread safe values
297        // to the thread pool.
298        self.task_dedicated(Box::new(move || task(module)))
299    }
300}
301
302impl<D, T> VirtualTaskManager for D
303where
304    D: Deref<Target = T> + std::fmt::Debug + Send + Sync + 'static,
305    T: VirtualTaskManager + ?Sized,
306{
307    fn build_memory(
308        &self,
309        store: &mut StoreMut,
310        spawn_type: &SpawnType,
311    ) -> Result<Option<Memory>, WasiThreadError> {
312        (**self).build_memory(store, spawn_type)
313    }
314
315    fn sleep_now(
316        &self,
317        time: Duration,
318    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
319        (**self).sleep_now(time)
320    }
321
322    fn task_shared(
323        &self,
324        task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
325    ) -> Result<(), WasiThreadError> {
326        (**self).task_shared(task)
327    }
328
329    fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError> {
330        (**self).task_wasm(task)
331    }
332
333    fn task_dedicated(
334        &self,
335        task: Box<dyn FnOnce() + Send + 'static>,
336    ) -> Result<(), WasiThreadError> {
337        (**self).task_dedicated(task)
338    }
339
340    fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
341        (**self).thread_parallelism()
342    }
343
344    fn spawn_with_module(
345        &self,
346        module: Module,
347        task: Box<dyn FnOnce(Module) + Send + 'static>,
348    ) -> Result<(), WasiThreadError> {
349        (**self).spawn_with_module(module, task)
350    }
351}
352
353impl dyn VirtualTaskManager {
354    /// Starts an WebAssembly task will run on a dedicated thread
355    /// pulled from the worker pool that has a stateful thread local variable
356    /// After the poller has succeeded
357    #[doc(hidden)]
358    pub unsafe fn resume_wasm_after_poller(
359        &self,
360        task: Box<WasmResumeTask>,
361        ctx: WasiFunctionEnv,
362        mut store: Store,
363        trigger: Pin<Box<AsyncifyFuture>>,
364    ) -> Result<(), WasiThreadError> {
365        // This poller will process any signals when the main working function is idle
366        struct AsyncifyPollerOwned {
367            thread: WasiThread,
368            trigger: Pin<Box<AsyncifyFuture>>,
369        }
370        impl Future for AsyncifyPollerOwned {
371            type Output = Result<Bytes, ExitCode>;
372            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
373                let work = self.trigger.as_mut();
374                Poll::Ready(if let Poll::Ready(res) = work.poll(cx) {
375                    Ok(res)
376                } else if let Some(forced_exit) = self.thread.try_join() {
377                    return Poll::Ready(Err(forced_exit.unwrap_or_else(|err| {
378                        tracing::debug!("exit runtime error - {}", err);
379                        Errno::Child.into()
380                    })));
381                } else {
382                    return Poll::Pending;
383                })
384            }
385        }
386
387        let snapshot = capture_store_snapshot(&mut store.as_store_mut());
388        let env = ctx.data(&store);
389        let env_inner = env.inner();
390        let handles = env_inner
391            .static_module_instance_handles()
392            .ok_or(WasiThreadError::Unsupported)?;
393        let module = handles.module_clone();
394        let memory = handles.memory_clone();
395        let thread = env.thread.clone();
396        let env = env.clone();
397
398        let thread_inner = thread.clone();
399        self.task_wasm(
400            TaskWasm::new(
401                Box::new(move |props| {
402                    let result = props
403                        .trigger_result
404                        .expect("If there is no result then its likely the trigger did not run");
405                    let result = match result {
406                        Ok(r) => r,
407                        Err(exit_code) => {
408                            thread.set_status_finished(Ok(exit_code));
409                            return;
410                        }
411                    };
412                    task(props.ctx, props.store, result)
413                }),
414                env.clone(),
415                module,
416                false,
417                false,
418            )
419            .with_memory(SpawnType::ShareMemory(memory, store.as_store_ref()))
420            .with_globals(snapshot)
421            .with_trigger(Box::new(move || {
422                Box::pin(async move {
423                    let mut poller = AsyncifyPollerOwned {
424                        thread: thread_inner,
425                        trigger,
426                    };
427                    let res = Pin::new(&mut poller).await;
428                    let res = match res {
429                        Ok(res) => res,
430                        Err(exit_code) => {
431                            env.thread.set_status_finished(Ok(exit_code));
432                            return Err(exit_code);
433                        }
434                    };
435
436                    tracing::trace!("deep sleep woken - res.len={}", res.len());
437                    Ok(res)
438                })
439            })),
440        )
441    }
442}
443
444/// Generic utility methods for VirtualTaskManager
445pub trait VirtualTaskManagerExt {
446    /// Runs the work in the background via the task managers shared background
447    /// threads while blocking the current execution until it finishs
448    fn spawn_and_block_on<A>(
449        &self,
450        task: impl Future<Output = A> + Send + 'static,
451    ) -> Result<A, anyhow::Error>
452    where
453        A: Send + 'static;
454
455    fn spawn_await<O, F>(
456        &self,
457        f: F,
458    ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
459    where
460        O: Send + 'static,
461        F: FnOnce() -> O + Send + 'static;
462}
463
464impl<D, T> VirtualTaskManagerExt for D
465where
466    D: Deref<Target = T>,
467    T: VirtualTaskManager + ?Sized,
468{
469    /// Runs the work in the background via the task managers shared background
470    /// threads while blocking the current execution until it finishs
471    fn spawn_and_block_on<A>(
472        &self,
473        task: impl Future<Output = A> + Send + 'static,
474    ) -> Result<A, anyhow::Error>
475    where
476        A: Send + 'static,
477    {
478        let (tx, rx) = ::tokio::sync::oneshot::channel();
479        let work = Box::pin(async move {
480            let ret = task.await;
481            tx.send(ret).ok();
482        });
483        self.task_shared(Box::new(move || work)).unwrap();
484        rx.blocking_recv()
485            .map_err(|_| anyhow::anyhow!("task execution failed - result channel dropped"))
486    }
487
488    fn spawn_await<O, F>(
489        &self,
490        f: F,
491    ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
492    where
493        O: Send + 'static,
494        F: FnOnce() -> O + Send + 'static,
495    {
496        let (sender, receiver) = ::tokio::sync::oneshot::channel();
497
498        self.task_dedicated(Box::new(move || {
499            let result = f();
500            let _ = sender.send(result);
501        }))
502        .unwrap();
503
504        Box::new(receiver.map_err(|e| Box::new(e).into()))
505    }
506}