1use std::{
2 cell::RefCell,
3 collections::HashMap,
4 future::Future,
5 marker::PhantomData,
6 pin::Pin,
7 ptr,
8 rc::Rc,
9 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
10};
11
12use corosensei::{Coroutine, CoroutineResult, Yielder};
13
14use super::entities::function::Function as SysFunction;
15use crate::{
16 AsStoreMut, AsStoreRef, ForcedStoreInstallGuard, LocalRwLockWriteGuard, RuntimeError, Store,
17 StoreAsync, StoreContext, StoreInner, StoreMut, StoreRef, Value,
18};
19use wasmer_types::StoreId;
20
/// Type-erased, heap-pinned `'static` future that resolves to either the
/// produced values or a `RuntimeError`.
///
/// This is the payload wrapped by `AsyncYield` and parked in
/// `AsyncCallFuture::pending_future` while the coroutine is suspended.
type HostFuture = Pin<Box<dyn Future<Output = Result<Vec<Value>, RuntimeError>> + 'static>>;
22
/// Builds the `AsyncCallFuture` that will call `function` with `params`
/// against `store`.
///
/// This is a thin wrapper around `AsyncCallFuture::new`. Nothing is resumed
/// here; the coroutine is only driven from `AsyncCallFuture::poll`.
pub(crate) fn call_function_async(
    function: SysFunction,
    store: StoreAsync,
    params: Vec<Value>,
) -> AsyncCallFuture {
    AsyncCallFuture::new(function, store, params)
}
30
/// Value the coroutine hands to its driver when it suspends: the host future
/// whose result it is waiting for.
struct AsyncYield(HostFuture);
32
/// Argument passed into the coroutine each time it is resumed.
enum AsyncResume {
    /// Used for the very first resume; the coroutine body asserts that it
    /// receives this variant on entry.
    Start,
    /// Carries the output of the host future that the coroutine previously
    /// yielded via `AsyncYield`.
    HostFutureReady(Result<Vec<Value>, RuntimeError>),
}
37
/// Future that drives a function call running inside a coroutine.
///
/// Each `poll` first waits on any host future the coroutine yielded, then
/// installs the store context, and finally resumes the coroutine with the
/// pending resume argument.
#[allow(clippy::type_complexity)]
pub(crate) struct AsyncCallFuture {
    /// `Some` while the call is still in progress; set to `None` once the
    /// coroutine has returned.
    coroutine: Option<Coroutine<AsyncResume, AsyncYield, Result<Box<[Value]>, RuntimeError>>>,
    /// In-flight store-context installation. Created lazily in `poll` and
    /// cleared as soon as it produces its guard.
    pending_store_install: Option<Pin<Box<dyn Future<Output = ForcedStoreInstallGuard>>>>,
    /// Host future yielded by the coroutine that has not completed yet.
    pending_future: Option<HostFuture>,
    /// Argument for the next `resume`; starts out as `AsyncResume::Start`.
    next_resume: Option<AsyncResume>,
    /// Return value of the coroutine, taken by the `poll` that reports
    /// completion.
    result: Option<Result<Box<[Value]>, RuntimeError>>,

    /// Store handle; its `id` and a clone of its `inner` are used for every
    /// store-context installation.
    store: StoreAsync,
}
49
/// Store accessor used from inside the coroutine.
///
/// It holds only the `StoreId`; every trait method looks the store up again
/// through `StoreContext::get_current_transient`.
struct AsyncCallStoreMut {
    /// Identifier used for each `StoreContext::get_current_transient` lookup.
    store_id: StoreId,
}
61
62impl AsStoreRef for AsyncCallStoreMut {
63 fn as_store_ref(&self) -> StoreRef<'_> {
64 unsafe {
68 StoreRef {
69 inner: StoreContext::get_current_transient(self.store_id)
70 .as_ref()
71 .unwrap(),
72 }
73 }
74 }
75}
76
77impl AsStoreMut for AsyncCallStoreMut {
78 fn as_store_mut(&mut self) -> StoreMut<'_> {
79 unsafe {
83 StoreMut {
84 inner: StoreContext::get_current_transient(self.store_id)
85 .as_mut()
86 .unwrap(),
87 }
88 }
89 }
90
91 fn objects_mut(&mut self) -> &mut crate::StoreObjects {
92 unsafe {
96 &mut StoreContext::get_current_transient(self.store_id)
97 .as_mut()
98 .unwrap()
99 .objects
100 }
101 }
102}
103
104impl AsyncCallFuture {
105 pub(crate) fn new(function: SysFunction, store: StoreAsync, params: Vec<Value>) -> Self {
106 let store_id = store.id;
107 let coroutine =
108 Coroutine::new(move |yielder: &Yielder<AsyncResume, AsyncYield>, resume| {
109 assert!(matches!(resume, AsyncResume::Start));
110
111 let ctx_state = CoroutineContext::new(yielder);
112 ctx_state.enter();
113 let result = {
114 let mut store_mut = AsyncCallStoreMut { store_id };
115 function.call(&mut store_mut, ¶ms)
116 };
117 ctx_state.leave();
118 result
119 });
120
121 Self {
122 coroutine: Some(coroutine),
123 pending_store_install: None,
124 pending_future: None,
125 next_resume: Some(AsyncResume::Start),
126 result: None,
127 store,
128 }
129 }
130}
131
132impl Future for AsyncCallFuture {
133 type Output = Result<Box<[Value]>, RuntimeError>;
134
135 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 loop {
137 if let Some(future) = self.pending_future.as_mut() {
138 match future.as_mut().poll(cx) {
139 Poll::Ready(result) => {
140 self.pending_future = None;
141 self.next_resume = Some(AsyncResume::HostFutureReady(result));
142 }
143 Poll::Pending => return Poll::Pending,
144 }
145 }
146
147 if self.coroutine.is_none() {
149 return Poll::Ready(self.result.take().expect("polled after completion"));
150 }
151
152 if self.pending_store_install.is_none() {
154 self.pending_store_install = Some(Box::pin(install_store_context(StoreAsync {
155 id: self.store.id,
156 inner: self.store.inner.clone(),
157 })));
158 }
159
160 let store_context_guard = match self
163 .pending_store_install
164 .as_mut()
165 .unwrap()
166 .as_mut()
167 .poll(cx)
168 {
169 Poll::Ready(guard) => {
170 self.pending_store_install = None;
171 guard
172 }
173 Poll::Pending => return Poll::Pending,
174 };
175
176 let resume_arg = self.next_resume.take().expect("no resume arg available");
177 let coroutine = self.coroutine.as_mut().unwrap();
178 match coroutine.resume(resume_arg) {
179 CoroutineResult::Yield(AsyncYield(fut)) => {
180 self.pending_future = Some(fut);
181 }
182 CoroutineResult::Return(result) => {
183 self.coroutine = None;
184 self.result = Some(result);
185 }
186 }
187
188 drop(store_context_guard);
191 }
192 }
193}
194
195async fn install_store_context(store: StoreAsync) -> ForcedStoreInstallGuard {
196 match unsafe { crate::StoreContext::try_get_current_async(store.id) } {
197 crate::GetStoreAsyncGuardResult::NotInstalled => {
198 let store_guard = store.inner.write().await;
200 unsafe { crate::StoreContext::install_async(store_guard) }
201 }
202 _ => {
203 panic!(
217 "Function::call_async futures cannot be polled recursively \
218 from within another imported function. If you need to await \
219 a recursive call_async, consider spawning the future into \
220 your async runtime and awaiting the resulting task; \
221 e.g. tokio::task::spawn(func.call_async(...)).await"
222 );
223 }
224 }
225}
226
/// Error returned by `block_on_host_future`.
pub enum AsyncRuntimeError {
    /// The future was not ready on its first poll and there was no active
    /// coroutine context to suspend into (see `run_immediate`).
    YieldOutsideAsyncContext,
    /// The future completed with a `RuntimeError`.
    RuntimeError(RuntimeError),
}
231
232pub(crate) fn block_on_host_future<Fut>(future: Fut) -> Result<Vec<Value>, AsyncRuntimeError>
233where
234 Fut: Future<Output = Result<Vec<Value>, RuntimeError>> + 'static,
235{
236 CURRENT_CONTEXT.with(|cell| {
237 match CoroutineContext::get_current() {
238 None => {
239 run_immediate(future)
243 }
244 Some(context) => unsafe { context.as_ref().expect("valid context pointer") }
245 .block_on_future(Box::pin(future))
246 .map_err(AsyncRuntimeError::RuntimeError),
247 }
248 })
249}
250
thread_local! {
    /// Per-thread stack of coroutine contexts that are currently entered.
    ///
    /// `CoroutineContext::enter` pushes, `CoroutineContext::leave` pops, and
    /// `CoroutineContext::get_current` reads the last element.
    static CURRENT_CONTEXT: RefCell<Vec<*const CoroutineContext>> = const { RefCell::new(Vec::new()) };
}
254
/// Handle that lets code running inside a coroutine suspend on a host future.
struct CoroutineContext {
    /// Raw pointer to the coroutine's yielder, captured in `new` and
    /// dereferenced in `block_on_future`.
    yielder: *const Yielder<AsyncResume, AsyncYield>,
}
258
259impl CoroutineContext {
260 fn new(yielder: &Yielder<AsyncResume, AsyncYield>) -> Self {
261 Self {
262 yielder: yielder as *const _,
263 }
264 }
265
266 fn enter(&self) {
267 CURRENT_CONTEXT.with(|cell| {
268 let mut borrow = cell.borrow_mut();
269
270 borrow.push(self as *const _);
272 })
273 }
274
275 fn leave(&self) {
281 CURRENT_CONTEXT.with(|cell| {
282 let mut borrow = cell.borrow_mut();
283
284 assert_eq!(
286 borrow.pop(),
287 Some(self as *const _),
288 "Active coroutine stack corrupted"
289 );
290 });
291 }
292
293 fn get_current() -> Option<*const Self> {
294 CURRENT_CONTEXT.with(|cell| cell.borrow().last().copied())
295 }
296
297 fn block_on_future(&self, future: HostFuture) -> Result<Vec<Value>, RuntimeError> {
298 self.leave();
301
302 let yielder = unsafe { self.yielder.as_ref().expect("yielder pointer valid") };
303 let result = match yielder.suspend(AsyncYield(future)) {
304 AsyncResume::HostFutureReady(result) => result,
305 AsyncResume::Start => unreachable!("coroutine resumed without start"),
306 };
307
308 self.enter();
311
312 result
313 }
314}
315
316fn run_immediate(
317 future: impl Future<Output = Result<Vec<Value>, RuntimeError>> + 'static,
318) -> Result<Vec<Value>, AsyncRuntimeError> {
319 let waker = futures::task::noop_waker();
320 let mut cx = Context::from_waker(&waker);
321 let mut future = Box::pin(future);
322 match future.as_mut().poll(&mut cx) {
323 Poll::Ready(result) => result.map_err(AsyncRuntimeError::RuntimeError),
324 Poll::Pending => Err(AsyncRuntimeError::YieldOutsideAsyncContext),
325 }
326}