wasmer_wasix/syscalls/wasi/
poll_oneoff.rs

1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{Subclockflags, SubscriptionClock, Userdata};
3
4use super::*;
5use crate::{
6    WasiInodes,
7    fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
8    state::PollEventSet,
9    syscalls::*,
10};
11
12/// An event that occurred.
13#[derive(Serialize, Deserialize)]
14pub enum EventResultType {
15    Clock(u8),
16    Fd(EventFdReadwrite),
17}
18
19/// An event that occurred.
20#[derive(Serialize, Deserialize)]
21pub struct EventResult {
22    /// User-provided value that got attached to `subscription::userdata`.
23    pub userdata: Userdata,
24    /// If non-zero, an error that occurred while processing the subscription request.
25    pub error: Errno,
26    /// Type of event that was triggered
27    pub type_: Eventtype,
28    /// The type of the event that occurred, and the contents of the event
29    pub inner: EventResultType,
30}
31impl EventResult {
32    pub fn into_event(self) -> Event {
33        Event {
34            userdata: self.userdata,
35            error: self.error,
36            type_: self.type_,
37            u: match self.inner {
38                EventResultType::Clock(id) => EventUnion { clock: id },
39                EventResultType::Fd(fd) => EventUnion { fd_readwrite: fd },
40            },
41        }
42    }
43}
44
45/// ### `poll_oneoff()`
46/// Concurrently poll for a set of events
47///
48/// Inputs:
49/// - `const __wasi_subscription_t *in`
50///   The events to subscribe to
51/// - `__wasi_event_t *out`
52///   The events that have occured
53/// - `u32 nsubscriptions`
54///   The number of subscriptions and the number of events
55///
56/// Output:
57/// - `u32 nevents`
58///   The number of events seen
59#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
60pub fn poll_oneoff<M: MemorySize + 'static>(
61    mut ctx: FunctionEnvMut<'_, WasiEnv>,
62    in_: WasmPtr<Subscription, M>,
63    out_: WasmPtr<Event, M>,
64    nsubscriptions: M::Offset,
65    nevents: WasmPtr<M::Offset, M>,
66) -> Result<Errno, WasiError> {
67    WasiEnv::do_pending_operations(&mut ctx)?;
68
69    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
70    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
71
72    ctx.data_mut().poll_seed += 1;
73    let mut env = ctx.data();
74    let mut memory = unsafe { env.memory_view(&ctx) };
75
76    let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
77    let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
78    for n in 0..subscription_array.len() {
79        let n = (n + env.poll_seed) % subscription_array.len();
80        let sub = subscription_array.index(n);
81        let s = wasi_try_mem_ok!(sub.read());
82        subscriptions.push((None, PollEventSet::default(), s));
83    }
84
85    // We clear the number of events
86    wasi_try_mem_ok!(nevents.write(&memory, M::ZERO));
87
88    // Function to invoke once the poll is finished
89    let process_events = |ctx: &FunctionEnvMut<'_, WasiEnv>, triggered_events: Vec<Event>| {
90        let mut env = ctx.data();
91        let mut memory = unsafe { env.memory_view(&ctx) };
92
93        // Process all the events that were triggered
94        let mut events_seen: u32 = 0;
95        let event_array = wasi_try_mem!(out_.slice(&memory, nsubscriptions));
96        for event in triggered_events {
97            wasi_try_mem!(event_array.index(events_seen as u64).write(event));
98            events_seen += 1;
99        }
100        let events_seen: M::Offset = events_seen.into();
101        let out_ptr = nevents.deref(&memory);
102        wasi_try_mem!(out_ptr.write(events_seen));
103        Errno::Success
104    };
105
106    // Poll and receive all the events that triggered
107    poll_oneoff_internal::<M, _>(ctx, subscriptions, process_events)
108}
109
110struct PollBatch {
111    pid: WasiProcessId,
112    tid: WasiThreadId,
113    evts: Vec<Event>,
114    joins: Vec<InodeValFilePollGuardJoin>,
115}
116impl PollBatch {
117    fn new(pid: WasiProcessId, tid: WasiThreadId, fds: Vec<InodeValFilePollGuard>) -> Self {
118        Self {
119            pid,
120            tid,
121            evts: Vec::new(),
122            joins: fds
123                .into_iter()
124                .map(InodeValFilePollGuardJoin::new)
125                .collect(),
126        }
127    }
128}
129impl Future for PollBatch {
130    type Output = Result<Vec<EventResult>, Errno>;
131    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132        let pid = self.pid;
133        let tid = self.tid;
134        let mut done = false;
135
136        let mut evts = Vec::new();
137        for mut join in self.joins.iter_mut() {
138            let fd = join.fd();
139            let peb = join.peb();
140            let mut guard = Pin::new(join);
141            match guard.poll(cx) {
142                Poll::Pending => {}
143                Poll::Ready(e) => {
144                    for (evt, readiness) in e {
145                        tracing::trace!(
146                            fd,
147                            readiness = ?readiness,
148                            userdata = evt.userdata,
149                            ty = evt.type_ as u8,
150                            peb,
151                            "triggered"
152                        );
153                        evts.push(evt);
154                    }
155                }
156            }
157        }
158
159        if !evts.is_empty() {
160            return Poll::Ready(Ok(evts));
161        }
162
163        Poll::Pending
164    }
165}
166
167pub(crate) fn poll_fd_guard(
168    state: &Arc<WasiState>,
169    peb: PollEventSet,
170    fd: WasiFd,
171    s: Subscription,
172) -> Result<InodeValFilePollGuard, Errno> {
173    Ok(match fd {
174        __WASI_STDERR_FILENO => WasiInodes::stderr(&state.fs.fd_map)
175            .map(|g| g.into_poll_guard(fd, peb, s))
176            .map_err(fs_error_into_wasi_err)?,
177        __WASI_STDOUT_FILENO => WasiInodes::stdout(&state.fs.fd_map)
178            .map(|g| g.into_poll_guard(fd, peb, s))
179            .map_err(fs_error_into_wasi_err)?,
180        _ => {
181            let fd_entry = state.fs.get_fd(fd)?;
182            if !fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE) {
183                return Err(Errno::Access);
184            }
185            let inode = fd_entry.inode;
186
187            {
188                let guard = inode.read();
189                if let Some(guard) =
190                    crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref())
191                {
192                    guard
193                } else {
194                    return Err(Errno::Badf);
195                }
196            }
197        }
198    })
199}
200
201/// ### `poll_oneoff()`
202/// Concurrently poll for a set of events
203///
204/// Inputs:
205/// - `const __wasi_subscription_t *in`
206///   The events to subscribe to
207/// - `__wasi_event_t *out`
208///   The events that have occured
209/// - `u32 nsubscriptions`
210///   The number of subscriptions and the number of events
211///
212/// Output:
213/// - `u32 nevents`
214///   The number of events seen
215pub(crate) fn poll_oneoff_internal<'a, M: MemorySize, After>(
216    mut ctx: FunctionEnvMut<'a, WasiEnv>,
217    mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
218    process_events: After,
219) -> Result<Errno, WasiError>
220where
221    After: FnOnce(&FunctionEnvMut<'a, WasiEnv>, Vec<Event>) -> Errno,
222{
223    let pid = ctx.data().pid();
224    let tid = ctx.data().tid();
225    let subs_len = subs.len();
226
227    // Determine if we are in silent polling mode
228    let mut env = ctx.data();
229    let state = ctx.data().state.deref();
230    let memory = unsafe { env.memory_view(&ctx) };
231
232    // These are used when we capture what clocks (timeouts) are being
233    // subscribed too
234    let clock_cnt = subs
235        .iter()
236        .filter(|a| a.2.type_ == Eventtype::Clock)
237        .count();
238    let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
239    let mut time_to_sleep = Duration::MAX;
240
241    // First we extract all the subscriptions into an array so that they
242    // can be processed
243    let mut env = ctx.data();
244    let state = ctx.data().state.deref();
245    let mut memory = unsafe { env.memory_view(&ctx) };
246    for (fd, peb, s) in subs.iter_mut() {
247        let fd = match s.type_ {
248            Eventtype::FdRead => {
249                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
250                match file_descriptor {
251                    __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
252                    fd => {
253                        let fd_entry = match state.fs.get_fd(fd) {
254                            Ok(a) => a,
255                            Err(err) => return Ok(err),
256                        };
257                        if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
258                            && fd_entry.inner.rights.contains(Rights::FD_READ))
259                        {
260                            return Ok(Errno::Access);
261                        }
262                    }
263                }
264                *fd = Some(file_descriptor);
265                *peb |= (PollEvent::PollIn as PollEventSet);
266                file_descriptor
267            }
268            Eventtype::FdWrite => {
269                let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
270                match file_descriptor {
271                    __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
272                    fd => {
273                        let fd_entry = match state.fs.get_fd(fd) {
274                            Ok(a) => a,
275                            Err(err) => return Ok(err),
276                        };
277                        if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
278                            && fd_entry.inner.rights.contains(Rights::FD_WRITE))
279                        {
280                            return Ok(Errno::Access);
281                        }
282                    }
283                }
284                *fd = Some(file_descriptor);
285                *peb |= (PollEvent::PollOut as PollEventSet);
286                file_descriptor
287            }
288            Eventtype::Clock => {
289                let clock_info = unsafe { s.data.clock };
290                if clock_info.clock_id == Clockid::Realtime
291                    || clock_info.clock_id == Clockid::Monotonic
292                {
293                    // Ignore duplicates
294                    if clock_subs
295                        .iter()
296                        .any(|c| c.0.clock_id == clock_info.clock_id && c.1 == s.userdata)
297                    {
298                        continue;
299                    }
300
301                    // If the timeout duration is zero then this is an immediate check rather than
302                    // a sleep itself
303                    if clock_info.timeout == 0 {
304                        time_to_sleep = Duration::MAX;
305                    } else if clock_info.timeout == 1 {
306                        time_to_sleep = Duration::ZERO;
307                        clock_subs.push((clock_info, s.userdata));
308                    } else {
309                        // if the timeout is specified as an absolute time in the future,
310                        // we should calculate the duration we need to sleep
311                        time_to_sleep = if clock_info
312                            .flags
313                            .contains(Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME)
314                        {
315                            let now = wasi_try_ok!(platform_clock_time_get(
316                                Snapshot0Clockid::Monotonic,
317                                1
318                            )) as u64;
319
320                            Duration::from_nanos(clock_info.timeout)
321                                - Duration::from_nanos(now as u64)
322                        } else {
323                            // if the timeout is not absolute, just use it as duration
324                            Duration::from_nanos(clock_info.timeout)
325                        };
326
327                        clock_subs.push((clock_info, s.userdata));
328                    }
329                    continue;
330                } else {
331                    error!("polling not implemented for these clocks yet");
332                    return Ok(Errno::Inval);
333                }
334            }
335            Eventtype::Unknown => {
336                continue;
337            }
338        };
339    }
340
341    let mut events_seen: u32 = 0;
342
343    let batch = {
344        // Build the batch of things we are going to poll
345        let state = ctx.data().state.clone();
346        let tasks = ctx.data().tasks().clone();
347        let mut guards = {
348            // We start by building a list of files we are going to poll
349            // and open a read lock on them all
350            let mut fd_guards = Vec::with_capacity(subs.len());
351
352            #[allow(clippy::significant_drop_in_scrutinee)]
353            for (fd, peb, s) in subs {
354                if let Some(fd) = fd {
355                    let wasi_file_ref = wasi_try_ok!(poll_fd_guard(&state, peb, fd, s));
356                    fd_guards.push(wasi_file_ref);
357                }
358            }
359
360            if fd_guards.len() > 10 {
361                let small_list: Vec<_> = fd_guards.iter().take(10).collect();
362                tracing::Span::current().record("fd_guards", format!("{small_list:?}..."));
363            } else {
364                tracing::Span::current().record("fd_guards", format!("{fd_guards:?}"));
365            }
366
367            fd_guards
368        };
369
370        // Block polling the file descriptors
371        PollBatch::new(pid, tid, guards)
372    };
373
374    // If the time is infinite then we omit the time_to_sleep parameter
375    let timeout = match time_to_sleep {
376        Duration::ZERO => {
377            Span::current().record("timeout_ns", "nonblocking");
378            Some(Duration::ZERO)
379        }
380        Duration::MAX => {
381            Span::current().record("timeout_ns", "infinite");
382            None
383        }
384        time => {
385            Span::current().record("timeout_ns", time.as_millis());
386            Some(time)
387        }
388    };
389
390    // Function to process a timeout
391    let process_timeout = {
392        let clock_subs = clock_subs.clone();
393        |ctx: &FunctionEnvMut<'a, WasiEnv>| {
394            // The timeout has triggered so lets add that event
395            if clock_subs.is_empty() {
396                tracing::warn!("triggered_timeout (without any clock subscriptions)");
397            }
398            let mut evts = Vec::new();
399            for (clock_info, userdata) in clock_subs {
400                let evt = Event {
401                    userdata,
402                    error: Errno::Success,
403                    type_: Eventtype::Clock,
404                    u: EventUnion { clock: 0 },
405                };
406                Span::current().record(
407                    "seen",
408                    format!(
409                        "clock(id={},userdata={})",
410                        clock_info.clock_id as u32, evt.userdata
411                    ),
412                );
413                evts.push(evt);
414            }
415            evts
416        }
417    };
418
419    #[cfg(feature = "sys")]
420    if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
421        // Here, `poll_oneoff` is merely in a sleeping state
422        // due to a single relative timer event. This particular scenario was
423        // added following experimental findings indicating that std::thread::sleep
424        // yields more consistent sleep durations, allowing wasmer to meet
425        // real-time demands with greater precision.
426        if let Some(timeout) = timeout {
427            std::thread::sleep(timeout);
428            process_events(&ctx, process_timeout(&ctx));
429            return Ok(Errno::Success);
430        }
431    }
432
433    let tasks = env.tasks().clone();
434    let timeout = async move {
435        if let Some(timeout) = timeout {
436            tasks.sleep_now(timeout).await;
437        } else {
438            InfiniteSleep::default().await
439        }
440    };
441
442    // Build the trigger using the timeout
443    let trigger = async move {
444        tokio::select! {
445            res = batch => res,
446            _ = timeout => Err(Errno::Timedout)
447        }
448    };
449
450    // We replace the process events callback with another callback
451    // which will interpret the error codes
452    let process_events = {
453        let clock_subs = clock_subs.clone();
454        |ctx: &FunctionEnvMut<'a, WasiEnv>, events: Result<Vec<Event>, Errno>| {
455            // Process the result
456            match events {
457                Ok(evts) => {
458                    // If its a timeout then return an event for it
459                    if evts.len() == 1 {
460                        Span::current().record("seen", format!("{:?}", evts.first().unwrap()));
461                    } else {
462                        Span::current().record("seen", format!("trigger_cnt=({})", evts.len()));
463                    }
464
465                    // Process the events
466                    process_events(ctx, evts)
467                }
468                Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
469                // If nonblocking the Errno::Again needs to be turned into an empty list
470                Err(Errno::Again) => process_events(ctx, Default::default()),
471                // Otherwise process the error
472                Err(err) => {
473                    tracing::warn!("failed to poll during deep sleep - {}", err);
474                    err
475                }
476            }
477        }
478    };
479
480    // If we are rewound then its time to process them
481    if let Some(events) = unsafe { handle_rewind::<M, Result<Vec<EventResult>, Errno>>(&mut ctx) } {
482        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
483        process_events(&ctx, events);
484        return Ok(Errno::Success);
485    }
486
487    // We use asyncify with a deep sleep to wait on new IO events
488    let res = __asyncify_with_deep_sleep::<M, Result<Vec<EventResult>, Errno>, _>(
489        ctx,
490        Box::pin(trigger),
491    )?;
492    if let AsyncifyAction::Finish(mut ctx, events) = res {
493        let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
494        process_events(&ctx, events);
495    }
496    Ok(Errno::Success)
497}