wasmer_wasix/utils/thread_local_executor.rs
1use futures::{
2 executor::{LocalPool, LocalSpawner},
3 task::LocalSpawnExt,
4};
5use std::thread::ThreadId;
6use thiserror::Error;
7
8/// A `Send`able spawner that spawns onto a thread-local executor
9///
10/// Despite being `Send`, the spawner enforces at runtime that
11/// it is only used to spawn on the thread it was created on.
12//
13// If that limitation is a problem, we can consider implementing a version that
14// accepts `Send` futures and sends them to the correct thread via channels.
15#[derive(Clone, Debug)]
16pub(crate) struct ThreadLocalSpawner {
17 /// A reference to the local executor's spawner
18 spawner: LocalSpawner,
19 /// The thread this spawner is associated with
20 ///
21 /// Used to generate better error messages when trying to spawn on the wrong thread
22 thread: ThreadId,
23}
24// SAFETY: The ThreadLocalSpawner enforces the spawner is only used on the correct thread.
25// See the safety comment in ThreadLocalSpawner::spawn_local and ThreadLocalSpawner::spawner
26unsafe impl Send for ThreadLocalSpawner {}
27// SAFETY: The ThreadLocalSpawner enforces the spawner is only used on the correct thread.
28// See the safety comment in ThreadLocalSpawner::spawn_local and ThreadLocalSpawner::spawner
29unsafe impl Sync for ThreadLocalSpawner {}
30
31/// Errors that can occur during `spawn_local` calls
32#[derive(Debug, Error)]
33pub enum ThreadLocalSpawnerError {
34 #[error(
35 "The ThreadLocalSpawner can only spawn tasks on the thread it was created on. Expected to be on {expected:?} but was actually on {found:?}"
36 )]
37 NotOnTheCorrectThread { expected: ThreadId, found: ThreadId },
38 #[error(
39 "The local executor associated with this spawner has been shut down and cannot accept new tasks"
40 )]
41 LocalPoolShutDown,
42 #[error("An error occurred while spawning the task")]
43 SpawnError,
44}
45
46impl ThreadLocalSpawner {
47 /// Spawn a future onto the same thread as the local spawner
48 ///
49 /// Needs to be called from the same thread on which the associated executor was created
50 pub(crate) fn spawn_local<F: Future<Output = ()> + 'static>(
51 &self,
52 future: F,
53 ) -> Result<(), ThreadLocalSpawnerError> {
54 // SAFETY: This is what makes implementing Send on ThreadLocalSpawner safe. We ensure that we only spawn
55 // on the same thread as the one the spawner was created on.
56 if std::thread::current().id() != self.thread {
57 return Err(ThreadLocalSpawnerError::NotOnTheCorrectThread {
58 expected: self.thread,
59 found: std::thread::current().id(),
60 });
61 }
62
63 // As we now know that we are on the correct thread, we can use the spawner safely
64 self.spawner
65 .spawn_local(future)
66 .map_err(|e| match e.is_shutdown() {
67 true => ThreadLocalSpawnerError::LocalPoolShutDown,
68 false => ThreadLocalSpawnerError::SpawnError,
69 })
70 }
71}
72
73/// A thread-local executor that can run tasks on the current thread
74pub(crate) struct ThreadLocalExecutor {
75 /// The local pool
76 pool: LocalPool,
77}
78
79impl ThreadLocalExecutor {
80 pub(crate) fn new() -> Self {
81 let local_pool = futures::executor::LocalPool::new();
82 Self { pool: local_pool }
83 }
84
85 pub(crate) fn spawner(&self) -> ThreadLocalSpawner {
86 ThreadLocalSpawner {
87 spawner: self.pool.spawner(),
88 // SAFETY: This will always be the thread where the spawner was created on, as the ThreadLocalExecutor is not Send
89 thread: std::thread::current().id(),
90 }
91 }
92
93 pub(crate) fn run_until<F: Future>(&mut self, future: F) -> F::Output {
94 self.pool.run_until(future)
95 }
96}