wasmer_wasix/syscalls/wasi/
poll_oneoff.rs

1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{Subclockflags, SubscriptionClock, Userdata};
3
4use super::*;
5use crate::{
6    fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
7    state::PollEventSet,
8    syscalls::*,
9};
10
/// Payload of a triggered event, tagged by the kind of subscription that fired.
///
/// [`EventResult::into_event`] copies the payload into the matching member of
/// `EventUnion`.
#[derive(Serialize, Deserialize)]
pub enum EventResultType {
    /// Payload of a clock event; ends up in `EventUnion::clock`.
    Clock(u8),
    /// Payload of a file-descriptor event; ends up in `EventUnion::fd_readwrite`.
    Fd(EventFdReadwrite),
}
17
/// An event that occurred, in a form that can be serialized.
///
/// This is the result type carried through `handle_rewind` and
/// `__asyncify_with_deep_sleep` in `poll_oneoff_internal`; it is turned into a
/// WASI `Event` with [`EventResult::into_event`].
#[derive(Serialize, Deserialize)]
pub struct EventResult {
    /// User-provided value that got attached to `subscription::userdata`.
    pub userdata: Userdata,
    /// If non-zero, an error that occurred while processing the subscription request.
    pub error: Errno,
    /// Type of event that was triggered
    pub type_: Eventtype,
    /// The contents of the event, tagged by the kind of event that occurred
    pub inner: EventResultType,
}
30impl EventResult {
31    pub fn into_event(self) -> Event {
32        Event {
33            userdata: self.userdata,
34            error: self.error,
35            type_: self.type_,
36            u: match self.inner {
37                EventResultType::Clock(id) => EventUnion { clock: id },
38                EventResultType::Fd(fd) => EventUnion { fd_readwrite: fd },
39            },
40        }
41    }
42}
43
44/// ### `poll_oneoff()`
45/// Concurrently poll for a set of events
46///
47/// Inputs:
48/// - `const __wasi_subscription_t *in`
49///     The events to subscribe to
50/// - `__wasi_event_t *out`
51///     The events that have occurred
52/// - `u32 nsubscriptions`
53///     The number of subscriptions and the number of events
54///
55/// Output:
56/// - `u32 nevents`
57///     The number of events seen
58#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
59pub fn poll_oneoff<M: MemorySize + 'static>(
60    mut ctx: FunctionEnvMut<'_, WasiEnv>,
61    in_: WasmPtr<Subscription, M>,
62    out_: WasmPtr<Event, M>,
63    nsubscriptions: M::Offset,
64    nevents: WasmPtr<M::Offset, M>,
65) -> Result<Errno, WasiError> {
66    WasiEnv::do_pending_operations(&mut ctx)?;
67
68    // An empty subscription list would otherwise block forever in the poll loop.
69    if nsubscriptions == M::ZERO {
70        return Ok(Errno::Inval);
71    }
72
73    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
74    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
75
76    ctx.data_mut().poll_seed += 1;
77    let mut env = ctx.data();
78    let mut memory = unsafe { env.memory_view(&ctx) };
79
80    let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
81    let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
82    for n in 0..subscription_array.len() {
83        let n = (n + env.poll_seed) % subscription_array.len();
84        let sub = subscription_array.index(n);
85        let s = wasi_try_mem_ok!(sub.read());
86        subscriptions.push((None, PollEventSet::default(), s));
87    }
88
89    // We clear the number of events
90    wasi_try_mem_ok!(nevents.write(&memory, M::ZERO));
91
92    // Function to invoke once the poll is finished
93    let process_events = |ctx: &FunctionEnvMut<'_, WasiEnv>, triggered_events: Vec<Event>| {
94        let mut env = ctx.data();
95        let mut memory = unsafe { env.memory_view(&ctx) };
96
97        // Process all the events that were triggered
98        let mut events_seen: u32 = 0;
99        let event_array = wasi_try_mem!(out_.slice(&memory, nsubscriptions));
100        for event in triggered_events {
101            wasi_try_mem!(event_array.index(events_seen as u64).write(event));
102            events_seen += 1;
103        }
104        let events_seen: M::Offset = events_seen.into();
105        let out_ptr = nevents.deref(&memory);
106        wasi_try_mem!(out_ptr.write(events_seen));
107        Errno::Success
108    };
109
110    // Poll and receive all the events that triggered
111    poll_oneoff_internal::<M, _>(ctx, subscriptions, process_events)
112}
113
/// Future that polls a batch of file-descriptor poll guards together.
///
/// It resolves as soon as at least one of the joined guards reports events
/// (see the `Future` implementation).
struct PollBatch {
    /// Process the poll is performed for.
    pid: WasiProcessId,
    /// Thread the poll is performed for.
    tid: WasiThreadId,
    /// Initialised empty by `new`.
    // NOTE(review): not read anywhere in the visible part of this file — confirm whether it is still needed.
    evts: Vec<Event>,
    /// One join future per file-descriptor guard being polled.
    joins: Vec<InodeValFilePollGuardJoin>,
}
120impl PollBatch {
121    fn new(pid: WasiProcessId, tid: WasiThreadId, fds: Vec<InodeValFilePollGuard>) -> Self {
122        Self {
123            pid,
124            tid,
125            evts: Vec::new(),
126            joins: fds
127                .into_iter()
128                .map(InodeValFilePollGuardJoin::new)
129                .collect(),
130        }
131    }
132}
133impl Future for PollBatch {
134    type Output = Result<Vec<EventResult>, Errno>;
135    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136        let pid = self.pid;
137        let tid = self.tid;
138        let mut done = false;
139
140        let mut evts = Vec::new();
141        for mut join in self.joins.iter_mut() {
142            let fd = join.fd();
143            let peb = join.peb();
144            let mut guard = Pin::new(join);
145            match guard.poll(cx) {
146                Poll::Pending => {}
147                Poll::Ready(e) => {
148                    for (evt, readiness) in e {
149                        tracing::trace!(
150                            fd,
151                            readiness = ?readiness,
152                            userdata = evt.userdata,
153                            ty = evt.type_ as u8,
154                            peb,
155                            "triggered"
156                        );
157                        evts.push(evt);
158                    }
159                }
160            }
161        }
162
163        if !evts.is_empty() {
164            return Poll::Ready(Ok(evts));
165        }
166
167        Poll::Pending
168    }
169}
170
171pub(crate) fn poll_fd_guard(
172    state: &Arc<WasiState>,
173    peb: PollEventSet,
174    fd: WasiFd,
175    s: Subscription,
176) -> Result<InodeValFilePollGuard, Errno> {
177    let fd_entry = state.fs.get_fd(fd)?;
178    let requires_access = match s.type_ {
179        Eventtype::FdRead => Rights::FD_READ,
180        Eventtype::FdWrite => Rights::FD_WRITE,
181        _ => Rights::empty(),
182    };
183
184    if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
185        && fd_entry.inner.rights.contains(requires_access))
186    {
187        return Err(Errno::Access);
188    }
189    let inode = fd_entry.inode;
190
191    let guard = inode.read();
192    crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref()).ok_or(Errno::Badf)
193}
194
195/// ### `poll_oneoff()`
196/// Concurrently poll for a set of events
197///
198/// Inputs:
199/// - `const __wasi_subscription_t *in`
200///   The events to subscribe to
201/// - `__wasi_event_t *out`
202///   The events that have occurred
203/// - `u32 nsubscriptions`
204///   The number of subscriptions and the number of events
205///
206/// Output:
207/// - `u32 nevents`
208///   The number of events seen
209pub(crate) fn poll_oneoff_internal<'a, M: MemorySize, After>(
210    mut ctx: FunctionEnvMut<'a, WasiEnv>,
211    mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
212    process_events: After,
213) -> Result<Errno, WasiError>
214where
215    After: FnOnce(&FunctionEnvMut<'a, WasiEnv>, Vec<Event>) -> Errno,
216{
217    wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
218
219    let pid = ctx.data().pid();
220    let tid = ctx.data().tid();
221    let subs_len = subs.len();
222
223    // Determine if we are in silent polling mode
224    let mut env = ctx.data();
225    let state = ctx.data().state.deref();
226    let memory = unsafe { env.memory_view(&ctx) };
227
228    // These are used when we capture what clocks (timeouts) are being
229    // subscribed too
230    let clock_cnt = subs
231        .iter()
232        .filter(|a| a.2.type_ == Eventtype::Clock)
233        .count();
234    let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
235    let mut time_to_sleep = Duration::MAX;
236
237    // First we extract all the subscriptions into an array so that they
238    // can be processed
239    let mut env = ctx.data();
240    let state = ctx.data().state.deref();
241    let mut memory = unsafe { env.memory_view(&ctx) };
242    for (fd, peb, s) in subs.iter_mut() {
243        let fd = match s.type_ {
244            Eventtype::FdRead => {
245                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
246                *fd = Some(file_descriptor);
247                *peb |= (PollEvent::PollIn as PollEventSet);
248                file_descriptor
249            }
250            Eventtype::FdWrite => {
251                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
252                *fd = Some(file_descriptor);
253                *peb |= (PollEvent::PollOut as PollEventSet);
254                file_descriptor
255            }
256            Eventtype::Clock => {
257                let clock_info = unsafe { s.data.clock };
258                if clock_info.clock_id == Clockid::Realtime
259                    || clock_info.clock_id == Clockid::Monotonic
260                {
261                    // Ignore duplicates
262                    if clock_subs
263                        .iter()
264                        .any(|c| c.0.clock_id == clock_info.clock_id && c.1 == s.userdata)
265                    {
266                        continue;
267                    }
268
269                    // If the timeout duration is zero then this is an immediate check rather than
270                    // a sleep itself
271                    if clock_info.timeout == 0 {
272                        time_to_sleep = Duration::MAX;
273                    } else if clock_info.timeout == 1 {
274                        time_to_sleep = Duration::ZERO;
275                        clock_subs.push((clock_info, s.userdata));
276                    } else {
277                        // if the timeout is specified as an absolute time in the future,
278                        // we should calculate the duration we need to sleep
279                        time_to_sleep = if clock_info
280                            .flags
281                            .contains(Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME)
282                        {
283                            let now = wasi_try_ok!(platform_clock_time_get(
284                                Snapshot0Clockid::Monotonic,
285                                1
286                            )) as u64;
287
288                            if clock_info.timeout <= now {
289                                Duration::ZERO
290                            } else {
291                                Duration::from_nanos(clock_info.timeout) - Duration::from_nanos(now)
292                            }
293                        } else {
294                            // if the timeout is not absolute, just use it as duration
295                            Duration::from_nanos(clock_info.timeout)
296                        };
297
298                        clock_subs.push((clock_info, s.userdata));
299                    }
300                    continue;
301                } else {
302                    error!("polling not implemented for these clocks yet");
303                    return Ok(Errno::Inval);
304                }
305            }
306            _ => {
307                continue;
308            }
309        };
310    }
311
312    let mut events_seen: u32 = 0;
313
314    let batch = {
315        // Build the batch of things we are going to poll
316        let state = ctx.data().state.clone();
317        let tasks = ctx.data().tasks().clone();
318        let mut guards = {
319            // We start by building a list of files we are going to poll
320            // and open a read lock on them all
321            let mut fd_guards = Vec::with_capacity(subs.len());
322
323            #[allow(clippy::significant_drop_in_scrutinee)]
324            for (fd, peb, s) in subs {
325                if let Some(fd) = fd {
326                    let wasi_file_ref = wasi_try_ok!(poll_fd_guard(&state, peb, fd, s));
327                    fd_guards.push(wasi_file_ref);
328                }
329            }
330
331            if fd_guards.len() > 10 {
332                let small_list: Vec<_> = fd_guards.iter().take(10).collect();
333                tracing::Span::current().record("fd_guards", format!("{small_list:?}..."));
334            } else {
335                tracing::Span::current().record("fd_guards", format!("{fd_guards:?}"));
336            }
337
338            fd_guards
339        };
340
341        // Block polling the file descriptors
342        PollBatch::new(pid, tid, guards)
343    };
344
345    // If the time is infinite then we omit the time_to_sleep parameter
346    let timeout = match time_to_sleep {
347        Duration::ZERO => {
348            Span::current().record("timeout_ns", "nonblocking");
349            Some(Duration::ZERO)
350        }
351        Duration::MAX => {
352            Span::current().record("timeout_ns", "infinite");
353            None
354        }
355        time => {
356            Span::current().record("timeout_ns", time.as_millis());
357            Some(time)
358        }
359    };
360
361    // Function to process a timeout
362    let process_timeout = {
363        let clock_subs = clock_subs.clone();
364        |ctx: &FunctionEnvMut<'a, WasiEnv>| {
365            // The timeout has triggered so lets add that event
366            if clock_subs.is_empty() {
367                tracing::warn!("triggered_timeout (without any clock subscriptions)");
368            }
369            let mut evts = Vec::new();
370            for (clock_info, userdata) in clock_subs {
371                let evt = Event {
372                    userdata,
373                    error: Errno::Success,
374                    type_: Eventtype::Clock,
375                    u: EventUnion { clock: 0 },
376                };
377                Span::current().record(
378                    "seen",
379                    format!(
380                        "clock(id={},userdata={})",
381                        clock_info.clock_id as u32, evt.userdata
382                    ),
383                );
384                evts.push(evt);
385            }
386            evts
387        }
388    };
389
390    #[cfg(feature = "sys")]
391    if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
392        // Here, `poll_oneoff` is merely in a sleeping state
393        // due to a single relative timer event. This particular scenario was
394        // added following experimental findings indicating that std::thread::sleep
395        // yields more consistent sleep durations, allowing wasmer to meet
396        // real-time demands with greater precision.
397        if let Some(timeout) = timeout {
398            std::thread::sleep(timeout);
399            process_events(&ctx, process_timeout(&ctx));
400            return Ok(Errno::Success);
401        }
402    }
403
404    let tasks = env.tasks().clone();
405    let timeout = async move {
406        if let Some(timeout) = timeout {
407            tasks.sleep_now(timeout).await;
408        } else {
409            InfiniteSleep::default().await
410        }
411    };
412
413    // Build the trigger using the timeout
414    let trigger = async move {
415        tokio::select! {
416            res = batch => res,
417            _ = timeout => Err(Errno::Timedout)
418        }
419    };
420
421    // We replace the process events callback with another callback
422    // which will interpret the error codes
423    let process_events = {
424        let clock_subs = clock_subs.clone();
425        |ctx: &FunctionEnvMut<'a, WasiEnv>, events: Result<Vec<Event>, Errno>| {
426            // Process the result
427            match events {
428                Ok(evts) => {
429                    // If its a timeout then return an event for it
430                    if evts.len() == 1 {
431                        Span::current().record("seen", format!("{:?}", evts.first().unwrap()));
432                    } else {
433                        Span::current().record("seen", format!("trigger_cnt=({})", evts.len()));
434                    }
435
436                    // Process the events
437                    process_events(ctx, evts)
438                }
439                Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
440                // If nonblocking the Errno::Again needs to be turned into an empty list
441                Err(Errno::Again) => process_events(ctx, Default::default()),
442                // Otherwise process the error
443                Err(err) => {
444                    tracing::warn!("failed to poll during deep sleep - {}", err);
445                    err
446                }
447            }
448        }
449    };
450
451    // If we are rewound then its time to process them
452    if let Some(events) = unsafe { handle_rewind::<M, Result<Vec<EventResult>, Errno>>(&mut ctx) } {
453        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
454        process_events(&ctx, events);
455        return Ok(Errno::Success);
456    }
457
458    // We use asyncify with a deep sleep to wait on new IO events
459    let res = __asyncify_with_deep_sleep::<M, Result<Vec<EventResult>, Errno>, _>(
460        ctx,
461        Box::pin(trigger),
462    )?;
463    if let AsyncifyAction::Finish(mut ctx, events) = res {
464        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
465        process_events(&ctx, events);
466    }
467    Ok(Errno::Success)
468}