wasmer_wasix/syscalls/wasi/
poll_oneoff.rs

1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{Subclockflags, SubscriptionClock, Userdata};
3
4use super::*;
5use crate::{
6    WasiInodes,
7    fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
8    state::PollEventSet,
9    syscalls::*,
10};
11
/// Payload of a triggered poll event, discriminated by the kind of subscription.
///
/// [`EventResult::into_event`] copies the payload into the matching member of `EventUnion`.
#[derive(Serialize, Deserialize)]
pub enum EventResultType {
    /// Payload of a clock event; ends up in the `clock` member of `EventUnion`.
    Clock(u8),
    /// Payload of a file-descriptor event; ends up in the `fd_readwrite` member of `EventUnion`.
    Fd(EventFdReadwrite),
}
18
/// A triggered poll event in a form that can be serialized and deserialized.
///
/// It carries the same information as an `Event`, but stores the payload in the
/// [`EventResultType`] enum instead of a union. Use `into_event` to obtain the `Event`.
#[derive(Serialize, Deserialize)]
pub struct EventResult {
    /// User-provided value that got attached to `subscription::userdata`.
    pub userdata: Userdata,
    /// If non-zero, an error that occurred while processing the subscription request.
    pub error: Errno,
    /// Type of event that was triggered.
    pub type_: Eventtype,
    /// The contents of the event, tagged with the kind of event that occurred.
    pub inner: EventResultType,
}
31impl EventResult {
32    pub fn into_event(self) -> Event {
33        Event {
34            userdata: self.userdata,
35            error: self.error,
36            type_: self.type_,
37            u: match self.inner {
38                EventResultType::Clock(id) => EventUnion { clock: id },
39                EventResultType::Fd(fd) => EventUnion { fd_readwrite: fd },
40            },
41        }
42    }
43}
44
45/// ### `poll_oneoff()`
46/// Concurrently poll for a set of events
47///
48/// Inputs:
49/// - `const __wasi_subscription_t *in`
50///     The events to subscribe to
51/// - `__wasi_event_t *out`
52///     The events that have occured
53/// - `u32 nsubscriptions`
54///     The number of subscriptions and the number of events
55///
56/// Output:
57/// - `u32 nevents`
58///     The number of events seen
59#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
60pub fn poll_oneoff<M: MemorySize + 'static>(
61    mut ctx: FunctionEnvMut<'_, WasiEnv>,
62    in_: WasmPtr<Subscription, M>,
63    out_: WasmPtr<Event, M>,
64    nsubscriptions: M::Offset,
65    nevents: WasmPtr<M::Offset, M>,
66) -> Result<Errno, WasiError> {
67    WasiEnv::do_pending_operations(&mut ctx)?;
68
69    // An empty subscription list would otherwise block forever in the poll loop.
70    if nsubscriptions == M::ZERO {
71        return Ok(Errno::Inval);
72    }
73
74    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
75    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
76
77    ctx.data_mut().poll_seed += 1;
78    let mut env = ctx.data();
79    let mut memory = unsafe { env.memory_view(&ctx) };
80
81    let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
82    let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
83    for n in 0..subscription_array.len() {
84        let n = (n + env.poll_seed) % subscription_array.len();
85        let sub = subscription_array.index(n);
86        let s = wasi_try_mem_ok!(sub.read());
87        subscriptions.push((None, PollEventSet::default(), s));
88    }
89
90    // We clear the number of events
91    wasi_try_mem_ok!(nevents.write(&memory, M::ZERO));
92
93    // Function to invoke once the poll is finished
94    let process_events = |ctx: &FunctionEnvMut<'_, WasiEnv>, triggered_events: Vec<Event>| {
95        let mut env = ctx.data();
96        let mut memory = unsafe { env.memory_view(&ctx) };
97
98        // Process all the events that were triggered
99        let mut events_seen: u32 = 0;
100        let event_array = wasi_try_mem!(out_.slice(&memory, nsubscriptions));
101        for event in triggered_events {
102            wasi_try_mem!(event_array.index(events_seen as u64).write(event));
103            events_seen += 1;
104        }
105        let events_seen: M::Offset = events_seen.into();
106        let out_ptr = nevents.deref(&memory);
107        wasi_try_mem!(out_ptr.write(events_seen));
108        Errno::Success
109    };
110
111    // Poll and receive all the events that triggered
112    poll_oneoff_internal::<M, _>(ctx, subscriptions, process_events)
113}
114
/// A future that polls a group of file-descriptor guards together.
///
/// Its `Future` implementation polls every entry of `joins` and resolves once at least one of
/// them reports events.
struct PollBatch {
    /// Process id supplied to `new`.
    pid: WasiProcessId,
    /// Thread id supplied to `new`.
    tid: WasiThreadId,
    /// Starts out empty in `new`; the `poll` implementation collects into a local vector
    /// instead of this field.
    evts: Vec<Event>,
    /// One pollable join per file-descriptor guard handed to `new`.
    joins: Vec<InodeValFilePollGuardJoin>,
}
121impl PollBatch {
122    fn new(pid: WasiProcessId, tid: WasiThreadId, fds: Vec<InodeValFilePollGuard>) -> Self {
123        Self {
124            pid,
125            tid,
126            evts: Vec::new(),
127            joins: fds
128                .into_iter()
129                .map(InodeValFilePollGuardJoin::new)
130                .collect(),
131        }
132    }
133}
134impl Future for PollBatch {
135    type Output = Result<Vec<EventResult>, Errno>;
136    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137        let pid = self.pid;
138        let tid = self.tid;
139        let mut done = false;
140
141        let mut evts = Vec::new();
142        for mut join in self.joins.iter_mut() {
143            let fd = join.fd();
144            let peb = join.peb();
145            let mut guard = Pin::new(join);
146            match guard.poll(cx) {
147                Poll::Pending => {}
148                Poll::Ready(e) => {
149                    for (evt, readiness) in e {
150                        tracing::trace!(
151                            fd,
152                            readiness = ?readiness,
153                            userdata = evt.userdata,
154                            ty = evt.type_ as u8,
155                            peb,
156                            "triggered"
157                        );
158                        evts.push(evt);
159                    }
160                }
161            }
162        }
163
164        if !evts.is_empty() {
165            return Poll::Ready(Ok(evts));
166        }
167
168        Poll::Pending
169    }
170}
171
172pub(crate) fn poll_fd_guard(
173    state: &Arc<WasiState>,
174    peb: PollEventSet,
175    fd: WasiFd,
176    s: Subscription,
177) -> Result<InodeValFilePollGuard, Errno> {
178    Ok(match fd {
179        __WASI_STDERR_FILENO => WasiInodes::stderr(&state.fs.fd_map)
180            .map(|g| g.into_poll_guard(fd, peb, s))
181            .map_err(fs_error_into_wasi_err)?,
182        __WASI_STDOUT_FILENO => WasiInodes::stdout(&state.fs.fd_map)
183            .map(|g| g.into_poll_guard(fd, peb, s))
184            .map_err(fs_error_into_wasi_err)?,
185        _ => {
186            let fd_entry = state.fs.get_fd(fd)?;
187            if !fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE) {
188                return Err(Errno::Access);
189            }
190            let inode = fd_entry.inode;
191
192            {
193                let guard = inode.read();
194                if let Some(guard) =
195                    crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref())
196                {
197                    guard
198                } else {
199                    return Err(Errno::Badf);
200                }
201            }
202        }
203    })
204}
205
206/// ### `poll_oneoff()`
207/// Concurrently poll for a set of events
208///
209/// Inputs:
210/// - `const __wasi_subscription_t *in`
211///   The events to subscribe to
212/// - `__wasi_event_t *out`
213///   The events that have occured
214/// - `u32 nsubscriptions`
215///   The number of subscriptions and the number of events
216///
217/// Output:
218/// - `u32 nevents`
219///   The number of events seen
220pub(crate) fn poll_oneoff_internal<'a, M: MemorySize, After>(
221    mut ctx: FunctionEnvMut<'a, WasiEnv>,
222    mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
223    process_events: After,
224) -> Result<Errno, WasiError>
225where
226    After: FnOnce(&FunctionEnvMut<'a, WasiEnv>, Vec<Event>) -> Errno,
227{
228    wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
229
230    let pid = ctx.data().pid();
231    let tid = ctx.data().tid();
232    let subs_len = subs.len();
233
234    // Determine if we are in silent polling mode
235    let mut env = ctx.data();
236    let state = ctx.data().state.deref();
237    let memory = unsafe { env.memory_view(&ctx) };
238
239    // These are used when we capture what clocks (timeouts) are being
240    // subscribed too
241    let clock_cnt = subs
242        .iter()
243        .filter(|a| a.2.type_ == Eventtype::Clock)
244        .count();
245    let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
246    let mut time_to_sleep = Duration::MAX;
247
248    // First we extract all the subscriptions into an array so that they
249    // can be processed
250    let mut env = ctx.data();
251    let state = ctx.data().state.deref();
252    let mut memory = unsafe { env.memory_view(&ctx) };
253    for (fd, peb, s) in subs.iter_mut() {
254        let fd = match s.type_ {
255            Eventtype::FdRead => {
256                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
257                match file_descriptor {
258                    __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
259                    fd => {
260                        let fd_entry = match state.fs.get_fd(fd) {
261                            Ok(a) => a,
262                            Err(err) => return Ok(err),
263                        };
264                        if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
265                            && fd_entry.inner.rights.contains(Rights::FD_READ))
266                        {
267                            return Ok(Errno::Access);
268                        }
269                    }
270                }
271                *fd = Some(file_descriptor);
272                *peb |= (PollEvent::PollIn as PollEventSet);
273                file_descriptor
274            }
275            Eventtype::FdWrite => {
276                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
277                match file_descriptor {
278                    __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
279                    fd => {
280                        let fd_entry = match state.fs.get_fd(fd) {
281                            Ok(a) => a,
282                            Err(err) => return Ok(err),
283                        };
284                        if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
285                            && fd_entry.inner.rights.contains(Rights::FD_WRITE))
286                        {
287                            return Ok(Errno::Access);
288                        }
289                    }
290                }
291                *fd = Some(file_descriptor);
292                *peb |= (PollEvent::PollOut as PollEventSet);
293                file_descriptor
294            }
295            Eventtype::Clock => {
296                let clock_info = unsafe { s.data.clock };
297                if clock_info.clock_id == Clockid::Realtime
298                    || clock_info.clock_id == Clockid::Monotonic
299                {
300                    // Ignore duplicates
301                    if clock_subs
302                        .iter()
303                        .any(|c| c.0.clock_id == clock_info.clock_id && c.1 == s.userdata)
304                    {
305                        continue;
306                    }
307
308                    // If the timeout duration is zero then this is an immediate check rather than
309                    // a sleep itself
310                    if clock_info.timeout == 0 {
311                        time_to_sleep = Duration::MAX;
312                    } else if clock_info.timeout == 1 {
313                        time_to_sleep = Duration::ZERO;
314                        clock_subs.push((clock_info, s.userdata));
315                    } else {
316                        // if the timeout is specified as an absolute time in the future,
317                        // we should calculate the duration we need to sleep
318                        time_to_sleep = if clock_info
319                            .flags
320                            .contains(Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME)
321                        {
322                            let now = wasi_try_ok!(platform_clock_time_get(
323                                Snapshot0Clockid::Monotonic,
324                                1
325                            )) as u64;
326
327                            if clock_info.timeout <= now {
328                                Duration::ZERO
329                            } else {
330                                Duration::from_nanos(clock_info.timeout) - Duration::from_nanos(now)
331                            }
332                        } else {
333                            // if the timeout is not absolute, just use it as duration
334                            Duration::from_nanos(clock_info.timeout)
335                        };
336
337                        clock_subs.push((clock_info, s.userdata));
338                    }
339                    continue;
340                } else {
341                    error!("polling not implemented for these clocks yet");
342                    return Ok(Errno::Inval);
343                }
344            }
345            _ => {
346                continue;
347            }
348        };
349    }
350
351    let mut events_seen: u32 = 0;
352
353    let batch = {
354        // Build the batch of things we are going to poll
355        let state = ctx.data().state.clone();
356        let tasks = ctx.data().tasks().clone();
357        let mut guards = {
358            // We start by building a list of files we are going to poll
359            // and open a read lock on them all
360            let mut fd_guards = Vec::with_capacity(subs.len());
361
362            #[allow(clippy::significant_drop_in_scrutinee)]
363            for (fd, peb, s) in subs {
364                if let Some(fd) = fd {
365                    let wasi_file_ref = wasi_try_ok!(poll_fd_guard(&state, peb, fd, s));
366                    fd_guards.push(wasi_file_ref);
367                }
368            }
369
370            if fd_guards.len() > 10 {
371                let small_list: Vec<_> = fd_guards.iter().take(10).collect();
372                tracing::Span::current().record("fd_guards", format!("{small_list:?}..."));
373            } else {
374                tracing::Span::current().record("fd_guards", format!("{fd_guards:?}"));
375            }
376
377            fd_guards
378        };
379
380        // Block polling the file descriptors
381        PollBatch::new(pid, tid, guards)
382    };
383
384    // If the time is infinite then we omit the time_to_sleep parameter
385    let timeout = match time_to_sleep {
386        Duration::ZERO => {
387            Span::current().record("timeout_ns", "nonblocking");
388            Some(Duration::ZERO)
389        }
390        Duration::MAX => {
391            Span::current().record("timeout_ns", "infinite");
392            None
393        }
394        time => {
395            Span::current().record("timeout_ns", time.as_millis());
396            Some(time)
397        }
398    };
399
400    // Function to process a timeout
401    let process_timeout = {
402        let clock_subs = clock_subs.clone();
403        |ctx: &FunctionEnvMut<'a, WasiEnv>| {
404            // The timeout has triggered so lets add that event
405            if clock_subs.is_empty() {
406                tracing::warn!("triggered_timeout (without any clock subscriptions)");
407            }
408            let mut evts = Vec::new();
409            for (clock_info, userdata) in clock_subs {
410                let evt = Event {
411                    userdata,
412                    error: Errno::Success,
413                    type_: Eventtype::Clock,
414                    u: EventUnion { clock: 0 },
415                };
416                Span::current().record(
417                    "seen",
418                    format!(
419                        "clock(id={},userdata={})",
420                        clock_info.clock_id as u32, evt.userdata
421                    ),
422                );
423                evts.push(evt);
424            }
425            evts
426        }
427    };
428
429    #[cfg(feature = "sys")]
430    if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
431        // Here, `poll_oneoff` is merely in a sleeping state
432        // due to a single relative timer event. This particular scenario was
433        // added following experimental findings indicating that std::thread::sleep
434        // yields more consistent sleep durations, allowing wasmer to meet
435        // real-time demands with greater precision.
436        if let Some(timeout) = timeout {
437            std::thread::sleep(timeout);
438            process_events(&ctx, process_timeout(&ctx));
439            return Ok(Errno::Success);
440        }
441    }
442
443    let tasks = env.tasks().clone();
444    let timeout = async move {
445        if let Some(timeout) = timeout {
446            tasks.sleep_now(timeout).await;
447        } else {
448            InfiniteSleep::default().await
449        }
450    };
451
452    // Build the trigger using the timeout
453    let trigger = async move {
454        tokio::select! {
455            res = batch => res,
456            _ = timeout => Err(Errno::Timedout)
457        }
458    };
459
460    // We replace the process events callback with another callback
461    // which will interpret the error codes
462    let process_events = {
463        let clock_subs = clock_subs.clone();
464        |ctx: &FunctionEnvMut<'a, WasiEnv>, events: Result<Vec<Event>, Errno>| {
465            // Process the result
466            match events {
467                Ok(evts) => {
468                    // If its a timeout then return an event for it
469                    if evts.len() == 1 {
470                        Span::current().record("seen", format!("{:?}", evts.first().unwrap()));
471                    } else {
472                        Span::current().record("seen", format!("trigger_cnt=({})", evts.len()));
473                    }
474
475                    // Process the events
476                    process_events(ctx, evts)
477                }
478                Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
479                // If nonblocking the Errno::Again needs to be turned into an empty list
480                Err(Errno::Again) => process_events(ctx, Default::default()),
481                // Otherwise process the error
482                Err(err) => {
483                    tracing::warn!("failed to poll during deep sleep - {}", err);
484                    err
485                }
486            }
487        }
488    };
489
490    // If we are rewound then its time to process them
491    if let Some(events) = unsafe { handle_rewind::<M, Result<Vec<EventResult>, Errno>>(&mut ctx) } {
492        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
493        process_events(&ctx, events);
494        return Ok(Errno::Success);
495    }
496
497    // We use asyncify with a deep sleep to wait on new IO events
498    let res = __asyncify_with_deep_sleep::<M, Result<Vec<EventResult>, Errno>, _>(
499        ctx,
500        Box::pin(trigger),
501    )?;
502    if let AsyncifyAction::Finish(mut ctx, events) = res {
503        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
504        process_events(&ctx, events);
505    }
506    Ok(Errno::Success)
507}