wasmer_wasix/state/
context_switching.rs

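//! Cooperative context switching for WASIX on a single host thread.
//!
//! Each suspended context parks on a `oneshot` receiver; the matching sender (its
//! "unblocker") is stored in the `unblockers` map of the shared
//! `ContextSwitchingEnvironment`. A context switch removes the target's unblocker
//! and fires it, then registers a fresh unblocker for the calling context. All
//! context futures run on a `ThreadLocalExecutor` that is owned by
//! `ContextSwitchingEnvironment::run_main_context` for the lifetime of the main
//! context.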
use crate::{
    WasiError, WasiFunctionEnv,
    utils::thread_local_executor::{
        ThreadLocalExecutor, ThreadLocalSpawner, ThreadLocalSpawnerError,
    },
};
use futures::{
    TryFutureExt,
    channel::oneshot::{self, Sender},
};
use std::{
    collections::BTreeMap,
    mem::forget,
    sync::{
        Arc, RwLock, Weak,
        atomic::{AtomicU64, Ordering},
    },
};
use thiserror::Error;
use tracing::trace;
use wasmer::{RuntimeError, Store};
use wasmer_wasix_types::wasi::ExitCode;

/// The context-switching environment represents all state for WASIX context-switching
/// on a single host thread.
#[derive(Debug)]
pub(crate) struct ContextSwitchingEnvironment {
    // TODO: We might be able to get rid of this Arc by passing an AsyncFunctionEnvMut to the functions instead.
    //       We need to be super-careful about memory leaks through cyclical references if we make that change.
    //       The Arc allows us to have weak references to the ContextSwitchingEnvironment independent of the WasiEnv.
    inner: Arc<ContextSwitchingEnvironmentInner>,
}

#[derive(Debug)]
struct ContextSwitchingEnvironmentInner {
    /// List of the unblockers for all suspended contexts
    unblockers: RwLock<BTreeMap<u64, Sender<Result<(), RuntimeError>>>>,
    /// The ID of the currently active context
    current_context_id: AtomicU64,
    /// The next available context ID
    next_available_context_id: AtomicU64,
    /// This spawner can be used to spawn tasks onto the thread-local executor
    /// associated with this context-switching environment
    spawner: ThreadLocalSpawner,
}

/// Errors that can occur during a context switch
#[derive(Debug, Error)]
pub enum ContextSwitchError {
    #[error("Target context to switch to is missing")]
    SwitchTargetMissing,
}

const MAIN_CONTEXT_ID: u64 = 0;

/// Contexts will trap with this error as a RuntimeError::user when they are canceled.
///
/// If encountered in a host function it MUST be propagated to the context's entrypoint.
/// To make it harder to run into that behavior by ignoring this error, dropping it
/// will cause a panic with a message that it was not propagated properly. If you think
/// you know what you are doing, you can call `defuse` (or just forget it) to avoid
/// the panic.
///
/// When it bubbles up to the start of the entrypoint function of a context, it will be
/// handled by just letting the context exit silently.
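///
/// A sketch of the intended handling in a host function (illustrative only;
/// `switch_future` stands in for the future returned by `switch_context`):
///
/// ```ignore
/// match switch_future.await {
///     Ok(()) => { /* resumed normally, keep going */ }
///     // Do not swallow the error here; returning it lets it reach the
///     // context's entrypoint, where cancellation is handled.
///     Err(runtime_error) => return Err(runtime_error),
/// }
/// ```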
#[derive(Error, Debug)]
#[error("Context was canceled")]
pub struct ContextCanceled(());
impl ContextCanceled {
    /// Defuse the ContextCanceled so it does not panic when dropped
    pub fn defuse(self) {
        // Consume self without panicking
        forget(self);
    }
}
impl Drop for ContextCanceled {
    fn drop(&mut self) {
        panic!(
            "A ContextCanceled error was dropped without being propagated to the context's entrypoint. This is likely a bug in a host function, please make sure to propagate ContextCanceled errors properly."
        );
    }
}

/// Contexts will trap with this error as a RuntimeError::user when their entrypoint returns.
///
/// Context entrypoints are not allowed to return normally; they must always
/// either get destroyed while suspended or trap with an error (like ContextCanceled).
///
/// This error will be picked up by the main context and cause it to trap as well.
#[derive(Error, Debug)]
#[error("The entrypoint of context {0} returned which is not allowed")]
pub struct ContextEntrypointReturned(u64);

impl ContextSwitchingEnvironment {
    fn new(spawner: ThreadLocalSpawner) -> Self {
        Self {
            inner: Arc::new(ContextSwitchingEnvironmentInner {
                unblockers: RwLock::new(BTreeMap::new()),
                current_context_id: AtomicU64::new(MAIN_CONTEXT_ID),
                next_available_context_id: AtomicU64::new(MAIN_CONTEXT_ID + 1),
                spawner,
            }),
        }
    }

    /// Run the main context function in a context-switching environment
    ///
    /// This call blocks until the entrypoint returns or traps
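    ///
    /// A minimal call sketch (illustrative only; `wasi_fn_env`, `store`, and `entry`
    /// are assumed to be an already set up `WasiFunctionEnv`, its `Store`, and the
    /// guest's exported entrypoint function):
    ///
    /// ```ignore
    /// let (store, result) =
    ///     ContextSwitchingEnvironment::run_main_context(&wasi_fn_env, store, entry, vec![]);
    /// ```
    ///
    /// When the engine supports async, the entrypoint runs via `call_async` on a fresh
    /// `ThreadLocalExecutor`; otherwise this falls back to a plain synchronous call.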
    pub(crate) fn run_main_context(
        ctx: &WasiFunctionEnv,
        mut store: Store,
        entrypoint: wasmer::Function,
        params: Vec<wasmer::Value>,
    ) -> (Store, Result<Box<[wasmer::Value]>, RuntimeError>) {
        // If we are already in a context-switching environment, something went wrong
        if ctx
            .data_mut(&mut store)
            .context_switching_environment
            .is_some()
        {
            panic!(
                "Failed to start a WASIX main context as there was already a context-switching environment present."
            );
        }

        // Do a normal call and don't install the context-switching env if the engine does not support async
        let engine_supports_async = store.engine().supports_async();
        if !engine_supports_async {
            let result = entrypoint.call(&mut store, &params);
            return (store, result);
        }

        // Create a new executor
        let mut local_executor = ThreadLocalExecutor::new();

        let this = Self::new(local_executor.spawner());

        // Add the context-switching environment to the WasiEnv
        let previous = ctx
            .data_mut(&mut store)
            .context_switching_environment
            .replace(this);
        assert!(previous.is_none()); // Should never be hit because of the check at the top

        // Turn the store into an async store and run the entrypoint
        let store_async = store.into_async();
        let result = local_executor.run_until(entrypoint.call_async(&store_async, params));

        // Process if this was terminated by a context entrypoint returning
        let result = match &result {
            Err(e) => match e.downcast_ref::<ContextEntrypointReturned>() {
                Some(ContextEntrypointReturned(id)) => {
                    // Context entrypoint returned, which is not allowed
                    // Exit with code 129
                    tracing::error!("The entrypoint of context {id} returned which is not allowed");
                    Err(RuntimeError::user(
                        WasiError::Exit(ExitCode::from(129)).into(),
                    ))
                }
                _ => result,
            },
            _ => result,
        };
        tracing::trace!("Main context finished execution and returned {result:?}");

        // Drop the executor to ensure all references to the StoreAsync are gone and convert back to a normal store
        drop(local_executor);
        let mut store = store_async.into_store().ok().unwrap();

        // Remove the context-switching environment from the WasiEnv
        let env = ctx.data_mut(&mut store);

        env.context_switching_environment
            .take()
            .or_else(|| {
                env.vfork
                    .as_mut()
                    .and_then(|vfork| vfork.env.context_switching_environment.take())
                    .inspect(|_| {
                        // Grace for vforks, so they don't bring everything down with them.
                        // This is still an error.
                        // The message below is oversimplified; there is more nuance to this.
                        tracing::error!("Exiting a vforked process in any other way than calling `_exit()` is undefined behavior but the current program just did that.");
                    })
            })
            .expect("Failed to remove wasix context-switching environment from WASIX env after main context finished. This means we lost it somehow which should never happen.");

        (store, result)
    }

    /// Get the ID of the currently active context
    pub(crate) fn active_context_id(&self) -> u64 {
        self.inner.current_context_id.load(Ordering::Relaxed)
    }

    /// Get the ID of the main context (0)
    pub(crate) fn main_context_id(&self) -> u64 {
        MAIN_CONTEXT_ID
    }

    pub(crate) fn destroy_context(&self, target_context_id: &u64) -> bool {
        // For now this only queues the context up for destruction by removing its unblocker.
        // This will only cause destruction when context_switch is called.
        // That could cause memory issues if many contexts are created and destroyed without switching,
        // which could happen in applications that use contexts during setup, but not during main execution.
        // We don't do immediate destruction because that would make this more complex, as it is essentially
        // identical to switching to the target context.
        // TODO: Implement immediate destruction if the above becomes an issue
        self.inner
            .unblockers
            .write()
            .unwrap()
            .remove(target_context_id)
            .is_some()
    }

    /// Unblock the target context and suspend the current context
    ///
    /// If this function succeeds, you MUST await the returned future
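    ///
    /// A minimal usage sketch (illustrative only; `env` is assumed to be the current
    /// `ContextSwitchingEnvironment` and `target_id` the ID of a currently suspended
    /// context):
    ///
    /// ```ignore
    /// let resumed = env.switch_context(target_id)?;
    /// // Awaiting suspends this context until another context switches back to it,
    /// // or fails with a wrapped `ContextCanceled` if this context is destroyed instead.
    /// resumed.await?;
    /// ```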
    pub(crate) fn switch_context(
        &self,
        target_context_id: u64,
    ) -> Result<
        impl Future<Output = Result<(), RuntimeError>> + Send + Sync + use<> + 'static,
        ContextSwitchError,
    > {
        let (own_unblocker, wait_for_unblock) = oneshot::channel::<Result<(), RuntimeError>>();
        let wait_for_unblock = wait_for_unblock.map_err(|_| ContextCanceled(()));

        // Lock contexts for this block
        let mut unblockers = self.inner.unblockers.write().unwrap();
        let own_context_id = self.active_context_id();

        // Assert that we are unblocked
        if unblockers.contains_key(&own_context_id) {
            // This should never happen, because if we are blocked, we should not be running code at all.
            //
            // This is a bug in WASIX and should never happen, so we panic here.
            panic!("There is already an unblocker present for the current context {own_context_id}");
        }

        // Assert that the target is blocked
        let Some(unblock_target) = unblockers.remove(&target_context_id) else {
            return Err(ContextSwitchError::SwitchTargetMissing);
        };

        // Unblock the target
        // Don't mark ourselves as blocked yet, as we first need to know that unblocking succeeded
        let unblock_result: std::result::Result<(), std::result::Result<(), RuntimeError>> =
            unblock_target.send(Ok(()));
        let Ok(_) = unblock_result else {
            // If there is an unblock function in unblockers, the target context must be awaiting the related future.
            // One way we could get into this path is if the target context was already resumed and we somehow managed to keep the unblocker around.
            // This can't happen as calling the unblocker consumes it.
            // Another way this could happen is if the future waiting for the unblocker was canceled before we called it.
            // This should not happen. This would be a bug in WASIX.
            // Another way this could happen is if the target context never awaited the unblocker future in the first place.
            // This also would be a bug in WASIX.
            //
            // So if we reach this path it is a bug in WASIX and should never happen, so we panic here.
            panic!(
                "Context {own_context_id} tried to unblock context {target_context_id} but the unblock target does not seem to exist."
            );
        };

        // After we have unblocked the target, we can insert our own unblock function
        unblockers.insert(own_context_id, own_unblocker);
        let weak_inner = Arc::downgrade(&self.inner);
        Ok(async move {
            let unblock_result = wait_for_unblock.await;

            // Handle if we were canceled instead of being unblocked
            let result = match unblock_result {
                Ok(v) => v,
                Err(canceled) => {
                    tracing::trace!("Canceled context {own_context_id} while it was suspended");

                    // When our context was canceled, return the `ContextCanceled` error.
                    // It will be handled by the entrypoint wrapper and the context will exit silently.
                    //
                    // If we reach this point, we don't need to restore our context ID as it will not be read again
                    return Err(RuntimeError::user(canceled.into()));
                }
            };

            // Restore our own context ID
            let Some(inner) = Weak::upgrade(&weak_inner) else {
                // The context-switching environment has been dropped, so we can't proceed.
                //
                // This should only happen during shutdown when the ContextSwitchingEnvironment and thus the list of unblockers
                // is dropped and the futures continue being polled (because dropping that list would cause all wait_for_unblock
                // futures to resolve to canceled).
                // However, looking at the implementation in `run_main_context`, this should not happen, as we drop the executor
                // before dropping the environment.
                //
                // In a future implementation that allows the executor to outlive the environment, we should handle this case,
                // most likely by returning a `ContextCanceled` error here as well.
                // For now this should never happen, so it's a WASIX bug, so we panic here.
                panic!(
                    "The switch future for context {own_context_id} was polled after the context-switching environment was dropped, this should not happen"
                );
            };
            inner
                .current_context_id
                .store(own_context_id, Ordering::Relaxed);
            drop(inner);

            result
        })
    }

    /// Create a new context and spawn it onto the thread-local executor
    ///
    /// The entrypoint function is called when the context is unblocked for the first time
    ///
    /// If the entrypoint returns, its result must be a RuntimeError, as it is not allowed to return normally.
    /// If the RuntimeError is a [`ContextCanceled`], the context will just exit silently.
    /// Otherwise, the error will be propagated to the main context.
    ///
    /// If the context is canceled before it is unblocked, the entrypoint will not be called.
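    ///
    /// A minimal usage sketch (illustrative only; `env` is assumed to be the current
    /// `ContextSwitchingEnvironment` and `guest_entry_future` stands in for an async
    /// call into the guest that yields a `Result<(), RuntimeError>`):
    ///
    /// ```ignore
    /// let new_id = env.create_context(async move {
    ///     // Any RuntimeError (including a wrapped ContextCanceled) must be returned
    ///     // unchanged so the wrapper around this future can handle it.
    ///     guest_entry_future.await
    /// });
    /// // The new context stays suspended until someone switches to `new_id`.
    /// ```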
    pub(crate) fn create_context<F>(&self, entrypoint: F) -> u64
    where
        F: Future<Output = Result<(), RuntimeError>> + 'static,
    {
        // Create a new context ID
        let new_context_id = self
            .inner
            .next_available_context_id
            .fetch_add(1, Ordering::Relaxed);

        let (own_unblocker, wait_for_unblock) = oneshot::channel::<Result<(), RuntimeError>>();
        let wait_for_unblock = wait_for_unblock.map_err(|_| ContextCanceled(()));

        // Store the unblocker
        let None = self
            .inner
            .unblockers
            .write()
            .unwrap()
            .insert(new_context_id, own_unblocker)
        else {
            panic!("There already is a context suspended with ID {new_context_id}");
        };

        // Create the future for the new context
        let weak_inner = Arc::downgrade(&self.inner);
        let context_future = async move {
            // First wait for the unblock signal
            let prelaunch_result = wait_for_unblock.await;

            // Handle if the context was canceled before it even started
            match prelaunch_result {
                Ok(_) => (),
                Err(canceled) => {
                    trace!("Context {new_context_id} was successfully destroyed before it started");
                    // We know what we are doing, so we can prevent the panic on drop
                    canceled.defuse();
                    // Context was canceled before it was started, so we can just let it return.
                    // This will resolve the original future passed to `spawn_local` with
                    // `Ok(())` which should make the executor drop it properly
                    return;
                }
            };

            let Some(inner) = Weak::upgrade(&weak_inner) else {
                // The context-switching environment has been dropped, so we can't proceed.
                // See the comments on the first Weak::upgrade call in this file for background on when this can happen.
                //
                // Note that in case the context was canceled properly, we accept that and let it exit
                // silently (in the match block above). That could happen if the main context canceled
                // this context before exiting itself and the executor outlives the environment.
                //
                // However it should not be possible to switch to this context after the main context has exited,
                // as there can only be one active context at a time and that one (the main context) just exited.
                // So there can't be another context in that context-switching environment that could switch to this one.
                panic!(
                    "Resumed context {new_context_id} after the context-switching environment was dropped. This indicates a bug where multiple contexts are active at the same time which should never happen"
                );
            };
            // Set the current context ID
            inner
                .current_context_id
                .store(new_context_id, Ordering::Relaxed);
            // Drop inner again so we don't hold a strong ref while running the entrypoint, so it cleans itself up properly
            drop(inner);

            tracing::trace!("Resumed context {new_context_id} for the first time");

            // Launch the context entrypoint
            let entrypoint_result = entrypoint.await;

            // If that function returns, we need to resume the main context with an error.
            // Take the underlying error, or create a new error if the context returned a value
            let entrypoint_result = entrypoint_result.map_or_else(
                |e| e,
                |_| RuntimeError::user(ContextEntrypointReturned(new_context_id).into()),
            );

            // If that function returns, something went wrong.
            // If it's a cancellation, we can just let this context run out.
            // If it's another error, we resume the main context with the error
            let error = match entrypoint_result.downcast::<ContextCanceled>() {
                Ok(canceled) => {
                    tracing::trace!(
                        "Destroyed context {new_context_id} successfully after it was canceled"
                    );
                    // We know what we are doing, so we can prevent the panic on drop
                    canceled.defuse();
                    // Context was canceled, so we can just let it return.
                    // This will resolve the original future passed to `spawn_local` with
                    // `Ok(())` which should make the executor drop it properly
                    return;
                }
                Err(error) => error, // Propagate the runtime error to main
            };

            tracing::trace!("Context {new_context_id} entrypoint returned with {error:?}");

            // Retrieve the main context
            let Some(inner) = Weak::upgrade(&weak_inner) else {
                // The context-switching environment has been dropped, so we can't proceed.
                // See the comments on the first Weak::upgrade call in this file for background on when this can happen.
                //
                // Note that in case the context was canceled properly, we accept that and let it exit
                // silently (in the match block above). That could happen if the main context canceled
                // this context before exiting itself and the executor outlives the environment.
                //
                // However it should not be possible to switch to this context after the main context has exited,
                // as there can only be one active context at a time and that one (the main context) just exited.
                // So there can't be another context in that context-switching environment that could switch to this one.
                //
                // So in conclusion, if we reach this point it is a bug in WASIX and should never happen, so we panic here.
                panic!(
                    "Context {new_context_id} entrypoint returned after the context-switching environment was dropped. This indicates a bug where multiple contexts are active at the same time which should never happen"
                );
            };

            tracing::trace!(
                "Resuming main context {MAIN_CONTEXT_ID} with error from context {new_context_id}"
            );
            let Some(main_context) = inner.unblockers.write().unwrap().remove(&MAIN_CONTEXT_ID)
            else {
                // The main context should always be suspended when another context returns or traps with anything but cancellation
                panic!(
                    "The main context should always be suspended when another context returns or traps (with anything but a cancellation)."
                );
            };
            drop(inner);

            // Resume the main context with the error
            main_context
                .send(Err(error))
                .expect("Failed to send error to main context, this should not happen");
        };

        // Queue the future onto the thread-local executor
        tracing::trace!("Spawning context {new_context_id} onto the thread-local executor");
        let spawn_result = self.inner.spawner.spawn_local(context_future);

        match spawn_result {
            Ok(()) => new_context_id,
            Err(ThreadLocalSpawnerError::LocalPoolShutDown) => {
                // This case could happen if the executor is being shut down while it is still polling a future (this one),
                // which shouldn't be possible with a single-threaded executor, as the shutdown would have to
                // be initiated from within a future running on that executor.
                // The current WASIX context-switching implementation should not be able to produce this case,
                // but maybe it will be possible in future implementations. If someone manages to produce this case,
                // they should open an issue so we can discuss how to handle it properly.
                // If this case is reachable, we could return the same error as when no context-switching environment is present.
                panic!(
                    "Failed to spawn context {new_context_id} because the local executor has been shut down. Please open an issue and let me know how you produced this error.",
                );
            }
            Err(ThreadLocalSpawnerError::NotOnTheCorrectThread { expected, found }) => {
                // This should never happen and is a bug in WASIX, so we panic here
                panic!(
                    "Failed to create context because the thread local spawner lives on {expected:?} but you are on {found:?}"
                )
            }
            Err(ThreadLocalSpawnerError::SpawnError) => {
                panic!("Failed to spawn context {new_context_id}, this should not happen");
            }
        }
    }
}