use std::task::Waker;
use super::*;
use crate::syscalls::*;
struct FutexPoller {
state: Arc<WasiState>,
poller_idx: u64,
futex_idx: u64,
expected: u32,
timeout: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
}
impl Future for FutexPoller {
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
let mut guard = self.state.futexs.lock().unwrap();
let futex = match guard.futexes.get_mut(&self.futex_idx) {
Some(f) => f,
None => return Poll::Ready(true),
};
let waker = match futex.wakers.get_mut(&self.poller_idx) {
Some(w) => w,
None => return Poll::Ready(true),
};
waker.replace(cx.waker().clone());
drop(guard);
if let Some(timeout) = self.timeout.as_mut() {
let timeout = timeout.as_mut();
if timeout.poll(cx).is_ready() {
self.timeout.take();
return Poll::Ready(false);
}
}
Poll::Pending
}
}
impl Drop for FutexPoller {
fn drop(&mut self) {
let mut guard = self.state.futexs.lock().unwrap();
let mut should_remove = false;
if let Some(futex) = guard.futexes.get_mut(&self.futex_idx) {
if let Some(Some(waker)) = futex.wakers.remove(&self.poller_idx) {
waker.wake();
}
should_remove = futex.wakers.is_empty();
}
if should_remove {
guard.futexes.remove(&self.futex_idx);
}
}
}
#[instrument(level = "trace", skip_all, fields(futex_idx = field::Empty, poller_idx = field::Empty, %expected, timeout = field::Empty, woken = field::Empty))]
pub fn futex_wait<M: MemorySize + 'static>(
ctx: FunctionEnvMut<'_, WasiEnv>,
futex_ptr: WasmPtr<u32, M>,
expected: u32,
timeout: WasmPtr<OptionTimestamp, M>,
ret_woken: WasmPtr<Bool, M>,
) -> Result<Errno, WasiError> {
futex_wait_internal(ctx, futex_ptr, expected, timeout, ret_woken)
}
pub(super) fn futex_wait_internal<M: MemorySize + 'static>(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
futex_ptr: WasmPtr<u32, M>,
expected: u32,
timeout: WasmPtr<OptionTimestamp, M>,
ret_woken: WasmPtr<Bool, M>,
) -> Result<Errno, WasiError> {
wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
if let Some(_woken) = unsafe { handle_rewind::<M, bool>(&mut ctx) } {
}
let mut env = ctx.data();
let timeout = {
let memory = unsafe { env.memory_view(&ctx) };
wasi_try_mem_ok!(timeout.read(&memory))
};
let timeout = match timeout.tag {
OptionTag::Some => Some(Duration::from_nanos(timeout.u)),
_ => None,
};
Span::current().record("timeout", format!("{:?}", timeout));
let state = env.state.clone();
let futex_idx: u64 = futex_ptr.offset().into();
Span::current().record("futex_idx", futex_idx);
let poller = {
let mut guard = env.state.futexs.lock().unwrap();
guard.poller_seed += 1;
let poller_idx = guard.poller_seed;
let timeout = timeout.map(|timeout| env.tasks().sleep_now(timeout));
let futex = guard.futexes.entry(futex_idx).or_default();
futex.wakers.insert(poller_idx, Default::default());
Span::current().record("poller_idx", poller_idx);
FutexPoller {
state: env.state.clone(),
poller_idx,
futex_idx,
expected,
timeout,
}
};
let memory = unsafe { env.memory_view(&ctx) };
let val = wasi_try_mem_ok!(futex_ptr.read(&memory));
if val != expected {
wasi_try_mem_ok!(ret_woken.write(&memory, Bool::True));
return Ok(Errno::Success);
}
wasi_try_mem_ok!(ret_woken.write(&memory, Bool::False));
tracing::trace!("wait on {futex_idx}");
let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, Box::pin(poller))?;
if let AsyncifyAction::Finish(ctx, res) = res {
let mut env = ctx.data();
let memory = unsafe { env.memory_view(&ctx) };
if res {
wasi_try_mem_ok!(ret_woken.write(&memory, Bool::True));
} else {
wasi_try_mem_ok!(ret_woken.write(&memory, Bool::False));
}
}
Ok(Errno::Success)
}