// wasmer_wasix/state/context_switching.rs

1use crate::{
2    WasiError, WasiFunctionEnv,
3    utils::thread_local_executor::{
4        ThreadLocalExecutor, ThreadLocalSpawner, ThreadLocalSpawnerError,
5    },
6};
7use futures::{
8    TryFutureExt,
9    channel::oneshot::{self, Sender},
10};
11use std::{
12    collections::BTreeMap,
13    mem::forget,
14    sync::{
15        Arc, RwLock, Weak,
16        atomic::{AtomicU64, Ordering},
17    },
18};
19use thiserror::Error;
20use tracing::trace;
21use wasmer::{RuntimeError, Store};
22use wasmer_wasix_types::wasi::ExitCode;
23
/// The context-switching environment represents all state for WASIX context-switching
/// on a single host thread.
///
/// Installed into the WasiEnv by [`ContextSwitchingEnvironment::run_main_context`] for
/// the duration of the main context's execution and removed again afterwards.
#[derive(Debug)]
pub(crate) struct ContextSwitchingEnvironment {
    // TODO: We might be able to get rid of this Arc by passing an AsyncFunctionEnvMut to the functions instead
    //       We need to be super-careful about memory leaks by cyclical references if we do that change.
    //       The Arc allows us to have weak references to the ContextSwitchingEnvironment independent of the WasiEnv.
    inner: Arc<ContextSwitchingEnvironmentInner>,
}
33
/// Shared state behind the `Arc` of [`ContextSwitchingEnvironment`].
///
/// The suspend/resume futures capture `Weak` references to this so they can restore
/// the current context ID without keeping the environment alive.
#[derive(Debug)]
struct ContextSwitchingEnvironmentInner {
    /// List of the unblockers for all suspended contexts, keyed by context ID.
    /// Sending on an entry resumes that context; dropping an entry cancels it
    /// (the receiver side maps the oneshot cancellation to `ContextCanceled`).
    unblockers: RwLock<BTreeMap<u64, Sender<Result<(), RuntimeError>>>>,
    /// The ID of the currently active context
    current_context_id: AtomicU64,
    /// The next available context ID; allocated monotonically via `fetch_add`
    next_available_context_id: AtomicU64,
    /// This spawner can be used to spawn tasks onto the thread-local executor
    /// associated with this context-switching environment
    spawner: ThreadLocalSpawner,
}
46
/// Errors that can occur during a context switch
#[derive(Debug, Error)]
pub enum ContextSwitchError {
    /// No suspended context with the requested ID exists — it was never created,
    /// was already resumed, or was destroyed via `destroy_context`.
    #[error("Target context to switch to is missing")]
    SwitchTargetMissing,
}
53
/// The context ID reserved for the main context; spawned contexts always get IDs greater than this
/// (the ID counter starts at `MAIN_CONTEXT_ID + 1`).
const MAIN_CONTEXT_ID: u64 = 0;
55
/// Contexts will trap with this error as a RuntimeError::user when they are canceled
///
/// If encountered in a host function it MUST be propagated to the context's entrypoint.
/// To make it harder to run into that behaviour by ignoring this error, dropping it
/// will cause a panic with a message that it was not propagated properly. If you think
/// you know what you are doing, you can call `defuse` (or just forget it) to avoid
/// the panic.
///
/// When it bubbles up to the start of the entrypoint function of a context, it will be
/// handled by just letting the context exit silently.
#[derive(Error, Debug)]
#[error("Context was canceled")]
pub struct ContextCanceled(());
// NOTE: the private unit field keeps this type constructible only inside this module.
impl ContextCanceled {
    /// Defuse the ContextCanceled so it does not panic when dropped
    pub fn defuse(self) {
        // Consume self without panicking: `forget` skips the Drop impl below
        forget(self);
    }
}
impl Drop for ContextCanceled {
    // Deliberate panic-on-drop guard: silently discarding a cancellation would leave
    // a context running that should be shutting down, so dropping is treated as a bug.
    fn drop(&mut self) {
        panic!(
            "A ContextCanceled error was dropped without being propagated to the context's entrypoint. This is likely a bug in a host function, please make sure to propagate ContextCanceled errors properly."
        );
    }
}
83
/// Contexts will trap with this error as a RuntimeError::user when their entrypoint returns
///
/// It is not allowed for context entrypoints to return normally, they must always
/// either get destroyed while suspended or trap with an error (like ContextCanceled)
///
/// This error will be picked up by the main context and cause it to trap as well.
#[derive(Error, Debug)]
#[error("The entrypoint of context {0} returned which is not allowed")]
pub struct ContextEntrypointReturned(u64); // payload: ID of the offending context
93
94impl ContextSwitchingEnvironment {
95    fn new(spawner: ThreadLocalSpawner) -> Self {
96        Self {
97            inner: Arc::new(ContextSwitchingEnvironmentInner {
98                unblockers: RwLock::new(BTreeMap::new()),
99                current_context_id: AtomicU64::new(MAIN_CONTEXT_ID),
100                next_available_context_id: AtomicU64::new(MAIN_CONTEXT_ID + 1),
101                spawner,
102            }),
103        }
104    }
105
106    /// Run the main context function in a context-switching environment
107    ///
108    /// This call blocks until the entrypoint returns or traps
109    pub(crate) fn run_main_context(
110        ctx: &WasiFunctionEnv,
111        mut store: Store,
112        entrypoint: wasmer::Function,
113        params: Vec<wasmer::Value>,
114    ) -> (Store, Result<Box<[wasmer::Value]>, RuntimeError>) {
115        if !ctx
116            .data(&store)
117            .capabilities
118            .threading
119            .enable_asynchronous_threading
120        {
121            let result = entrypoint.call(&mut store, &params);
122            return (store, result);
123        }
124
125        // If we are already in a context-switching environment, something went wrong
126        if ctx
127            .data_mut(&mut store)
128            .context_switching_environment
129            .is_some()
130        {
131            panic!(
132                "Failed to start a WASIX main context as there was already a context-switching environment present."
133            );
134        }
135
136        // Do a normal call and dont install the context switching env, if the engine does not support async
137        let engine_supports_async = store.engine().supports_async();
138        if !engine_supports_async {
139            let result = entrypoint.call(&mut store, &params);
140            return (store, result);
141        }
142
143        // Create a new executor
144        let mut local_executor = ThreadLocalExecutor::new();
145
146        let this = Self::new(local_executor.spawner());
147
148        // Add the context-switching environment to the WasiEnv
149        let previous = ctx
150            .data_mut(&mut store)
151            .context_switching_environment
152            .replace(this);
153        assert!(previous.is_none()); // Should never be hit because of the check at the top
154
155        // Turn the store into an async store and run the entrypoint
156        let store_async = store.into_async();
157        let result = local_executor.run_until(entrypoint.call_async(&store_async, params));
158
159        // Process if this was terminated by a context entrypoint returning
160        let result = match &result {
161            Err(e) => match e.downcast_ref::<ContextEntrypointReturned>() {
162                Some(ContextEntrypointReturned(id)) => {
163                    // Context entrypoint returned, which is not allowed
164                    // Exit with code 129
165                    tracing::error!("The entrypoint of context {id} returned which is not allowed");
166                    Err(RuntimeError::user(
167                        WasiError::Exit(ExitCode::from(129)).into(),
168                    ))
169                }
170                _ => result,
171            },
172            _ => result,
173        };
174        tracing::trace!("Main context finished execution and returned {result:?}");
175
176        // Drop the executor to ensure all references to the StoreAsync are gone and convert back to a normal store
177        drop(local_executor);
178        let mut store = store_async.into_store().ok().unwrap();
179
180        // Remove the context-switching environment from the WasiEnv
181        let env = ctx.data_mut(&mut store);
182
183        env.context_switching_environment
184            .take()
185            .or_else(|| {
186                env.vfork
187                    .as_mut()
188                    .and_then(|vfork| vfork.env.context_switching_environment.take())
189                    .inspect(|_| {
190                        // Grace for vforks, so they don't bring everything down with them.
191                        // This is still an error.
192                        // The message below is oversimplified there is more nuance to this.
193                        tracing::error!("Exiting a vforked process in any other way than calling `_exit()` is undefined behavior but the current program just did that.");
194                    })
195            })
196            .expect("Failed to remove wasix context-switching environment from WASIX env after main context finished. This means we lost it somehow which should never happen.");
197
198        (store, result)
199    }
200
201    /// Get the ID of the currently active context
202    pub(crate) fn active_context_id(&self) -> u64 {
203        self.inner.current_context_id.load(Ordering::Relaxed)
204    }
205
    /// Get the id of the main context (0)
    ///
    /// Takes `&self` only for call-site symmetry; the value is always the
    /// constant [`MAIN_CONTEXT_ID`].
    pub(crate) fn main_context_id(&self) -> u64 {
        MAIN_CONTEXT_ID
    }
210
211    pub(crate) fn destroy_context(&self, target_context_id: &u64) -> bool {
212        // For now this only queues the context up for destruction by removing its unblocker.
213        // This will only cause destruction when the context_switch is called.
214        // That could cause memory issues if many contexts are created and destroyed without switching
215        // which could happen in applications that use contexts during setup, but not during main execution.
216        // We don't do immediate destruction because that would make this more complex, as it is essentially
217        // identical to switching to the target context
218        // TODO: Implement immediate destruction if the above becoms an issue
219        self.inner
220            .unblockers
221            .write()
222            .unwrap()
223            .remove(target_context_id)
224            .is_some()
225    }
226
    /// Unblock the target context and suspend own context
    ///
    /// If this function succeeds, you MUST await the returned future. It resolves when
    /// another context switches back to this one, or resolves to a [`ContextCanceled`]
    /// runtime error if this context is destroyed while suspended.
    ///
    /// # Errors
    ///
    /// Returns [`ContextSwitchError::SwitchTargetMissing`] if there is no suspended
    /// context with `target_context_id`.
    pub(crate) fn switch_context(
        &self,
        target_context_id: u64,
    ) -> Result<
        impl Future<Output = Result<(), RuntimeError>> + Send + Sync + use<> + 'static,
        ContextSwitchError,
    > {
        // A dropped sender surfaces as a oneshot cancellation, which we translate into
        // our ContextCanceled error (this is how `destroy_context` takes effect).
        let (own_unblocker, wait_for_unblock) = oneshot::channel::<Result<(), RuntimeError>>();
        let wait_for_unblock = wait_for_unblock.map_err(|_| ContextCanceled(()));

        // Lock contexts for this block
        let mut unblockers = self.inner.unblockers.write().unwrap();
        let own_context_id = self.active_context_id();

        // Assert that we are unblocked
        if unblockers.get(&own_context_id).is_some() {
            // This should never happen, because if we are blocked, we should not be running code at all
            //
            // This is a bug in WASIX and should never happen, so we panic here.
            panic!("There is already a unblock present for the current context {own_context_id}");
        }

        // Assert that the target is blocked
        let Some(unblock_target) = unblockers.remove(&target_context_id) else {
            return Err(ContextSwitchError::SwitchTargetMissing);
        };

        // Unblock the target
        // Don't mark ourself as blocked yet, as we first need to know that unblocking succeeded
        let unblock_result: std::result::Result<(), std::result::Result<(), RuntimeError>> =
            unblock_target.send(Ok(()));
        let Ok(_) = unblock_result else {
            // If there is a unblock function in unblockers, the target context must be awaiting the related future.
            // One way we can get into this path is, when the target context was already resumed and we somehow managed to keep the unblocker around.
            // This can't happen as calling the unblocker consumes it.
            // Another way this could happen is if the future waiting for the unblocker was canceled before we called it.
            // This should not happen. This would be a bug in WASIX.
            // Another way this could happen is if the target context never awaited the unblocker future in the first place.
            // This also would be a bug in WASIX.
            //
            // So if we reach this path it is a bug in WASIX and should never happen, so we panic here.
            panic!(
                "Context {own_context_id} tried to unblock context {target_context_id} but the unblock target does not seem to exist."
            );
        };

        // After we have unblocked the target, we can insert our own unblock function
        unblockers.insert(own_context_id, own_unblocker);
        let weak_inner = Arc::downgrade(&self.inner);
        Ok(async move {
            let unblock_result = wait_for_unblock.await;

            // Handle if we were canceled instead of being unblocked
            let result = match unblock_result {
                Ok(v) => v,
                Err(canceled) => {
                    tracing::trace!("Canceled context {own_context_id} while it was suspended");

                    // When our context was canceled return the `ContextCanceled` error.
                    // It will be handled by the entrypoint wrapper and the context will exit silently.
                    //
                    // If we reach this point, we do not need to restore our context ID as it will not be read again
                    return Err(RuntimeError::user(canceled.into()));
                }
            };

            // Restore our own context ID
            let Some(inner) = Weak::upgrade(&weak_inner) else {
                // The context-switching environment has been dropped, so we can't proceed
                //
                // This should only happen during shutdown when the ContextSwitchingEnvironment and thus the list of unblockers
                // is dropped and the futures continue being polled (because dropping that list would cause all wait_for_unblock
                // futures to resolve to canceled).
                // However looking at the implementation in `run_main_context` this should not happen, as we drop the executor
                // before dropping the environment.
                //
                // In a future implementation that allows the executor to outlive the environment, we should handle this case,
                // most likely by returning a `ContextCanceled` error here as well.
                // For now this should never happen, so it's a WASIX bug, so we panic here.
                panic!(
                    "The switch future for context {own_context_id} was polled after the context-switching environment was dropped, this should not happen"
                );
            };
            inner
                .current_context_id
                .store(own_context_id, Ordering::Relaxed);
            // Release the strong reference immediately; only the Weak may outlive this poll
            drop(inner);

            result
        })
    }
321
    /// Create a new context and spawn it onto the thread-local executor
    ///
    /// The entrypoint function is called when the context is unblocked for the first time
    ///
    /// If entrypoint returns, it must be a RuntimeError, as it is not allowed to return normally.
    /// If the RuntimeError is a [`ContextCanceled`], the context will just exit silently.
    /// Otherwise, the error will be propagated to the main context.
    ///
    /// If the context is cancelled before it is unblocked, the entrypoint will not be called
    ///
    /// Returns the ID of the newly created (still suspended) context.
    pub(crate) fn create_context<F>(&self, entrypoint: F) -> u64
    where
        F: Future<Output = Result<(), RuntimeError>> + 'static,
    {
        // Create a new context ID
        let new_context_id = self
            .inner
            .next_available_context_id
            .fetch_add(1, Ordering::Relaxed);

        // A dropped unblocker (see `destroy_context`) resolves this future to ContextCanceled
        let (own_unblocker, wait_for_unblock) = oneshot::channel::<Result<(), RuntimeError>>();
        let wait_for_unblock = wait_for_unblock.map_err(|_| ContextCanceled(()));

        // Store the unblocker, so the new context starts out suspended

        let None = self
            .inner
            .unblockers
            .write()
            .unwrap()
            .insert(new_context_id, own_unblocker)
        else {
            panic!("There already is a context suspended with ID {new_context_id}");
        };

        // Create the future for the new context
        let weak_inner = Arc::downgrade(&self.inner);
        let context_future = async move {
            // First wait for the unblock signal
            let prelaunch_result = wait_for_unblock.await;

            // Handle if the context was canceled before it even started
            match prelaunch_result {
                Ok(_) => (),
                Err(canceled) => {
                    trace!("Context {new_context_id} was successfully destroyed before it started");
                    // We know what we are doing, so we can prevent the panic on drop
                    canceled.defuse();
                    // Context was cancelled before it was started, so we can just let it return.
                    // This will resolve the original future passed to `spawn_local` with
                    // `Ok(())` which should make the executor drop it properly
                    return;
                }
            };

            let Some(inner) = Weak::upgrade(&weak_inner) else {
                // The context-switching environment has been dropped, so we can't proceed.
                // See the comments on the first Weak::upgrade call in this file for background on when this can happen.
                //
                // Note that in case the context was canceled properly, we accept that and allowed it to exit
                // silently (in the match block above). That could happen if the main context canceled
                // this context before exiting itself and the executor outlives the environment.
                //
                // However it should not be possible to switch to this context after the main context has exited,
                // as there can only be one active context at a time and that one (the main context) just exited.
                // So there can't be another context in that context-switching environment that could switch to this one.
                panic!(
                    "Resumed context {new_context_id} after the context-switching environment was dropped. This indicates a bug where multiple contexts are active at the same time which should never happen"
                );
            };
            // Set the current context ID
            inner
                .current_context_id
                .store(new_context_id, Ordering::Relaxed);
            // Drop inner again so we don't hold a strong ref while running the entrypoint, so it cleans itself up properly
            drop(inner);

            tracing::trace!("Resumed context {new_context_id} for the first time");

            // Launch the context entrypoint
            let entrypoint_result = entrypoint.await;

            // If that function returns, we need to resume the main context with an error
            // Take the underlying error, or create a new error if the context returned a value
            let entrypoint_result = entrypoint_result.map_or_else(
                |e| e,
                |_| RuntimeError::user(ContextEntrypointReturned(new_context_id).into()),
            );

            // If that function returns something went wrong.
            // If it's a cancellation, we can just let this context run out.
            // If it's another error, we resume the main context with the error
            let error = match entrypoint_result.downcast::<ContextCanceled>() {
                Ok(canceled) => {
                    tracing::trace!(
                        "Destroyed context {new_context_id} successfully after it was canceled"
                    );
                    // We know what we are doing, so we can prevent the panic on drop
                    canceled.defuse();
                    // Context was cancelled, so we can just let it return.
                    // This will resolve the original future passed to `spawn_local` with
                    // `Ok(())` which should make the executor drop it properly
                    return;
                }
                Err(error) => error, // Propagate the runtime error to main
            };

            tracing::trace!("Context {new_context_id} entrypoint returned with {error:?}");

            // Retrieve the main context
            let Some(inner) = Weak::upgrade(&weak_inner) else {
                // The context-switching environment has been dropped, so we can't proceed.
                // See the comments on the first Weak::upgrade call in this file for background on when this can happen.
                //
                // Note that in case the context was canceled properly, we accept that and allowed it to exit
                // silently (in the match block above). That could happen if the main context canceled
                // this context before exiting itself and the executor outlives the environment.
                //
                // However it should not be possible to switch to this context after the main context has exited,
                // as there can only be one active context at a time and that one (the main context) just exited.
                // So there can't be another context in that context-switching environment that could switch to this one.
                //
                // So in conclusion if we reach this point it is a bug in WASIX and should never happen, so we panic here.
                panic!(
                    "Context {new_context_id} entrypoint returned after the context-switching environment was dropped. This indicates a bug where multiple contexts are active at the same time which should never happen"
                );
            };

            tracing::trace!(
                "Resuming main context {MAIN_CONTEXT_ID} with error from context {new_context_id}"
            );
            let Some(main_context) = inner.unblockers.write().unwrap().remove(&MAIN_CONTEXT_ID)
            else {
                // The main context should always be suspended when another context returns or traps with anything but cancellation
                panic!(
                    "The main context should always be suspended when another context returns or traps (with anything but a cancellation)."
                );
            };
            drop(inner);

            // Resume the main context with the error
            main_context
                .send(Err(error))
                .expect("Failed to send error to main context, this should not happen");
        };

        // Queue the future onto the thread-local executor
        tracing::trace!("Spawning context {new_context_id} onto the thread-local executor");
        let spawn_result = self.inner.spawner.spawn_local(context_future);

        match spawn_result {
            Ok(()) => new_context_id,
            Err(ThreadLocalSpawnerError::LocalPoolShutDown) => {
                // This case could happen if the executor is being shut down while it is still polling a future (this one).
                // Which shouldn't be possible with a single-threaded executor, as the shutdown would have to
                // be initiated from within a future running on that executor.
                // The current WASIX context-switching implementation should not be able to produce this case,
                // but maybe it will be possible in future implementations. If someone manages to produce this case,
                // they should open an issue so we can discuss how to handle this case properly.
                // If this case is reachable we could return the same error as when no context-switching environment is present.
                panic!(
                    "Failed to spawn context {new_context_id} because the local executor has been shut down. Please open an issue and let me know how you produced this error.",
                );
            }
            Err(ThreadLocalSpawnerError::NotOnTheCorrectThread { expected, found }) => {
                // This should never happen and is a bug in WASIX, so we panic here
                panic!(
                    "Failed to create context because the thread local spawner lives on {expected:?} but you are on {found:?}"
                )
            }
            Err(ThreadLocalSpawnerError::SpawnError) => {
                panic!("Failed to spawn context {new_context_id}, this should not happen");
            }
        }
    }
496}