wasmer_wasix/syscalls/wasi/
poll_oneoff.rs1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{Subclockflags, SubscriptionClock, Userdata};
3
4use super::*;
5use crate::{
6 fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
7 state::PollEventSet,
8 syscalls::*,
9};
10
11#[derive(Serialize, Deserialize)]
13pub enum EventResultType {
14 Clock(u8),
15 Fd(EventFdReadwrite),
16}
17
18#[derive(Serialize, Deserialize)]
20pub struct EventResult {
21 pub userdata: Userdata,
23 pub error: Errno,
25 pub type_: Eventtype,
27 pub inner: EventResultType,
29}
30impl EventResult {
31 pub fn into_event(self) -> Event {
32 Event {
33 userdata: self.userdata,
34 error: self.error,
35 type_: self.type_,
36 u: match self.inner {
37 EventResultType::Clock(id) => EventUnion { clock: id },
38 EventResultType::Fd(fd) => EventUnion { fd_readwrite: fd },
39 },
40 }
41 }
42}
43
44#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
59pub fn poll_oneoff<M: MemorySize + 'static>(
60 mut ctx: FunctionEnvMut<'_, WasiEnv>,
61 in_: WasmPtr<Subscription, M>,
62 out_: WasmPtr<Event, M>,
63 nsubscriptions: M::Offset,
64 nevents: WasmPtr<M::Offset, M>,
65) -> Result<Errno, WasiError> {
66 WasiEnv::do_pending_operations(&mut ctx)?;
67
68 if nsubscriptions == M::ZERO {
70 return Ok(Errno::Inval);
71 }
72
73 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
74 ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
75
76 ctx.data_mut().poll_seed += 1;
77 let mut env = ctx.data();
78 let mut memory = unsafe { env.memory_view(&ctx) };
79
80 let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
81 let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
82 for n in 0..subscription_array.len() {
83 let n = (n + env.poll_seed) % subscription_array.len();
84 let sub = subscription_array.index(n);
85 let s = wasi_try_mem_ok!(sub.read());
86 subscriptions.push((None, PollEventSet::default(), s));
87 }
88
89 wasi_try_mem_ok!(nevents.write(&memory, M::ZERO));
91
92 let process_events = |ctx: &FunctionEnvMut<'_, WasiEnv>, triggered_events: Vec<Event>| {
94 let mut env = ctx.data();
95 let mut memory = unsafe { env.memory_view(&ctx) };
96
97 let mut events_seen: u32 = 0;
99 let event_array = wasi_try_mem!(out_.slice(&memory, nsubscriptions));
100 for event in triggered_events {
101 wasi_try_mem!(event_array.index(events_seen as u64).write(event));
102 events_seen += 1;
103 }
104 let events_seen: M::Offset = events_seen.into();
105 let out_ptr = nevents.deref(&memory);
106 wasi_try_mem!(out_ptr.write(events_seen));
107 Errno::Success
108 };
109
110 poll_oneoff_internal::<M, _>(ctx, subscriptions, process_events)
112}
113
114struct PollBatch {
115 pid: WasiProcessId,
116 tid: WasiThreadId,
117 evts: Vec<Event>,
118 joins: Vec<InodeValFilePollGuardJoin>,
119}
120impl PollBatch {
121 fn new(pid: WasiProcessId, tid: WasiThreadId, fds: Vec<InodeValFilePollGuard>) -> Self {
122 Self {
123 pid,
124 tid,
125 evts: Vec::new(),
126 joins: fds
127 .into_iter()
128 .map(InodeValFilePollGuardJoin::new)
129 .collect(),
130 }
131 }
132}
133impl Future for PollBatch {
134 type Output = Result<Vec<EventResult>, Errno>;
135 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 let pid = self.pid;
137 let tid = self.tid;
138 let mut done = false;
139
140 let mut evts = Vec::new();
141 for mut join in self.joins.iter_mut() {
142 let fd = join.fd();
143 let peb = join.peb();
144 let mut guard = Pin::new(join);
145 match guard.poll(cx) {
146 Poll::Pending => {}
147 Poll::Ready(e) => {
148 for (evt, readiness) in e {
149 tracing::trace!(
150 fd,
151 readiness = ?readiness,
152 userdata = evt.userdata,
153 ty = evt.type_ as u8,
154 peb,
155 "triggered"
156 );
157 evts.push(evt);
158 }
159 }
160 }
161 }
162
163 if !evts.is_empty() {
164 return Poll::Ready(Ok(evts));
165 }
166
167 Poll::Pending
168 }
169}
170
171pub(crate) fn poll_fd_guard(
172 state: &Arc<WasiState>,
173 peb: PollEventSet,
174 fd: WasiFd,
175 s: Subscription,
176) -> Result<InodeValFilePollGuard, Errno> {
177 let fd_entry = state.fs.get_fd(fd)?;
178 let requires_access = match s.type_ {
179 Eventtype::FdRead => Rights::FD_READ,
180 Eventtype::FdWrite => Rights::FD_WRITE,
181 _ => Rights::empty(),
182 };
183
184 if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
185 && fd_entry.inner.rights.contains(requires_access))
186 {
187 return Err(Errno::Access);
188 }
189 let inode = fd_entry.inode;
190
191 let guard = inode.read();
192 crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref()).ok_or(Errno::Badf)
193}
194
195pub(crate) fn poll_oneoff_internal<'a, M: MemorySize, After>(
210 mut ctx: FunctionEnvMut<'a, WasiEnv>,
211 mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
212 process_events: After,
213) -> Result<Errno, WasiError>
214where
215 After: FnOnce(&FunctionEnvMut<'a, WasiEnv>, Vec<Event>) -> Errno,
216{
217 wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
218
219 let pid = ctx.data().pid();
220 let tid = ctx.data().tid();
221 let subs_len = subs.len();
222
223 let mut env = ctx.data();
225 let state = ctx.data().state.deref();
226 let memory = unsafe { env.memory_view(&ctx) };
227
228 let clock_cnt = subs
231 .iter()
232 .filter(|a| a.2.type_ == Eventtype::Clock)
233 .count();
234 let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
235 let mut time_to_sleep = Duration::MAX;
236
237 let mut env = ctx.data();
240 let state = ctx.data().state.deref();
241 let mut memory = unsafe { env.memory_view(&ctx) };
242 for (fd, peb, s) in subs.iter_mut() {
243 let fd = match s.type_ {
244 Eventtype::FdRead => {
245 let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
246 *fd = Some(file_descriptor);
247 *peb |= (PollEvent::PollIn as PollEventSet);
248 file_descriptor
249 }
250 Eventtype::FdWrite => {
251 let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
252 *fd = Some(file_descriptor);
253 *peb |= (PollEvent::PollOut as PollEventSet);
254 file_descriptor
255 }
256 Eventtype::Clock => {
257 let clock_info = unsafe { s.data.clock };
258 if clock_info.clock_id == Clockid::Realtime
259 || clock_info.clock_id == Clockid::Monotonic
260 {
261 if clock_subs
263 .iter()
264 .any(|c| c.0.clock_id == clock_info.clock_id && c.1 == s.userdata)
265 {
266 continue;
267 }
268
269 if clock_info.timeout == 0 {
272 time_to_sleep = Duration::MAX;
273 } else if clock_info.timeout == 1 {
274 time_to_sleep = Duration::ZERO;
275 clock_subs.push((clock_info, s.userdata));
276 } else {
277 time_to_sleep = if clock_info
280 .flags
281 .contains(Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME)
282 {
283 let now = wasi_try_ok!(platform_clock_time_get(
284 Snapshot0Clockid::Monotonic,
285 1
286 )) as u64;
287
288 if clock_info.timeout <= now {
289 Duration::ZERO
290 } else {
291 Duration::from_nanos(clock_info.timeout) - Duration::from_nanos(now)
292 }
293 } else {
294 Duration::from_nanos(clock_info.timeout)
296 };
297
298 clock_subs.push((clock_info, s.userdata));
299 }
300 continue;
301 } else {
302 error!("polling not implemented for these clocks yet");
303 return Ok(Errno::Inval);
304 }
305 }
306 _ => {
307 continue;
308 }
309 };
310 }
311
312 let mut events_seen: u32 = 0;
313
314 let batch = {
315 let state = ctx.data().state.clone();
317 let tasks = ctx.data().tasks().clone();
318 let mut guards = {
319 let mut fd_guards = Vec::with_capacity(subs.len());
322
323 #[allow(clippy::significant_drop_in_scrutinee)]
324 for (fd, peb, s) in subs {
325 if let Some(fd) = fd {
326 let wasi_file_ref = wasi_try_ok!(poll_fd_guard(&state, peb, fd, s));
327 fd_guards.push(wasi_file_ref);
328 }
329 }
330
331 if fd_guards.len() > 10 {
332 let small_list: Vec<_> = fd_guards.iter().take(10).collect();
333 tracing::Span::current().record("fd_guards", format!("{small_list:?}..."));
334 } else {
335 tracing::Span::current().record("fd_guards", format!("{fd_guards:?}"));
336 }
337
338 fd_guards
339 };
340
341 PollBatch::new(pid, tid, guards)
343 };
344
345 let timeout = match time_to_sleep {
347 Duration::ZERO => {
348 Span::current().record("timeout_ns", "nonblocking");
349 Some(Duration::ZERO)
350 }
351 Duration::MAX => {
352 Span::current().record("timeout_ns", "infinite");
353 None
354 }
355 time => {
356 Span::current().record("timeout_ns", time.as_millis());
357 Some(time)
358 }
359 };
360
361 let process_timeout = {
363 let clock_subs = clock_subs.clone();
364 |ctx: &FunctionEnvMut<'a, WasiEnv>| {
365 if clock_subs.is_empty() {
367 tracing::warn!("triggered_timeout (without any clock subscriptions)");
368 }
369 let mut evts = Vec::new();
370 for (clock_info, userdata) in clock_subs {
371 let evt = Event {
372 userdata,
373 error: Errno::Success,
374 type_: Eventtype::Clock,
375 u: EventUnion { clock: 0 },
376 };
377 Span::current().record(
378 "seen",
379 format!(
380 "clock(id={},userdata={})",
381 clock_info.clock_id as u32, evt.userdata
382 ),
383 );
384 evts.push(evt);
385 }
386 evts
387 }
388 };
389
390 #[cfg(feature = "sys")]
391 if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
392 if let Some(timeout) = timeout {
398 std::thread::sleep(timeout);
399 process_events(&ctx, process_timeout(&ctx));
400 return Ok(Errno::Success);
401 }
402 }
403
404 let tasks = env.tasks().clone();
405 let timeout = async move {
406 if let Some(timeout) = timeout {
407 tasks.sleep_now(timeout).await;
408 } else {
409 InfiniteSleep::default().await
410 }
411 };
412
413 let trigger = async move {
415 tokio::select! {
416 res = batch => res,
417 _ = timeout => Err(Errno::Timedout)
418 }
419 };
420
421 let process_events = {
424 let clock_subs = clock_subs.clone();
425 |ctx: &FunctionEnvMut<'a, WasiEnv>, events: Result<Vec<Event>, Errno>| {
426 match events {
428 Ok(evts) => {
429 if evts.len() == 1 {
431 Span::current().record("seen", format!("{:?}", evts.first().unwrap()));
432 } else {
433 Span::current().record("seen", format!("trigger_cnt=({})", evts.len()));
434 }
435
436 process_events(ctx, evts)
438 }
439 Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
440 Err(Errno::Again) => process_events(ctx, Default::default()),
442 Err(err) => {
444 tracing::warn!("failed to poll during deep sleep - {}", err);
445 err
446 }
447 }
448 }
449 };
450
451 if let Some(events) = unsafe { handle_rewind::<M, Result<Vec<EventResult>, Errno>>(&mut ctx) } {
453 let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
454 process_events(&ctx, events);
455 return Ok(Errno::Success);
456 }
457
458 let res = __asyncify_with_deep_sleep::<M, Result<Vec<EventResult>, Errno>, _>(
460 ctx,
461 Box::pin(trigger),
462 )?;
463 if let AsyncifyAction::Finish(mut ctx, events) = res {
464 let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
465 process_events(&ctx, events);
466 }
467 Ok(Errno::Success)
468}