wasmer/backend/sys/
async_runtime.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    future::Future,
5    marker::PhantomData,
6    pin::Pin,
7    ptr,
8    rc::Rc,
9    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
10};
11
12use corosensei::{Coroutine, CoroutineResult, Yielder};
13
14use super::entities::function::Function as SysFunction;
15use crate::{
16    AsStoreMut, AsStoreRef, ForcedStoreInstallGuard, LocalRwLockWriteGuard, RuntimeError, Store,
17    StoreAsync, StoreContext, StoreInner, StoreMut, StoreRef, Value,
18};
19use wasmer_types::StoreId;
20
21type HostFuture = Pin<Box<dyn Future<Output = Result<Vec<Value>, RuntimeError>> + 'static>>;
22
23pub(crate) fn call_function_async(
24    function: SysFunction,
25    store: StoreAsync,
26    params: Vec<Value>,
27) -> AsyncCallFuture {
28    AsyncCallFuture::new(function, store, params)
29}
30
31struct AsyncYield(HostFuture);
32
33enum AsyncResume {
34    Start,
35    HostFutureReady(Result<Vec<Value>, RuntimeError>),
36}
37
38#[allow(clippy::type_complexity)]
39pub(crate) struct AsyncCallFuture {
40    coroutine: Option<Coroutine<AsyncResume, AsyncYield, Result<Box<[Value]>, RuntimeError>>>,
41    pending_store_install: Option<Pin<Box<dyn Future<Output = ForcedStoreInstallGuard>>>>,
42    pending_future: Option<HostFuture>,
43    next_resume: Option<AsyncResume>,
44    result: Option<Result<Box<[Value]>, RuntimeError>>,
45
46    // Store handle we can use to lock the store down
47    store: StoreAsync,
48}
49
50// We can't use any of the existing AsStoreMut types here, since we keep
51// changing the store context underneath us while the coroutine yields.
52// To work around it, we use this dummy struct, which just grabs the store
53// from the store context. Since we always have a store context installed
54// when resuming the coroutine, this is safe in that it can access the store
55// through the store context. HOWEVER, references returned from this struct
56// CAN NOT BE HELD ACROSS A YIELD POINT. We don't do this anywhere in the
57// `Function::call code.
58struct AsyncCallStoreMut {
59    store_id: StoreId,
60}
61
62impl AsStoreRef for AsyncCallStoreMut {
63    fn as_store_ref(&self) -> StoreRef<'_> {
64        // Safety: This is only used with Function::call, which doesn't store
65        // the returned reference anywhere, including when calling into WASM
66        // code.
67        unsafe {
68            StoreRef {
69                inner: StoreContext::get_current_transient(self.store_id)
70                    .as_ref()
71                    .unwrap(),
72            }
73        }
74    }
75}
76
77impl AsStoreMut for AsyncCallStoreMut {
78    fn as_store_mut(&mut self) -> StoreMut<'_> {
79        // Safety: This is only used with Function::call, which doesn't store
80        // the returned reference anywhere, including when calling into WASM
81        // code.
82        unsafe {
83            StoreMut {
84                inner: StoreContext::get_current_transient(self.store_id)
85                    .as_mut()
86                    .unwrap(),
87            }
88        }
89    }
90
91    fn objects_mut(&mut self) -> &mut crate::StoreObjects {
92        // Safety: This is only used with Function::call, which doesn't store
93        // the returned reference anywhere, including when calling into WASM
94        // code.
95        unsafe {
96            &mut StoreContext::get_current_transient(self.store_id)
97                .as_mut()
98                .unwrap()
99                .objects
100        }
101    }
102}
103
104impl AsyncCallFuture {
105    pub(crate) fn new(function: SysFunction, store: StoreAsync, params: Vec<Value>) -> Self {
106        let store_id = store.id;
107        let coroutine =
108            Coroutine::new(move |yielder: &Yielder<AsyncResume, AsyncYield>, resume| {
109                assert!(matches!(resume, AsyncResume::Start));
110
111                let ctx_state = CoroutineContext::new(yielder);
112                ctx_state.enter();
113                let result = {
114                    let mut store_mut = AsyncCallStoreMut { store_id };
115                    function.call(&mut store_mut, &params)
116                };
117                ctx_state.leave();
118                result
119            });
120
121        Self {
122            coroutine: Some(coroutine),
123            pending_store_install: None,
124            pending_future: None,
125            next_resume: Some(AsyncResume::Start),
126            result: None,
127            store,
128        }
129    }
130}
131
132impl Future for AsyncCallFuture {
133    type Output = Result<Box<[Value]>, RuntimeError>;
134
135    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136        loop {
137            if let Some(future) = self.pending_future.as_mut() {
138                match future.as_mut().poll(cx) {
139                    Poll::Ready(result) => {
140                        self.pending_future = None;
141                        self.next_resume = Some(AsyncResume::HostFutureReady(result));
142                    }
143                    Poll::Pending => return Poll::Pending,
144                }
145            }
146
147            // If we're ready, return early
148            if self.coroutine.is_none() {
149                return Poll::Ready(self.result.take().expect("polled after completion"));
150            }
151
152            // Start a store installation if not in progress already
153            if self.pending_store_install.is_none() {
154                self.pending_store_install = Some(Box::pin(install_store_context(StoreAsync {
155                    id: self.store.id,
156                    inner: self.store.inner.clone(),
157                })));
158            }
159
160            // Acquiring a store lock should be the last step before resuming
161            // the coroutine, to minimize the time we hold the lock.
162            let store_context_guard = match self
163                .pending_store_install
164                .as_mut()
165                .unwrap()
166                .as_mut()
167                .poll(cx)
168            {
169                Poll::Ready(guard) => {
170                    self.pending_store_install = None;
171                    guard
172                }
173                Poll::Pending => return Poll::Pending,
174            };
175
176            let resume_arg = self.next_resume.take().expect("no resume arg available");
177            let coroutine = self.coroutine.as_mut().unwrap();
178            match coroutine.resume(resume_arg) {
179                CoroutineResult::Yield(AsyncYield(fut)) => {
180                    self.pending_future = Some(fut);
181                }
182                CoroutineResult::Return(result) => {
183                    self.coroutine = None;
184                    self.result = Some(result);
185                }
186            }
187
188            // Uninstall the store context to unlock the store after the coroutine
189            // yields or returns.
190            drop(store_context_guard);
191        }
192    }
193}
194
195async fn install_store_context(store: StoreAsync) -> ForcedStoreInstallGuard {
196    match unsafe { crate::StoreContext::try_get_current_async(store.id) } {
197        crate::GetStoreAsyncGuardResult::NotInstalled => {
198            // We always need to acquire a new write lock on the store.
199            let store_guard = store.inner.write().await;
200            unsafe { crate::StoreContext::install_async(store_guard) }
201        }
202        _ => {
203            // If we're already in a store context, it is unsafe to reuse
204            // the existing store ref since it'll also be accessible from
205            // the imported function that tried to poll us, which is a
206            // double mutable borrow.
207            // Note to people who discover this code: this *would* be safe
208            // if we had a separate variation of call_async that just
209            // used the existing coroutine context instead of spawning a
210            // new coroutine. However, the current call_async always spawns
211            // a new coroutine, so we can't allow this; every coroutine
212            // needs to own its write lock on the store to make sure there
213            // are no overlapping mutable borrows. If this is something
214            // you're interested in, feel free to open a GitHub issue outlining
215            // your use-case.
216            panic!(
217                "Function::call_async futures cannot be polled recursively \
218                    from within another imported function. If you need to await \
219                    a recursive call_async, consider spawning the future into \
220                    your async runtime and awaiting the resulting task; \
221                    e.g. tokio::task::spawn(func.call_async(...)).await"
222            );
223        }
224    }
225}
226
227pub enum AsyncRuntimeError {
228    YieldOutsideAsyncContext,
229    RuntimeError(RuntimeError),
230}
231
232pub(crate) fn block_on_host_future<Fut>(future: Fut) -> Result<Vec<Value>, AsyncRuntimeError>
233where
234    Fut: Future<Output = Result<Vec<Value>, RuntimeError>> + 'static,
235{
236    CURRENT_CONTEXT.with(|cell| {
237        match CoroutineContext::get_current() {
238            None => {
239                // If there is no async context or we haven't entered it,
240                // we can still directly run a future that doesn't block
241                // inline.
242                run_immediate(future)
243            }
244            Some(context) => unsafe { context.as_ref().expect("valid context pointer") }
245                .block_on_future(Box::pin(future))
246                .map_err(AsyncRuntimeError::RuntimeError),
247        }
248    })
249}
250
251thread_local! {
252    static CURRENT_CONTEXT: RefCell<Vec<*const CoroutineContext>> = const { RefCell::new(Vec::new()) };
253}
254
255struct CoroutineContext {
256    yielder: *const Yielder<AsyncResume, AsyncYield>,
257}
258
259impl CoroutineContext {
260    fn new(yielder: &Yielder<AsyncResume, AsyncYield>) -> Self {
261        Self {
262            yielder: yielder as *const _,
263        }
264    }
265
266    fn enter(&self) {
267        CURRENT_CONTEXT.with(|cell| {
268            let mut borrow = cell.borrow_mut();
269
270            // Push this coroutine on top of the active stack.
271            borrow.push(self as *const _);
272        })
273    }
274
275    // Note: we don't use a drop-style guard here on purpose; if a panic
276    // happens while a coroutine is running, CURRENT_CONTEXT will be in
277    // an inconsistent state. corosensei will unwind all coroutine stacks
278    // anyway, and if we had a guard that would get dropped and try to
279    // leave its context, it'd panic again at the assert_eq! below.
280    fn leave(&self) {
281        CURRENT_CONTEXT.with(|cell| {
282            let mut borrow = cell.borrow_mut();
283
284            // Pop this coroutine from the active stack.
285            assert_eq!(
286                borrow.pop(),
287                Some(self as *const _),
288                "Active coroutine stack corrupted"
289            );
290        });
291    }
292
293    fn get_current() -> Option<*const Self> {
294        CURRENT_CONTEXT.with(|cell| cell.borrow().last().copied())
295    }
296
297    fn block_on_future(&self, future: HostFuture) -> Result<Vec<Value>, RuntimeError> {
298        // Leave the coroutine context since we're yielding back to the
299        // parent stack, and will be inactive until the future is ready.
300        self.leave();
301
302        let yielder = unsafe { self.yielder.as_ref().expect("yielder pointer valid") };
303        let result = match yielder.suspend(AsyncYield(future)) {
304            AsyncResume::HostFutureReady(result) => result,
305            AsyncResume::Start => unreachable!("coroutine resumed without start"),
306        };
307
308        // Once the future is ready, we restore the current coroutine
309        // context.
310        self.enter();
311
312        result
313    }
314}
315
316fn run_immediate(
317    future: impl Future<Output = Result<Vec<Value>, RuntimeError>> + 'static,
318) -> Result<Vec<Value>, AsyncRuntimeError> {
319    let waker = futures::task::noop_waker();
320    let mut cx = Context::from_waker(&waker);
321    let mut future = Box::pin(future);
322    match future.as_mut().poll(&mut cx) {
323        Poll::Ready(result) => result.map_err(AsyncRuntimeError::RuntimeError),
324        Poll::Pending => Err(AsyncRuntimeError::YieldOutsideAsyncContext),
325    }
326}