wasmer_wasix/syscalls/wasix/
futex_wait.rs

1use std::task::Waker;
2
3use super::*;
4use crate::syscalls::*;
5
6/// Poller returns true if its triggered and false if it times out
7struct FutexPoller {
8    state: Arc<WasiState>,
9    poller_idx: u64,
10    futex_idx: u64,
11    expected: u32,
12    timeout: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
13}
14impl Future for FutexPoller {
15    type Output = bool;
16    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
17        let mut guard = self.state.futexs.lock().unwrap();
18
19        // If the futex itself is no longer registered then it was likely
20        // woken by a wake call
21        let futex = match guard.futexes.get_mut(&self.futex_idx) {
22            Some(f) => f,
23            None => return Poll::Ready(true),
24        };
25        let waker = match futex.wakers.get_mut(&self.poller_idx) {
26            Some(w) => w,
27            None => return Poll::Ready(true),
28        };
29
30        // Register the waker
31        waker.replace(cx.waker().clone());
32
33        // Check for timeout
34        drop(guard);
35        if let Some(timeout) = self.timeout.as_mut() {
36            let timeout = timeout.as_mut();
37            if timeout.poll(cx).is_ready() {
38                self.timeout.take();
39                return Poll::Ready(false);
40            }
41        }
42
43        // We will now wait to be woken
44        Poll::Pending
45    }
46}
47impl Drop for FutexPoller {
48    fn drop(&mut self) {
49        let mut guard = self.state.futexs.lock().unwrap();
50
51        let mut should_remove = false;
52        if let Some(futex) = guard.futexes.get_mut(&self.futex_idx) {
53            if let Some(Some(waker)) = futex.wakers.remove(&self.poller_idx) {
54                waker.wake();
55            }
56            should_remove = futex.wakers.is_empty();
57        }
58        if should_remove {
59            guard.futexes.remove(&self.futex_idx);
60        }
61    }
62}
63
64/// Wait for a futex_wake operation to wake us.
65/// Returns with EINVAL if the futex doesn't hold the expected value.
66/// Returns false on timeout, and true in all other cases.
67///
68/// ## Parameters
69///
70/// * `futex` - Memory location that holds the value that will be checked
71/// * `expected` - Expected value that should be currently held at the memory location
72/// * `timeout` - Timeout should the futex not be triggered in the allocated time
73#[instrument(level = "trace", skip_all, fields(futex_idx = field::Empty, poller_idx = field::Empty, %expected, timeout = field::Empty, woken = field::Empty))]
74pub fn futex_wait<M: MemorySize + 'static>(
75    mut ctx: FunctionEnvMut<'_, WasiEnv>,
76    futex_ptr: WasmPtr<u32, M>,
77    expected: u32,
78    timeout: WasmPtr<OptionTimestamp, M>,
79    ret_woken: WasmPtr<Bool, M>,
80) -> Result<Errno, WasiError> {
81    WasiEnv::do_pending_operations(&mut ctx)?;
82
83    futex_wait_internal(ctx, futex_ptr, expected, timeout, ret_woken)
84}
85
86pub(super) fn futex_wait_internal<M: MemorySize + 'static>(
87    mut ctx: FunctionEnvMut<'_, WasiEnv>,
88    futex_ptr: WasmPtr<u32, M>,
89    expected: u32,
90    timeout: WasmPtr<OptionTimestamp, M>,
91    ret_woken: WasmPtr<Bool, M>,
92) -> Result<Errno, WasiError> {
93    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
94    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
95
96    // If we were just restored then we were woken after a deep sleep
97    // and thus we repeat all the checks again, we do not immediately
98    // exit here as it could be the case that we were woken but the
99    // expected value does not match
100    if let Some(_woken) = unsafe { handle_rewind::<M, bool>(&mut ctx) } {
101        // fall through so the normal checks kick in, this will
102        // ensure that the expected value has changed before
103        // this syscall returns even if it was woken
104    }
105
106    // Determine the timeout
107    let mut env = ctx.data();
108    let timeout = {
109        let memory = unsafe { env.memory_view(&ctx) };
110        wasi_try_mem_ok!(timeout.read(&memory))
111    };
112    let timeout = match timeout.tag {
113        OptionTag::Some => Some(Duration::from_nanos(timeout.u)),
114        _ => None,
115    };
116    Span::current().record("timeout", format!("{timeout:?}"));
117
118    let state = env.state.clone();
119    let futex_idx: u64 = futex_ptr.offset().into();
120    Span::current().record("futex_idx", futex_idx);
121
122    // We generate a new poller which also registers in the
123    // shared state futex lookup. When this object is dropped
124    // it will remove itself from the lookup. It can also be
125    // removed whenever the wake call is invoked (which could
126    // be before the poller is polled).
127    let poller = {
128        let mut guard = env.state.futexs.lock().unwrap();
129        guard.poller_seed += 1;
130        let poller_idx = guard.poller_seed;
131
132        // Create the timeout if one exists
133        let timeout = timeout.map(|timeout| env.tasks().sleep_now(timeout));
134
135        // We insert the futex before we check the condition variable to avoid
136        // certain race conditions
137        let futex = guard.futexes.entry(futex_idx).or_default();
138        futex.wakers.insert(poller_idx, Default::default());
139
140        Span::current().record("poller_idx", poller_idx);
141        FutexPoller {
142            state: env.state.clone(),
143            poller_idx,
144            futex_idx,
145            expected,
146            timeout,
147        }
148    };
149
150    // We check if the expected value has changed
151    let memory = unsafe { env.memory_view(&ctx) };
152    let val = wasi_try_mem_ok!(futex_ptr.read(&memory));
153    if val != expected {
154        // We have been triggered so do not go into a wait
155        wasi_try_mem_ok!(ret_woken.write(&memory, Bool::True));
156        return Ok(Errno::Success);
157    }
158
159    // We clear the woken flag (so if the poller fails to trigger
160    // then the value is not set) - the poller will set it to true
161    wasi_try_mem_ok!(ret_woken.write(&memory, Bool::False));
162
163    // We use asyncify on the poller and potentially go into deep sleep
164    tracing::trace!("wait on {futex_idx}");
165    let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, Box::pin(poller))?;
166    if let AsyncifyAction::Finish(ctx, res) = res {
167        let mut env = ctx.data();
168        let memory = unsafe { env.memory_view(&ctx) };
169        if res {
170            wasi_try_mem_ok!(ret_woken.write(&memory, Bool::True));
171        } else {
172            wasi_try_mem_ok!(ret_woken.write(&memory, Bool::False));
173        }
174    }
175    Ok(Errno::Success)
176}