// wasmer_wasix/state/context_switching.rs
1use crate::{
2 WasiError, WasiFunctionEnv,
3 utils::thread_local_executor::{
4 ThreadLocalExecutor, ThreadLocalSpawner, ThreadLocalSpawnerError,
5 },
6};
7use futures::{
8 TryFutureExt,
9 channel::oneshot::{self, Sender},
10};
11use std::{
12 collections::BTreeMap,
13 mem::forget,
14 sync::{
15 Arc, RwLock, Weak,
16 atomic::{AtomicU64, Ordering},
17 },
18};
19use thiserror::Error;
20use tracing::trace;
21use wasmer::{RuntimeError, Store};
22use wasmer_wasix_types::wasi::ExitCode;
23
/// The context-switching environment represents all state for WASIX context-switching
/// on a single host thread.
///
/// This is a cheap handle: all state lives behind the inner `Arc`, so cloning or
/// weakly referencing the environment does not copy the context bookkeeping.
#[derive(Debug)]
pub(crate) struct ContextSwitchingEnvironment {
    // TODO: We might be able to get rid of this Arc by passing an AsyncFunctionEnvMut to the functions instead
    // We need to be super-careful about memory leaks by cyclical references if we do that change.
    // The Arc allows us to have weak references to the ContextSwitchingEnvironment independent of the WasiEnv.
    inner: Arc<ContextSwitchingEnvironmentInner>,
}
33
/// Shared state backing a [`ContextSwitchingEnvironment`]; held behind an `Arc`
/// so switch futures can keep weak references to it.
#[derive(Debug)]
struct ContextSwitchingEnvironmentInner {
    /// List of the unblockers for all suspended contexts
    ///
    /// Maps a context ID to the oneshot sender that resumes that context's
    /// suspended future. Removing an entry without sending on it cancels the
    /// context (the receiver resolves to `ContextCanceled`).
    unblockers: RwLock<BTreeMap<u64, Sender<Result<(), RuntimeError>>>>,
    /// The ID of the currently active context
    current_context_id: AtomicU64,
    /// The next available context ID (starts at MAIN_CONTEXT_ID + 1, monotonically increasing)
    next_available_context_id: AtomicU64,
    /// This spawner can be used to spawn tasks onto the thread-local executor
    /// associated with this context-switching environment
    spawner: ThreadLocalSpawner,
}
46
/// Errors that can occur during a context switch
#[derive(Debug, Error)]
pub enum ContextSwitchError {
    /// The target context has no registered unblocker: it never existed,
    /// was already destroyed, or is not currently suspended.
    #[error("Target context to switch to is missing")]
    SwitchTargetMissing,
}
53
/// Reserved ID of the main context; IDs handed out by `create_context` start above this.
const MAIN_CONTEXT_ID: u64 = 0;
55
/// Contexts will trap with this error as a RuntimeError::user when they are canceled
///
/// If encountered in a host function it MUST be propagated to the context's entrypoint.
/// To make it harder to run into that behaviour by ignoring this error, dropping it
/// will cause a panic with a message that it was not propagated properly. If you think
/// you know what you are doing, you can call `defuse` (or just forget it) to avoid
/// the panic.
///
/// When it bubbles up to the start of the entrypoint function of a context, it will be
/// handled by just letting the context exit silently.
#[derive(Error, Debug)]
#[error("Context was canceled")]
// The private unit field prevents construction outside this module.
pub struct ContextCanceled(());
impl ContextCanceled {
    /// Defuse the ContextCanceled so it does not panic when dropped
    pub fn defuse(self) {
        // Consume self without running the panicking Drop impl
        forget(self);
    }
}
impl Drop for ContextCanceled {
    fn drop(&mut self) {
        // Reaching this Drop means a host function swallowed the cancellation
        // instead of propagating or defusing it — fail loudly to surface the bug.
        panic!(
            "A ContextCanceled error was dropped without being propagated to the context's entrypoint. This is likely a bug in a host function, please make sure to propagate ContextCanceled errors properly."
        );
    }
}
83
/// Contexts will trap with this error as a RuntimeError::user when their entrypoint returns
///
/// It is not allowed for context entrypoints to return normally, they must always
/// either get destroyed while suspended or trap with an error (like ContextCanceled)
///
/// This error will be picked up by the main context and cause it to trap as well.
#[derive(Error, Debug)]
#[error("The entrypoint of context {0} returned which is not allowed")]
// The field is the ID of the context whose entrypoint returned.
pub struct ContextEntrypointReturned(u64);
93
94impl ContextSwitchingEnvironment {
95 fn new(spawner: ThreadLocalSpawner) -> Self {
96 Self {
97 inner: Arc::new(ContextSwitchingEnvironmentInner {
98 unblockers: RwLock::new(BTreeMap::new()),
99 current_context_id: AtomicU64::new(MAIN_CONTEXT_ID),
100 next_available_context_id: AtomicU64::new(MAIN_CONTEXT_ID + 1),
101 spawner,
102 }),
103 }
104 }
105
106 /// Run the main context function in a context-switching environment
107 ///
108 /// This call blocks until the entrypoint returns or traps
109 pub(crate) fn run_main_context(
110 ctx: &WasiFunctionEnv,
111 mut store: Store,
112 entrypoint: wasmer::Function,
113 params: Vec<wasmer::Value>,
114 ) -> (Store, Result<Box<[wasmer::Value]>, RuntimeError>) {
115 if !ctx
116 .data(&store)
117 .capabilities
118 .threading
119 .enable_asynchronous_threading
120 {
121 let result = entrypoint.call(&mut store, ¶ms);
122 return (store, result);
123 }
124
125 // If we are already in a context-switching environment, something went wrong
126 if ctx
127 .data_mut(&mut store)
128 .context_switching_environment
129 .is_some()
130 {
131 panic!(
132 "Failed to start a WASIX main context as there was already a context-switching environment present."
133 );
134 }
135
136 // Do a normal call and dont install the context switching env, if the engine does not support async
137 let engine_supports_async = store.engine().supports_async();
138 if !engine_supports_async {
139 let result = entrypoint.call(&mut store, ¶ms);
140 return (store, result);
141 }
142
143 // Create a new executor
144 let mut local_executor = ThreadLocalExecutor::new();
145
146 let this = Self::new(local_executor.spawner());
147
148 // Add the context-switching environment to the WasiEnv
149 let previous = ctx
150 .data_mut(&mut store)
151 .context_switching_environment
152 .replace(this);
153 assert!(previous.is_none()); // Should never be hit because of the check at the top
154
155 // Turn the store into an async store and run the entrypoint
156 let store_async = store.into_async();
157 let result = local_executor.run_until(entrypoint.call_async(&store_async, params));
158
159 // Process if this was terminated by a context entrypoint returning
160 let result = match &result {
161 Err(e) => match e.downcast_ref::<ContextEntrypointReturned>() {
162 Some(ContextEntrypointReturned(id)) => {
163 // Context entrypoint returned, which is not allowed
164 // Exit with code 129
165 tracing::error!("The entrypoint of context {id} returned which is not allowed");
166 Err(RuntimeError::user(
167 WasiError::Exit(ExitCode::from(129)).into(),
168 ))
169 }
170 _ => result,
171 },
172 _ => result,
173 };
174 tracing::trace!("Main context finished execution and returned {result:?}");
175
176 // Drop the executor to ensure all references to the StoreAsync are gone and convert back to a normal store
177 drop(local_executor);
178 let mut store = store_async.into_store().ok().unwrap();
179
180 // Remove the context-switching environment from the WasiEnv
181 let env = ctx.data_mut(&mut store);
182
183 env.context_switching_environment
184 .take()
185 .or_else(|| {
186 env.vfork
187 .as_mut()
188 .and_then(|vfork| vfork.env.context_switching_environment.take())
189 .inspect(|_| {
190 // Grace for vforks, so they don't bring everything down with them.
191 // This is still an error.
192 // The message below is oversimplified there is more nuance to this.
193 tracing::error!("Exiting a vforked process in any other way than calling `_exit()` is undefined behavior but the current program just did that.");
194 })
195 })
196 .expect("Failed to remove wasix context-switching environment from WASIX env after main context finished. This means we lost it somehow which should never happen.");
197
198 (store, result)
199 }
200
201 /// Get the ID of the currently active context
202 pub(crate) fn active_context_id(&self) -> u64 {
203 self.inner.current_context_id.load(Ordering::Relaxed)
204 }
205
206 /// Get the id of the main context (0)
207 pub(crate) fn main_context_id(&self) -> u64 {
208 MAIN_CONTEXT_ID
209 }
210
211 pub(crate) fn destroy_context(&self, target_context_id: &u64) -> bool {
212 // For now this only queues the context up for destruction by removing its unblocker.
213 // This will only cause destruction when the context_switch is called.
214 // That could cause memory issues if many contexts are created and destroyed without switching
215 // which could happen in applications that use contexts during setup, but not during main execution.
216 // We don't do immediate destruction because that would make this more complex, as it is essentially
217 // identical to switching to the target context
218 // TODO: Implement immediate destruction if the above becoms an issue
219 self.inner
220 .unblockers
221 .write()
222 .unwrap()
223 .remove(target_context_id)
224 .is_some()
225 }
226
227 /// Unblock the target context and suspend own context
228 ///
229 /// If this function succeeds, you MUST await the returned future
230 pub(crate) fn switch_context(
231 &self,
232 target_context_id: u64,
233 ) -> Result<
234 impl Future<Output = Result<(), RuntimeError>> + Send + Sync + use<> + 'static,
235 ContextSwitchError,
236 > {
237 let (own_unblocker, wait_for_unblock) = oneshot::channel::<Result<(), RuntimeError>>();
238 let wait_for_unblock = wait_for_unblock.map_err(|_| ContextCanceled(()));
239
240 // Lock contexts for this block
241 let mut unblockers = self.inner.unblockers.write().unwrap();
242 let own_context_id = self.active_context_id();
243
244 // Assert that we are unblocked
245 if unblockers.get(&own_context_id).is_some() {
246 // This should never happen, because if we are blocked, we should not be running code at all
247 //
248 // This is a bug in WASIX and should never happen, so we panic here.
249 panic!("There is already a unblock present for the current context {own_context_id}");
250 }
251
252 // Assert that the target is blocked
253 let Some(unblock_target) = unblockers.remove(&target_context_id) else {
254 return Err(ContextSwitchError::SwitchTargetMissing);
255 };
256
257 // Unblock the target
258 // Dont mark ourself as blocked yet, as we first need to know that unblocking succeeded
259 let unblock_result: std::result::Result<(), std::result::Result<(), RuntimeError>> =
260 unblock_target.send(Ok(()));
261 let Ok(_) = unblock_result else {
262 // If there is a unblock function in unblockers, the target context must be awaiting the related future.
263 // One way we can get into this path is, when the target context was already resumed and we somehow managed to keep the unblocker around.
264 // This can't happen as calling the unblocker consumes it.
265 // Another way this could happen is if the future waiting for the unblocker was canceled before we called it.
266 // This should not happen. This would be a bug in WASIX.
267 // Another way this could happen is if the target context never awaited the unblocker future in the first place.
268 // This also would be a bug in WASIX.
269 //
270 // So if we reach this path it is a bug in WASIX and should never happen, so we panic here.
271 panic!(
272 "Context {own_context_id} tried to unblock context {target_context_id} but the unblock target does not seem to exist."
273 );
274 };
275
276 // After we have unblocked the target, we can insert our own unblock function
277 unblockers.insert(own_context_id, own_unblocker);
278 let weak_inner = Arc::downgrade(&self.inner);
279 Ok(async move {
280 let unblock_result = wait_for_unblock.await;
281
282 // Handle if we were canceled instead of being unblocked
283 let result = match unblock_result {
284 Ok(v) => v,
285 Err(canceled) => {
286 tracing::trace!("Canceled context {own_context_id} while it was suspended");
287
288 // When our context was canceled return the `ContextCanceled` error.
289 // It will be handled by the entrypoint wrapper and the context will exit silently.
290 //
291 // If we reach this point, we must try to restore our context ID as it will not be read again
292 return Err(RuntimeError::user(canceled.into()));
293 }
294 };
295
296 // Restore our own context ID
297 let Some(inner) = Weak::upgrade(&weak_inner) else {
298 // The context-switching environment has been dropped, so we can't proceed
299 //
300 // This should only happen during shutdown when the ContextSwitchingEnvironment and thus the list of unblockers
301 // is dropped and the futures continue being polled (because dropping that list would cause all wait_for_unblock
302 // futures to resolve to canceled).
303 // However looking at the implementation in `run_main_context` this should not happen, as we drop the executor
304 // before dropping the environment,
305 //
306 // In a future implementation that allows the executor to outlive the environment, we should handle this case,
307 // most likely by returning a `ContextCanceled` error here as well.
308 // For now this should never happen, so it's a WASIX bug, so we panic here.
309 panic!(
310 "The switch future for context {own_context_id} was polled after the context-switching environment was dropped, this should not happen"
311 );
312 };
313 inner
314 .current_context_id
315 .store(own_context_id, Ordering::Relaxed);
316 drop(inner);
317
318 result
319 })
320 }
321
322 /// Create a new context and spawn it onto the thread-local executor
323 ///
324 /// The entrypoint function is called when the context is unblocked for the first time
325 ///
326 /// If entrypoint returns, it must be a RuntimeError, as it is not allowed to return normally.
327 /// If the RuntimeError is a [`ContextCanceled`], the context will just exit silently.
328 /// Otherwise, the error will be propagated to the main context.
329 ///
330 /// If the context is cancelled before it is unblocked, the entrypoint will not be called
331 pub(crate) fn create_context<F>(&self, entrypoint: F) -> u64
332 where
333 F: Future<Output = Result<(), RuntimeError>> + 'static,
334 {
335 // Create a new context ID
336 let new_context_id = self
337 .inner
338 .next_available_context_id
339 .fetch_add(1, Ordering::Relaxed);
340
341 let (own_unblocker, wait_for_unblock) = oneshot::channel::<Result<(), RuntimeError>>();
342 let wait_for_unblock = wait_for_unblock.map_err(|_| ContextCanceled(()));
343
344 // Store the unblocker
345
346 let None = self
347 .inner
348 .unblockers
349 .write()
350 .unwrap()
351 .insert(new_context_id, own_unblocker)
352 else {
353 panic!("There already is a context suspended with ID {new_context_id}");
354 };
355
356 // Create the future for the new context
357 let weak_inner = Arc::downgrade(&self.inner);
358 let context_future = async move {
359 // First wait for the unblock signal
360 let prelaunch_result = wait_for_unblock.await;
361
362 // Handle if the context was canceled before it even started
363 match prelaunch_result {
364 Ok(_) => (),
365 Err(canceled) => {
366 trace!("Context {new_context_id} was successfully destroyed before it started");
367 // We know what we are doing, so we can prevent the panic on drop
368 canceled.defuse();
369 // Context was cancelled before it was started, so we can just let it return.
370 // This will resolve the original future passed to `spawn_local` with
371 // `Ok(())` which should make the executor drop it properly
372 return;
373 }
374 };
375
376 let Some(inner) = Weak::upgrade(&weak_inner) else {
377 // The context-switching environment has been dropped, so we can't proceed.
378 // See the comments on the first Weak::upgrade call in this file for background on when this can happen.
379 //
380 // Note that in case the context was canceled properly, we accept that and allowed it to exit
381 // silently (in the match block above). That could happen if the main context canceled the
382 // this context before exiting itself and the executor outlives the environment.
383 //
384 // However it should not be possible to switch to this context after the main context has exited,
385 // as there can only be one active context at a time and that one (the main context) just exited.
386 // So there can't be another context in that context-switching environment that could switch to this one.
387 panic!(
388 "Resumed context {new_context_id} after the context-switching environment was dropped. This indicates a bug where multiple contexts are active at the same time which should never happen"
389 );
390 };
391 // Set the current context ID
392 inner
393 .current_context_id
394 .store(new_context_id, Ordering::Relaxed);
395 // Drop inner again so we don't hold a strong ref while running the entrypoint, so it cleans itself up properly
396 drop(inner);
397
398 tracing::trace!("Resumed context {new_context_id} for the first time");
399
400 // Launch the context entrypoint
401 let entrypoint_result = entrypoint.await;
402
403 // If that function returns, we need to resume the main context with an error
404 // Take the underlying error, or create a new error if the context returned a value
405 let entrypoint_result = entrypoint_result.map_or_else(
406 |e| e,
407 |_| RuntimeError::user(ContextEntrypointReturned(new_context_id).into()),
408 );
409
410 // If that function returns something went wrong.
411 // If it's a cancellation, we can just let this context run out.
412 // If it's another error, we resume the main context with the error
413 let error = match entrypoint_result.downcast::<ContextCanceled>() {
414 Ok(canceled) => {
415 tracing::trace!(
416 "Destroyed context {new_context_id} successfully after it was canceled"
417 );
418 // We know what we are doing, so we can prevent the panic on drop
419 canceled.defuse();
420 // Context was cancelled, so we can just let it return.
421 // This will resolve the original future passed to `spawn_local` with
422 // `Ok(())` which should make the executor drop it properly
423 return;
424 }
425 Err(error) => error, // Propagate the runtime error to main
426 };
427
428 tracing::trace!("Context {new_context_id} entrypoint returned with {error:?}");
429
430 // Retrieve the main context
431 let Some(inner) = Weak::upgrade(&weak_inner) else {
432 // The context-switching environment has been dropped, so we can't proceed.
433 // See the comments on the first Weak::upgrade call in this file for background on when this can happen.
434 //
435 // Note that in case the context was canceled properly, we accept that and allowed it to exit
436 // silently (in the match block above). That could happen if the main context canceled the
437 // this context before exiting itself and the executor outlives the environment.
438 //
439 // However it should not be possible to switch to this context after the main context has exited,
440 // as there can only be one active context at a time and that one (the main context) just exited.
441 // So there can't be another context in that context-switching environment that could switch to this one.
442 //
443 // So in conclusion if we reach this point it is a bug in WASIX and should never happen, so we panic here.
444 panic!(
445 "Context {new_context_id} entrypoint returned after the context-switching environment was dropped. This indicates a bug where multiple contexts are active at the same time which should never happen"
446 );
447 };
448
449 tracing::trace!(
450 "Resuming main context {MAIN_CONTEXT_ID} with error from context {new_context_id}"
451 );
452 let Some(main_context) = inner.unblockers.write().unwrap().remove(&MAIN_CONTEXT_ID)
453 else {
454 // The main context should always be suspended when another context returns or traps with anything but cancellation
455 panic!(
456 "The main context should always be suspended when another context returns or traps (with anything but a cancellation)."
457 );
458 };
459 drop(inner);
460
461 // Resume the main context with the error
462 main_context
463 .send(Err(error))
464 .expect("Failed to send error to main context, this should not happen");
465 };
466
467 // Queue the future onto the thread-local executor
468 tracing::trace!("Spawning context {new_context_id} onto the thread-local executor");
469 let spawn_result = self.inner.spawner.spawn_local(context_future);
470
471 match spawn_result {
472 Ok(()) => new_context_id,
473 Err(ThreadLocalSpawnerError::LocalPoolShutDown) => {
474 // This case could happen if the executor is being shut down while it is still polling a future (this one).
475 // Which shouldn't be able with a single-threaded executor, as the shutdown would have to
476 // be initiated from within a future running on that executor.
477 // I the current WASIX context switching implemenation should not be able to produce this case,
478 // but maybe it will be possible in future implementations. If someone manages to produce this case,
479 // they should open an issue so we can discuss how to handle this case properly.
480 // If this case is reachable we could return the same error as when no context-switching environment is present,
481 panic!(
482 "Failed to spawn context {new_context_id} because the local executor has been shut down. Please open an issue and let me know how you produced this error.",
483 );
484 }
485 Err(ThreadLocalSpawnerError::NotOnTheCorrectThread { expected, found }) => {
486 // This should never happen and is a bug in WASIX, so we panic here
487 panic!(
488 "Failed to create context because the thread local spawner lives on {expected:?} but you are on {found:?}"
489 )
490 }
491 Err(ThreadLocalSpawnerError::SpawnError) => {
492 panic!("Failed to spawn context {new_context_id}, this should not happen");
493 }
494 }
495 }
496}