wasmer_wasix/utils/
thread_local_executor.rs

1use futures::{
2    executor::{LocalPool, LocalSpawner},
3    task::LocalSpawnExt,
4};
5use std::thread::ThreadId;
6use thiserror::Error;
7
8/// A `Send`able spawner that spawns onto a thread-local executor
9///
10/// Despite being `Send`, the spawner enforces at runtime that
11/// it is only used to spawn on the thread it was created on.
12//
13// If that limitation is a problem, we can consider implementing a version that
14// accepts `Send` futures and sends them to the correct thread via channels.
15#[derive(Clone, Debug)]
16pub(crate) struct ThreadLocalSpawner {
17    /// A reference to the local executor's spawner
18    spawner: LocalSpawner,
19    /// The thread this spawner is associated with
20    ///
21    /// Used to generate better error messages when trying to spawn on the wrong thread
22    thread: ThreadId,
23}
24// SAFETY: The ThreadLocalSpawner enforces the spawner is only used on the correct thread.
25// See the safety comment in ThreadLocalSpawner::spawn_local and ThreadLocalSpawner::spawner
26unsafe impl Send for ThreadLocalSpawner {}
27// SAFETY: The ThreadLocalSpawner enforces the spawner is only used on the correct thread.
28// See the safety comment in ThreadLocalSpawner::spawn_local and ThreadLocalSpawner::spawner
29unsafe impl Sync for ThreadLocalSpawner {}
30
31/// Errors that can occur during `spawn_local` calls
32#[derive(Debug, Error)]
33pub enum ThreadLocalSpawnerError {
34    #[error(
35        "The ThreadLocalSpawner can only spawn tasks on the thread it was created on. Expected to be on {expected:?} but was actually on {found:?}"
36    )]
37    NotOnTheCorrectThread { expected: ThreadId, found: ThreadId },
38    #[error(
39        "The local executor associated with this spawner has been shut down and cannot accept new tasks"
40    )]
41    LocalPoolShutDown,
42    #[error("An error occurred while spawning the task")]
43    SpawnError,
44}
45
46impl ThreadLocalSpawner {
47    /// Spawn a future onto the same thread as the local spawner
48    ///
49    /// Needs to be called from the same thread on which the associated executor was created
50    pub(crate) fn spawn_local<F: Future<Output = ()> + 'static>(
51        &self,
52        future: F,
53    ) -> Result<(), ThreadLocalSpawnerError> {
54        // SAFETY: This is what makes implementing Send on ThreadLocalSpawner safe. We ensure that we only spawn
55        // on the same thread as the one the spawner was created on.
56        if std::thread::current().id() != self.thread {
57            return Err(ThreadLocalSpawnerError::NotOnTheCorrectThread {
58                expected: self.thread,
59                found: std::thread::current().id(),
60            });
61        }
62
63        // As we now know that we are on the correct thread, we can use the spawner safely
64        self.spawner
65            .spawn_local(future)
66            .map_err(|e| match e.is_shutdown() {
67                true => ThreadLocalSpawnerError::LocalPoolShutDown,
68                false => ThreadLocalSpawnerError::SpawnError,
69            })
70    }
71}
72
73/// A thread-local executor that can run tasks on the current thread
74pub(crate) struct ThreadLocalExecutor {
75    /// The local pool
76    pool: LocalPool,
77}
78
79impl ThreadLocalExecutor {
80    pub(crate) fn new() -> Self {
81        let local_pool = futures::executor::LocalPool::new();
82        Self { pool: local_pool }
83    }
84
85    pub(crate) fn spawner(&self) -> ThreadLocalSpawner {
86        ThreadLocalSpawner {
87            spawner: self.pool.spawner(),
88            // SAFETY: This will always be the thread where the spawner was created on, as the ThreadLocalExecutor is not Send
89            thread: std::thread::current().id(),
90        }
91    }
92
93    pub(crate) fn run_until<F: Future>(&mut self, future: F) -> F::Output {
94        self.pool.run_until(future)
95    }
96}