wasmer_wasix/syscalls/wasi/
poll_oneoff.rs

1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{Subclockflags, SubscriptionClock, Userdata};
3
4use super::*;
5use crate::{
6    WasiInodes,
7    fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
8    state::PollEventSet,
9    syscalls::*,
10};
11
12/// An event that occurred.
13#[derive(Serialize, Deserialize)]
14pub enum EventResultType {
15    Clock(u8),
16    Fd(EventFdReadwrite),
17}
18
19/// An event that occurred.
20#[derive(Serialize, Deserialize)]
21pub struct EventResult {
22    /// User-provided value that got attached to `subscription::userdata`.
23    pub userdata: Userdata,
24    /// If non-zero, an error that occurred while processing the subscription request.
25    pub error: Errno,
26    /// Type of event that was triggered
27    pub type_: Eventtype,
28    /// The type of the event that occurred, and the contents of the event
29    pub inner: EventResultType,
30}
31impl EventResult {
32    pub fn into_event(self) -> Event {
33        Event {
34            userdata: self.userdata,
35            error: self.error,
36            type_: self.type_,
37            u: match self.inner {
38                EventResultType::Clock(id) => EventUnion { clock: id },
39                EventResultType::Fd(fd) => EventUnion { fd_readwrite: fd },
40            },
41        }
42    }
43}
44
45/// ### `poll_oneoff()`
46/// Concurrently poll for a set of events
47///
48/// Inputs:
49/// - `const __wasi_subscription_t *in`
50///     The events to subscribe to
51/// - `__wasi_event_t *out`
52///     The events that have occured
53/// - `u32 nsubscriptions`
54///     The number of subscriptions and the number of events
55///
56/// Output:
57/// - `u32 nevents`
58///     The number of events seen
59#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
60pub fn poll_oneoff<M: MemorySize + 'static>(
61    mut ctx: FunctionEnvMut<'_, WasiEnv>,
62    in_: WasmPtr<Subscription, M>,
63    out_: WasmPtr<Event, M>,
64    nsubscriptions: M::Offset,
65    nevents: WasmPtr<M::Offset, M>,
66) -> Result<Errno, WasiError> {
67    wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
68
69    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
70    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
71
72    ctx.data_mut().poll_seed += 1;
73    let mut env = ctx.data();
74    let mut memory = unsafe { env.memory_view(&ctx) };
75
76    let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
77    let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
78    for n in 0..subscription_array.len() {
79        let n = (n + env.poll_seed) % subscription_array.len();
80        let sub = subscription_array.index(n);
81        let s = wasi_try_mem_ok!(sub.read());
82        subscriptions.push((None, PollEventSet::default(), s));
83    }
84
85    // We clear the number of events
86    wasi_try_mem_ok!(nevents.write(&memory, M::ZERO));
87
88    // Function to invoke once the poll is finished
89    let process_events = |ctx: &FunctionEnvMut<'_, WasiEnv>, triggered_events: Vec<Event>| {
90        let mut env = ctx.data();
91        let mut memory = unsafe { env.memory_view(&ctx) };
92
93        // Process all the events that were triggered
94        let mut events_seen: u32 = 0;
95        let event_array = wasi_try_mem!(out_.slice(&memory, nsubscriptions));
96        for event in triggered_events {
97            wasi_try_mem!(event_array.index(events_seen as u64).write(event));
98            events_seen += 1;
99        }
100        let events_seen: M::Offset = events_seen.into();
101        let out_ptr = nevents.deref(&memory);
102        wasi_try_mem!(out_ptr.write(events_seen));
103        Errno::Success
104    };
105
106    // Poll and receive all the events that triggered
107    poll_oneoff_internal::<M, _>(ctx, subscriptions, process_events)
108}
109
110struct PollBatch {
111    pid: WasiProcessId,
112    tid: WasiThreadId,
113    evts: Vec<Event>,
114    joins: Vec<InodeValFilePollGuardJoin>,
115}
116impl PollBatch {
117    fn new(pid: WasiProcessId, tid: WasiThreadId, fds: Vec<InodeValFilePollGuard>) -> Self {
118        Self {
119            pid,
120            tid,
121            evts: Vec::new(),
122            joins: fds
123                .into_iter()
124                .map(InodeValFilePollGuardJoin::new)
125                .collect(),
126        }
127    }
128}
129impl Future for PollBatch {
130    type Output = Result<Vec<EventResult>, Errno>;
131    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132        let pid = self.pid;
133        let tid = self.tid;
134        let mut done = false;
135
136        let mut evts = Vec::new();
137        for mut join in self.joins.iter_mut() {
138            let fd = join.fd();
139            let peb = join.peb();
140            let mut guard = Pin::new(join);
141            match guard.poll(cx) {
142                Poll::Pending => {}
143                Poll::Ready(e) => {
144                    for (evt, readiness) in e {
145                        tracing::trace!(
146                            fd,
147                            readiness = ?readiness,
148                            userdata = evt.userdata,
149                            ty = evt.type_ as u8,
150                            peb,
151                            "triggered"
152                        );
153                        evts.push(evt);
154                    }
155                }
156            }
157        }
158
159        if !evts.is_empty() {
160            return Poll::Ready(Ok(evts));
161        }
162
163        Poll::Pending
164    }
165}
166
167pub(crate) fn poll_fd_guard(
168    state: &Arc<WasiState>,
169    peb: PollEventSet,
170    fd: WasiFd,
171    s: Subscription,
172) -> Result<InodeValFilePollGuard, Errno> {
173    Ok(match fd {
174        __WASI_STDERR_FILENO => WasiInodes::stderr(&state.fs.fd_map)
175            .map(|g| g.into_poll_guard(fd, peb, s))
176            .map_err(fs_error_into_wasi_err)?,
177        __WASI_STDOUT_FILENO => WasiInodes::stdout(&state.fs.fd_map)
178            .map(|g| g.into_poll_guard(fd, peb, s))
179            .map_err(fs_error_into_wasi_err)?,
180        _ => {
181            let fd_entry = state.fs.get_fd(fd)?;
182            if !fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE) {
183                return Err(Errno::Access);
184            }
185            let inode = fd_entry.inode;
186
187            {
188                let guard = inode.read();
189                if let Some(guard) =
190                    crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref())
191                {
192                    guard
193                } else {
194                    return Err(Errno::Badf);
195                }
196            }
197        }
198    })
199}
200
201/// ### `poll_oneoff()`
202/// Concurrently poll for a set of events
203///
204/// Inputs:
205/// - `const __wasi_subscription_t *in`
206///   The events to subscribe to
207/// - `__wasi_event_t *out`
208///   The events that have occured
209/// - `u32 nsubscriptions`
210///   The number of subscriptions and the number of events
211///
212/// Output:
213/// - `u32 nevents`
214///   The number of events seen
215pub(crate) fn poll_oneoff_internal<'a, M: MemorySize, After>(
216    mut ctx: FunctionEnvMut<'a, WasiEnv>,
217    mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
218    process_events: After,
219) -> Result<Errno, WasiError>
220where
221    After: FnOnce(&FunctionEnvMut<'a, WasiEnv>, Vec<Event>) -> Errno,
222{
223    wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
224
225    let pid = ctx.data().pid();
226    let tid = ctx.data().tid();
227    let subs_len = subs.len();
228
229    // Determine if we are in silent polling mode
230    let mut env = ctx.data();
231    let state = ctx.data().state.deref();
232    let memory = unsafe { env.memory_view(&ctx) };
233
234    // These are used when we capture what clocks (timeouts) are being
235    // subscribed too
236    let clock_cnt = subs
237        .iter()
238        .filter(|a| a.2.type_ == Eventtype::Clock)
239        .count();
240    let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
241    let mut time_to_sleep = Duration::MAX;
242
243    // First we extract all the subscriptions into an array so that they
244    // can be processed
245    let mut env = ctx.data();
246    let state = ctx.data().state.deref();
247    let mut memory = unsafe { env.memory_view(&ctx) };
248    for (fd, peb, s) in subs.iter_mut() {
249        let fd = match s.type_ {
250            Eventtype::FdRead => {
251                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
252                match file_descriptor {
253                    __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
254                    fd => {
255                        let fd_entry = match state.fs.get_fd(fd) {
256                            Ok(a) => a,
257                            Err(err) => return Ok(err),
258                        };
259                        if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
260                            && fd_entry.inner.rights.contains(Rights::FD_READ))
261                        {
262                            return Ok(Errno::Access);
263                        }
264                    }
265                }
266                *fd = Some(file_descriptor);
267                *peb |= (PollEvent::PollIn as PollEventSet);
268                file_descriptor
269            }
270            Eventtype::FdWrite => {
271                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
272                match file_descriptor {
273                    __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
274                    fd => {
275                        let fd_entry = match state.fs.get_fd(fd) {
276                            Ok(a) => a,
277                            Err(err) => return Ok(err),
278                        };
279                        if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
280                            && fd_entry.inner.rights.contains(Rights::FD_WRITE))
281                        {
282                            return Ok(Errno::Access);
283                        }
284                    }
285                }
286                *fd = Some(file_descriptor);
287                *peb |= (PollEvent::PollOut as PollEventSet);
288                file_descriptor
289            }
290            Eventtype::Clock => {
291                let clock_info = unsafe { s.data.clock };
292                if clock_info.clock_id == Clockid::Realtime
293                    || clock_info.clock_id == Clockid::Monotonic
294                {
295                    // Ignore duplicates
296                    if clock_subs
297                        .iter()
298                        .any(|c| c.0.clock_id == clock_info.clock_id && c.1 == s.userdata)
299                    {
300                        continue;
301                    }
302
303                    // If the timeout duration is zero then this is an immediate check rather than
304                    // a sleep itself
305                    if clock_info.timeout == 0 {
306                        time_to_sleep = Duration::MAX;
307                    } else if clock_info.timeout == 1 {
308                        time_to_sleep = Duration::ZERO;
309                        clock_subs.push((clock_info, s.userdata));
310                    } else {
311                        // if the timeout is specified as an absolute time in the future,
312                        // we should calculate the duration we need to sleep
313                        time_to_sleep = if clock_info
314                            .flags
315                            .contains(Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME)
316                        {
317                            let now = wasi_try_ok!(platform_clock_time_get(
318                                Snapshot0Clockid::Monotonic,
319                                1
320                            )) as u64;
321
322                            if clock_info.timeout <= now {
323                                Duration::ZERO
324                            } else {
325                                Duration::from_nanos(clock_info.timeout) - Duration::from_nanos(now)
326                            }
327                        } else {
328                            // if the timeout is not absolute, just use it as duration
329                            Duration::from_nanos(clock_info.timeout)
330                        };
331
332                        clock_subs.push((clock_info, s.userdata));
333                    }
334                    continue;
335                } else {
336                    error!("polling not implemented for these clocks yet");
337                    return Ok(Errno::Inval);
338                }
339            }
340            Eventtype::Unknown => {
341                continue;
342            }
343        };
344    }
345
346    let mut events_seen: u32 = 0;
347
348    let batch = {
349        // Build the batch of things we are going to poll
350        let state = ctx.data().state.clone();
351        let tasks = ctx.data().tasks().clone();
352        let mut guards = {
353            // We start by building a list of files we are going to poll
354            // and open a read lock on them all
355            let mut fd_guards = Vec::with_capacity(subs.len());
356
357            #[allow(clippy::significant_drop_in_scrutinee)]
358            for (fd, peb, s) in subs {
359                if let Some(fd) = fd {
360                    let wasi_file_ref = wasi_try_ok!(poll_fd_guard(&state, peb, fd, s));
361                    fd_guards.push(wasi_file_ref);
362                }
363            }
364
365            if fd_guards.len() > 10 {
366                let small_list: Vec<_> = fd_guards.iter().take(10).collect();
367                tracing::Span::current().record("fd_guards", format!("{small_list:?}..."));
368            } else {
369                tracing::Span::current().record("fd_guards", format!("{fd_guards:?}"));
370            }
371
372            fd_guards
373        };
374
375        // Block polling the file descriptors
376        PollBatch::new(pid, tid, guards)
377    };
378
379    // If the time is infinite then we omit the time_to_sleep parameter
380    let timeout = match time_to_sleep {
381        Duration::ZERO => {
382            Span::current().record("timeout_ns", "nonblocking");
383            Some(Duration::ZERO)
384        }
385        Duration::MAX => {
386            Span::current().record("timeout_ns", "infinite");
387            None
388        }
389        time => {
390            Span::current().record("timeout_ns", time.as_millis());
391            Some(time)
392        }
393    };
394
395    // Function to process a timeout
396    let process_timeout = {
397        let clock_subs = clock_subs.clone();
398        |ctx: &FunctionEnvMut<'a, WasiEnv>| {
399            // The timeout has triggered so lets add that event
400            if clock_subs.is_empty() {
401                tracing::warn!("triggered_timeout (without any clock subscriptions)");
402            }
403            let mut evts = Vec::new();
404            for (clock_info, userdata) in clock_subs {
405                let evt = Event {
406                    userdata,
407                    error: Errno::Success,
408                    type_: Eventtype::Clock,
409                    u: EventUnion { clock: 0 },
410                };
411                Span::current().record(
412                    "seen",
413                    format!(
414                        "clock(id={},userdata={})",
415                        clock_info.clock_id as u32, evt.userdata
416                    ),
417                );
418                evts.push(evt);
419            }
420            evts
421        }
422    };
423
424    #[cfg(feature = "sys")]
425    if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
426        // Here, `poll_oneoff` is merely in a sleeping state
427        // due to a single relative timer event. This particular scenario was
428        // added following experimental findings indicating that std::thread::sleep
429        // yields more consistent sleep durations, allowing wasmer to meet
430        // real-time demands with greater precision.
431        if let Some(timeout) = timeout {
432            std::thread::sleep(timeout);
433            process_events(&ctx, process_timeout(&ctx));
434            return Ok(Errno::Success);
435        }
436    }
437
438    let tasks = env.tasks().clone();
439    let timeout = async move {
440        if let Some(timeout) = timeout {
441            tasks.sleep_now(timeout).await;
442        } else {
443            InfiniteSleep::default().await
444        }
445    };
446
447    // Build the trigger using the timeout
448    let trigger = async move {
449        tokio::select! {
450            res = batch => res,
451            _ = timeout => Err(Errno::Timedout)
452        }
453    };
454
455    // We replace the process events callback with another callback
456    // which will interpret the error codes
457    let process_events = {
458        let clock_subs = clock_subs.clone();
459        |ctx: &FunctionEnvMut<'a, WasiEnv>, events: Result<Vec<Event>, Errno>| {
460            // Process the result
461            match events {
462                Ok(evts) => {
463                    // If its a timeout then return an event for it
464                    if evts.len() == 1 {
465                        Span::current().record("seen", format!("{:?}", evts.first().unwrap()));
466                    } else {
467                        Span::current().record("seen", format!("trigger_cnt=({})", evts.len()));
468                    }
469
470                    // Process the events
471                    process_events(ctx, evts)
472                }
473                Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
474                // If nonblocking the Errno::Again needs to be turned into an empty list
475                Err(Errno::Again) => process_events(ctx, Default::default()),
476                // Otherwise process the error
477                Err(err) => {
478                    tracing::warn!("failed to poll during deep sleep - {}", err);
479                    err
480                }
481            }
482        }
483    };
484
485    // If we are rewound then its time to process them
486    if let Some(events) = unsafe { handle_rewind::<M, Result<Vec<EventResult>, Errno>>(&mut ctx) } {
487        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
488        process_events(&ctx, events);
489        return Ok(Errno::Success);
490    }
491
492    // We use asyncify with a deep sleep to wait on new IO events
493    let res = __asyncify_with_deep_sleep::<M, Result<Vec<EventResult>, Errno>, _>(
494        ctx,
495        Box::pin(trigger),
496    )?;
497    if let AsyncifyAction::Finish(mut ctx, events) = res {
498        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
499        process_events(&ctx, events);
500    }
501    Ok(Errno::Success)
502}