wasmer_wasix/runtime/task_manager/
mod.rs

// TODO: this module should be gated behind a separate, tokio-specific feature flag.
#[cfg(feature = "sys-thread")]
pub mod tokio;
4
5use std::ops::Deref;
6use std::task::{Context, Poll};
7use std::{pin::Pin, time::Duration};
8
9use bytes::Bytes;
10use derive_more::Debug;
11use futures::future::BoxFuture;
12use futures::{Future, TryFutureExt};
13use wasmer::{AsStoreMut, Memory, MemoryType, Module, SharedMemory, Store, StoreMut};
14use wasmer_wasix_types::wasi::{Errno, ExitCode};
15
16use crate::os::task::thread::WasiThreadError;
17use crate::{StoreSnapshot, WasiEnv, WasiFunctionEnv, WasiThread, capture_store_snapshot};
18use crate::{state::PreparedInstanceGroupData, syscalls::AsyncifyFuture};
19
20pub use virtual_mio::waker::*;
21
22#[derive(Debug)]
23pub enum SpawnType {
24    CreateMemory,
25    CreateMemoryOfType(MemoryType),
26    AttachMemory(SharedMemory),
27    #[debug("NewLinkerInstanceGroup(..)")]
28    NewLinkerInstanceGroup(PreparedInstanceGroupData),
29}
30
31/// Describes whether a new memory should be created (and, in case, its type) or if it was already
32/// created and the store it belongs to.
33///
34/// # Note
35///
36/// This type is necessary for now because we can't pass a [`wasmer::StoreRef`] between threads, so this
37/// conceptually is a Send-able [`SpawnMemoryTypeOrStore`].
38pub enum SpawnMemoryTypeOrStore {
39    New,
40    Type(wasmer::MemoryType),
41    StoreAndMemory(wasmer::Store, Memory),
42}
43
44pub type WasmResumeTask = dyn FnOnce(WasiFunctionEnv, Store, Bytes) + Send + 'static;
45
46pub type WasmResumeTrigger = dyn FnOnce() -> Pin<Box<dyn Future<Output = Result<Bytes, ExitCode>> + Send + 'static>>
47    + Send
48    + Sync;
49
50/// The properties passed to the task
51#[derive(derive_more::Debug)]
52pub struct TaskWasmRunProperties {
53    pub ctx: WasiFunctionEnv,
54    pub store: Store,
55    /// The result of the asynchronous trigger serialized into bytes using the bincode serializer
56    /// When no trigger is associated with the run operation (i.e. spawning threads) then this will be None.
57    /// (if the trigger returns an ExitCode then the WASM process will be terminated without resuming)
58    pub trigger_result: Option<Result<Bytes, ExitCode>>,
59    /// The instance will be recycled back to this function when the WASM run has finished
60    #[debug(ignore)]
61    pub recycle: Option<Box<TaskWasmRecycle>>,
62}
63
64pub type TaskWasmPreRun = dyn (for<'a> FnOnce(
65        &'a mut WasiFunctionEnv,
66        &'a mut Store,
67    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>)
68    + Send;
69
70/// Callback that will be invoked
71pub type TaskWasmRun = dyn FnOnce(TaskWasmRunProperties) + Send + 'static;
72
73/// Callback that will be invoked
74pub type TaskExecModule = dyn FnOnce(Module) + Send + 'static;
75
76/// The properties passed to the task
77#[derive(Debug)]
78pub struct TaskWasmRecycleProperties {
79    pub env: WasiEnv,
80    pub memory: Memory,
81    pub store: Store,
82}
83
84/// Callback that will be invoked
85pub type TaskWasmRecycle = dyn FnOnce(TaskWasmRecycleProperties) + Send + 'static;
86
87pub struct TaskWasmCallbacks {
88    pub run: Box<TaskWasmRun>,
89    pub recycle: Option<Box<TaskWasmRecycle>>,
90    pub pre_run: Option<Box<TaskWasmPreRun>>,
91    pub trigger: Option<Box<WasmResumeTrigger>>,
92}
93
94/// Represents a WASM task that will be executed on a dedicated thread
95pub struct TaskWasm {
96    pub callbacks: TaskWasmCallbacks,
97    pub env: WasiEnv,
98    pub module: Module,
99    pub globals: Option<StoreSnapshot>,
100    pub spawn_type: SpawnType,
101    pub update_layout: bool,
102    pub call_initialize: bool,
103}
104
105impl TaskWasm {
106    pub fn new(
107        run: Box<TaskWasmRun>,
108        env: WasiEnv,
109        module: Module,
110        update_layout: bool,
111        call_initialize: bool,
112    ) -> Self {
113        let shared_memory = module.imports().memories().next().map(|a| *a.ty());
114        Self {
115            callbacks: TaskWasmCallbacks {
116                run,
117                recycle: None,
118                pre_run: None,
119                trigger: None,
120            },
121            env,
122            module,
123            globals: None,
124            spawn_type: match shared_memory {
125                Some(ty) => SpawnType::CreateMemoryOfType(ty),
126                None => SpawnType::CreateMemory,
127            },
128            update_layout,
129            call_initialize,
130        }
131    }
132
133    pub fn with_memory(mut self, spawn_type: SpawnType) -> Self {
134        self.spawn_type = spawn_type;
135        self
136    }
137
138    pub fn with_optional_memory(mut self, spawn_type: Option<SpawnType>) -> Self {
139        if let Some(spawn_type) = spawn_type {
140            self.spawn_type = spawn_type;
141        }
142        self
143    }
144
145    pub fn with_globals(mut self, snapshot: StoreSnapshot) -> Self {
146        self.globals.replace(snapshot);
147        self
148    }
149
150    pub fn with_trigger(mut self, trigger: Box<WasmResumeTrigger>) -> Self {
151        self.callbacks.trigger.replace(trigger);
152        self
153    }
154
155    pub fn with_recycle(mut self, recycle: Box<TaskWasmRecycle>) -> Self {
156        self.callbacks.recycle.replace(recycle);
157        self
158    }
159
160    pub fn with_pre_run(mut self, pre_run: Box<TaskWasmPreRun>) -> Self {
161        self.callbacks.pre_run.replace(pre_run);
162        self
163    }
164}
165
166/// A task executor backed by a thread pool.
167///
168/// ## Thread Safety
169///
170/// Due to [#4158], it is possible to pass non-thread safe objects across
171/// threads by capturing them in the task passed to
172/// [`VirtualTaskManager::task_shared()`] or
173/// [`VirtualTaskManager::task_dedicated()`].
174///
175/// If your task needs access to a [`wasmer::Module`], [`wasmer::Memory`], or
176/// [`wasmer::Instance`], it should explicitly transfer the objects using
177/// either [`VirtualTaskManager::task_wasm()`] when in syscall context or
178/// [`VirtualTaskManager::spawn_with_module()`] for higher level code.
179///
180/// [#4158]: https://github.com/wasmerio/wasmer/issues/4158
181#[allow(unused_variables)]
182pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
183    /// Build a new Webassembly memory.
184    ///
185    /// May return `None` if the memory can just be auto-constructed.
186    fn build_memory(
187        &self,
188        mut store: &mut StoreMut,
189        spawn_type: SpawnType,
190    ) -> Result<Option<Memory>, WasiThreadError> {
191        match spawn_type {
192            SpawnType::CreateMemoryOfType(mut ty) => {
193                ty.shared = true;
194
195                // Note: If memory is shared, maximum needs to be set in the
196                // browser otherwise creation will fail.
197                let _ = ty.maximum.get_or_insert(wasmer_types::Pages::max_value());
198
199                let mem = Memory::new(&mut store, ty).map_err(|err| {
200                    tracing::error!(
201                        error = &err as &dyn std::error::Error,
202                        memory_type=?ty,
203                        "could not create memory",
204                    );
205                    WasiThreadError::MemoryCreateFailed(err)
206                })?;
207                Ok(Some(mem))
208            }
209            SpawnType::AttachMemory(mem) => Ok(Some(mem.attach(store))),
210            SpawnType::CreateMemory | SpawnType::NewLinkerInstanceGroup(..) => Ok(None),
211        }
212    }
213
214    /// Pause the current thread of execution.
215    ///
216    /// This is typically invoked whenever a WASM thread goes idle. Besides
217    /// acting as a platform-agnostic [`std::thread::sleep()`], this also gives
218    /// the runtime a chance to do asynchronous work like pumping an event
219    /// loop.
220    fn sleep_now(
221        &self,
222        time: Duration,
223    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
224
225    /// Run an asynchronous operation on the thread pool.
226    ///
227    /// This task must not block execution or it could cause deadlocks.
228    ///
229    /// See the "Thread Safety" documentation on [`VirtualTaskManager`] for
230    /// limitations on what a `task` can and can't contain.
231    fn task_shared(
232        &self,
233        task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
234    ) -> Result<(), WasiThreadError>;
235
236    /// Run a blocking WebAssembly operation on the thread pool.
237    ///
238    /// This is primarily used inside the context of a syscall and allows
239    /// the transfer of things like [`wasmer::Module`] across threads.
240    fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError>;
241
242    /// Run a blocking operation on the thread pool.
243    ///
244    /// It is okay for this task to block execution and any async futures within
245    /// its scope.
246    fn task_dedicated(
247        &self,
248        task: Box<dyn FnOnce() + Send + 'static>,
249    ) -> Result<(), WasiThreadError>;
250
251    /// Returns the amount of parallelism that is possible on this platform.
252    fn thread_parallelism(&self) -> Result<usize, WasiThreadError>;
253
254    /// Schedule a blocking task to run on the threadpool, explicitly
255    /// transferring a [`Module`] to the task.
256    ///
257    /// This should be preferred over [`VirtualTaskManager::task_dedicated()`]
258    /// where possible because [`wasmer::Module`] is actually `!Send` in the
259    /// browser and can only be transferred to background threads via
260    /// an explicit `postMessage()`. See [#4158] for more details.
261    ///
262    /// This is very similar to [`VirtualTaskManager::task_wasm()`], but
263    /// intended for use outside of a syscall context. For example, when you are
264    /// running in the browser and want to run a WebAssembly module in the
265    /// background.
266    ///
267    /// [#4158]: https://github.com/wasmerio/wasmer/issues/4158
268    fn spawn_with_module(
269        &self,
270        module: Module,
271        task: Box<dyn FnOnce(Module) + Send + 'static>,
272    ) -> Result<(), WasiThreadError> {
273        // Note: Ideally, this function and task_wasm() would be superseded by
274        // a more general mechanism for transferring non-thread safe values
275        // to the thread pool.
276        self.task_dedicated(Box::new(move || task(module)))
277    }
278}
279
280impl<D, T> VirtualTaskManager for D
281where
282    D: Deref<Target = T> + std::fmt::Debug + Send + Sync + 'static,
283    T: VirtualTaskManager + ?Sized,
284{
285    fn build_memory(
286        &self,
287        store: &mut StoreMut,
288        spawn_type: SpawnType,
289    ) -> Result<Option<Memory>, WasiThreadError> {
290        (**self).build_memory(store, spawn_type)
291    }
292
293    fn sleep_now(
294        &self,
295        time: Duration,
296    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
297        (**self).sleep_now(time)
298    }
299
300    fn task_shared(
301        &self,
302        task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
303    ) -> Result<(), WasiThreadError> {
304        (**self).task_shared(task)
305    }
306
307    fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError> {
308        (**self).task_wasm(task)
309    }
310
311    fn task_dedicated(
312        &self,
313        task: Box<dyn FnOnce() + Send + 'static>,
314    ) -> Result<(), WasiThreadError> {
315        (**self).task_dedicated(task)
316    }
317
318    fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
319        (**self).thread_parallelism()
320    }
321
322    fn spawn_with_module(
323        &self,
324        module: Module,
325        task: Box<dyn FnOnce(Module) + Send + 'static>,
326    ) -> Result<(), WasiThreadError> {
327        (**self).spawn_with_module(module, task)
328    }
329}
330
331impl dyn VirtualTaskManager {
332    /// Starts an WebAssembly task will run on a dedicated thread
333    /// pulled from the worker pool that has a stateful thread local variable
334    /// After the poller has succeeded
335    #[doc(hidden)]
336    pub unsafe fn resume_wasm_after_poller(
337        &self,
338        task: Box<WasmResumeTask>,
339        ctx: WasiFunctionEnv,
340        mut store: Store,
341        trigger: Pin<Box<AsyncifyFuture>>,
342    ) -> Result<(), WasiThreadError> {
343        // This poller will process any signals when the main working function is idle
344        struct AsyncifyPollerOwned {
345            thread: WasiThread,
346            trigger: Pin<Box<AsyncifyFuture>>,
347        }
348        impl Future for AsyncifyPollerOwned {
349            type Output = Result<Bytes, ExitCode>;
350            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351                let work = self.trigger.as_mut();
352                Poll::Ready(if let Poll::Ready(res) = work.poll(cx) {
353                    Ok(res)
354                } else if let Some(forced_exit) = self.thread.try_join() {
355                    return Poll::Ready(Err(forced_exit.unwrap_or_else(|err| {
356                        tracing::debug!("exit runtime error - {}", err);
357                        Errno::Child.into()
358                    })));
359                } else {
360                    return Poll::Pending;
361                })
362            }
363        }
364
365        let snapshot = capture_store_snapshot(&mut store.as_store_mut());
366        let env = ctx.data(&store);
367        let env_inner = env.inner();
368        let handles = env_inner
369            .static_module_instance_handles()
370            .ok_or(WasiThreadError::Unsupported)?;
371        let module = handles.module_clone();
372        let memory = handles.memory_clone();
373        let thread = env.thread.clone();
374        let env = env.clone();
375
376        let thread_inner = thread.clone();
377        self.task_wasm(
378            TaskWasm::new(
379                Box::new(move |props| {
380                    let result = props
381                        .trigger_result
382                        .expect("If there is no result then its likely the trigger did not run");
383                    let result = match result {
384                        Ok(r) => r,
385                        Err(exit_code) => {
386                            thread.set_status_finished(Ok(exit_code));
387                            return;
388                        }
389                    };
390                    task(props.ctx, props.store, result)
391                }),
392                env.clone(),
393                module,
394                false,
395                false,
396            )
397            .with_memory(SpawnType::AttachMemory(
398                memory.as_shared(&store).ok_or_else(|| {
399                    tracing::error!("failed to get shared memory for asyncify poller");
400                    WasiThreadError::Unsupported
401                })?,
402            ))
403            .with_globals(snapshot)
404            .with_trigger(Box::new(move || {
405                Box::pin(async move {
406                    let mut poller = AsyncifyPollerOwned {
407                        thread: thread_inner,
408                        trigger,
409                    };
410                    let res = Pin::new(&mut poller).await;
411                    let res = match res {
412                        Ok(res) => res,
413                        Err(exit_code) => {
414                            env.thread.set_status_finished(Ok(exit_code));
415                            return Err(exit_code);
416                        }
417                    };
418
419                    tracing::trace!("deep sleep woken - res.len={}", res.len());
420                    Ok(res)
421                })
422            })),
423        )
424    }
425}
426
427/// Generic utility methods for VirtualTaskManager
428pub trait VirtualTaskManagerExt {
429    /// Runs the work in the background via the task managers shared background
430    /// threads while blocking the current execution until it finishes
431    fn spawn_and_block_on<A>(
432        &self,
433        task: impl Future<Output = A> + Send + 'static,
434    ) -> Result<A, anyhow::Error>
435    where
436        A: Send + 'static;
437
438    fn spawn_await<O, F>(
439        &self,
440        f: F,
441    ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
442    where
443        O: Send + 'static,
444        F: FnOnce() -> O + Send + 'static;
445}
446
447impl<D, T> VirtualTaskManagerExt for D
448where
449    D: Deref<Target = T>,
450    T: VirtualTaskManager + ?Sized,
451{
452    /// Runs the work in the background via the task managers shared background
453    /// threads while blocking the current execution until it finishes
454    fn spawn_and_block_on<A>(
455        &self,
456        task: impl Future<Output = A> + Send + 'static,
457    ) -> Result<A, anyhow::Error>
458    where
459        A: Send + 'static,
460    {
461        let (tx, rx) = ::tokio::sync::oneshot::channel();
462        let work = Box::pin(async move {
463            let ret = task.await;
464            tx.send(ret).ok();
465        });
466        self.task_shared(Box::new(move || work)).unwrap();
467        rx.blocking_recv()
468            .map_err(|_| anyhow::anyhow!("task execution failed - result channel dropped"))
469    }
470
471    fn spawn_await<O, F>(
472        &self,
473        f: F,
474    ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
475    where
476        O: Send + 'static,
477        F: FnOnce() -> O + Send + 'static,
478    {
479        let (sender, receiver) = ::tokio::sync::oneshot::channel();
480
481        self.task_dedicated(Box::new(move || {
482            let result = f();
483            let _ = sender.send(result);
484        }))
485        .unwrap();
486
487        Box::new(receiver.map_err(|e| Box::new(e).into()))
488    }
489}