wasmer_wasix/syscalls/wasi/
poll_oneoff.rs1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{Subclockflags, SubscriptionClock, Userdata};
3
4use super::*;
5use crate::{
6 WasiInodes,
7 fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
8 state::PollEventSet,
9 syscalls::*,
10};
11
12#[derive(Serialize, Deserialize)]
14pub enum EventResultType {
15 Clock(u8),
16 Fd(EventFdReadwrite),
17}
18
19#[derive(Serialize, Deserialize)]
21pub struct EventResult {
22 pub userdata: Userdata,
24 pub error: Errno,
26 pub type_: Eventtype,
28 pub inner: EventResultType,
30}
31impl EventResult {
32 pub fn into_event(self) -> Event {
33 Event {
34 userdata: self.userdata,
35 error: self.error,
36 type_: self.type_,
37 u: match self.inner {
38 EventResultType::Clock(id) => EventUnion { clock: id },
39 EventResultType::Fd(fd) => EventUnion { fd_readwrite: fd },
40 },
41 }
42 }
43}
44
45#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
60pub fn poll_oneoff<M: MemorySize + 'static>(
61 mut ctx: FunctionEnvMut<'_, WasiEnv>,
62 in_: WasmPtr<Subscription, M>,
63 out_: WasmPtr<Event, M>,
64 nsubscriptions: M::Offset,
65 nevents: WasmPtr<M::Offset, M>,
66) -> Result<Errno, WasiError> {
67 WasiEnv::do_pending_operations(&mut ctx)?;
68
69 if nsubscriptions == M::ZERO {
71 return Ok(Errno::Inval);
72 }
73
74 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
75 ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
76
77 ctx.data_mut().poll_seed += 1;
78 let mut env = ctx.data();
79 let mut memory = unsafe { env.memory_view(&ctx) };
80
81 let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
82 let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
83 for n in 0..subscription_array.len() {
84 let n = (n + env.poll_seed) % subscription_array.len();
85 let sub = subscription_array.index(n);
86 let s = wasi_try_mem_ok!(sub.read());
87 subscriptions.push((None, PollEventSet::default(), s));
88 }
89
90 wasi_try_mem_ok!(nevents.write(&memory, M::ZERO));
92
93 let process_events = |ctx: &FunctionEnvMut<'_, WasiEnv>, triggered_events: Vec<Event>| {
95 let mut env = ctx.data();
96 let mut memory = unsafe { env.memory_view(&ctx) };
97
98 let mut events_seen: u32 = 0;
100 let event_array = wasi_try_mem!(out_.slice(&memory, nsubscriptions));
101 for event in triggered_events {
102 wasi_try_mem!(event_array.index(events_seen as u64).write(event));
103 events_seen += 1;
104 }
105 let events_seen: M::Offset = events_seen.into();
106 let out_ptr = nevents.deref(&memory);
107 wasi_try_mem!(out_ptr.write(events_seen));
108 Errno::Success
109 };
110
111 poll_oneoff_internal::<M, _>(ctx, subscriptions, process_events)
113}
114
115struct PollBatch {
116 pid: WasiProcessId,
117 tid: WasiThreadId,
118 evts: Vec<Event>,
119 joins: Vec<InodeValFilePollGuardJoin>,
120}
121impl PollBatch {
122 fn new(pid: WasiProcessId, tid: WasiThreadId, fds: Vec<InodeValFilePollGuard>) -> Self {
123 Self {
124 pid,
125 tid,
126 evts: Vec::new(),
127 joins: fds
128 .into_iter()
129 .map(InodeValFilePollGuardJoin::new)
130 .collect(),
131 }
132 }
133}
134impl Future for PollBatch {
135 type Output = Result<Vec<EventResult>, Errno>;
136 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137 let pid = self.pid;
138 let tid = self.tid;
139 let mut done = false;
140
141 let mut evts = Vec::new();
142 for mut join in self.joins.iter_mut() {
143 let fd = join.fd();
144 let peb = join.peb();
145 let mut guard = Pin::new(join);
146 match guard.poll(cx) {
147 Poll::Pending => {}
148 Poll::Ready(e) => {
149 for (evt, readiness) in e {
150 tracing::trace!(
151 fd,
152 readiness = ?readiness,
153 userdata = evt.userdata,
154 ty = evt.type_ as u8,
155 peb,
156 "triggered"
157 );
158 evts.push(evt);
159 }
160 }
161 }
162 }
163
164 if !evts.is_empty() {
165 return Poll::Ready(Ok(evts));
166 }
167
168 Poll::Pending
169 }
170}
171
172pub(crate) fn poll_fd_guard(
173 state: &Arc<WasiState>,
174 peb: PollEventSet,
175 fd: WasiFd,
176 s: Subscription,
177) -> Result<InodeValFilePollGuard, Errno> {
178 Ok(match fd {
179 __WASI_STDERR_FILENO => WasiInodes::stderr(&state.fs.fd_map)
180 .map(|g| g.into_poll_guard(fd, peb, s))
181 .map_err(fs_error_into_wasi_err)?,
182 __WASI_STDOUT_FILENO => WasiInodes::stdout(&state.fs.fd_map)
183 .map(|g| g.into_poll_guard(fd, peb, s))
184 .map_err(fs_error_into_wasi_err)?,
185 _ => {
186 let fd_entry = state.fs.get_fd(fd)?;
187 if !fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE) {
188 return Err(Errno::Access);
189 }
190 let inode = fd_entry.inode;
191
192 {
193 let guard = inode.read();
194 if let Some(guard) =
195 crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref())
196 {
197 guard
198 } else {
199 return Err(Errno::Badf);
200 }
201 }
202 }
203 })
204}
205
206pub(crate) fn poll_oneoff_internal<'a, M: MemorySize, After>(
221 mut ctx: FunctionEnvMut<'a, WasiEnv>,
222 mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
223 process_events: After,
224) -> Result<Errno, WasiError>
225where
226 After: FnOnce(&FunctionEnvMut<'a, WasiEnv>, Vec<Event>) -> Errno,
227{
228 wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
229
230 let pid = ctx.data().pid();
231 let tid = ctx.data().tid();
232 let subs_len = subs.len();
233
234 let mut env = ctx.data();
236 let state = ctx.data().state.deref();
237 let memory = unsafe { env.memory_view(&ctx) };
238
239 let clock_cnt = subs
242 .iter()
243 .filter(|a| a.2.type_ == Eventtype::Clock)
244 .count();
245 let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
246 let mut time_to_sleep = Duration::MAX;
247
248 let mut env = ctx.data();
251 let state = ctx.data().state.deref();
252 let mut memory = unsafe { env.memory_view(&ctx) };
253 for (fd, peb, s) in subs.iter_mut() {
254 let fd = match s.type_ {
255 Eventtype::FdRead => {
256 let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
257 match file_descriptor {
258 __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
259 fd => {
260 let fd_entry = match state.fs.get_fd(fd) {
261 Ok(a) => a,
262 Err(err) => return Ok(err),
263 };
264 if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
265 && fd_entry.inner.rights.contains(Rights::FD_READ))
266 {
267 return Ok(Errno::Access);
268 }
269 }
270 }
271 *fd = Some(file_descriptor);
272 *peb |= (PollEvent::PollIn as PollEventSet);
273 file_descriptor
274 }
275 Eventtype::FdWrite => {
276 let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
277 match file_descriptor {
278 __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
279 fd => {
280 let fd_entry = match state.fs.get_fd(fd) {
281 Ok(a) => a,
282 Err(err) => return Ok(err),
283 };
284 if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
285 && fd_entry.inner.rights.contains(Rights::FD_WRITE))
286 {
287 return Ok(Errno::Access);
288 }
289 }
290 }
291 *fd = Some(file_descriptor);
292 *peb |= (PollEvent::PollOut as PollEventSet);
293 file_descriptor
294 }
295 Eventtype::Clock => {
296 let clock_info = unsafe { s.data.clock };
297 if clock_info.clock_id == Clockid::Realtime
298 || clock_info.clock_id == Clockid::Monotonic
299 {
300 if clock_subs
302 .iter()
303 .any(|c| c.0.clock_id == clock_info.clock_id && c.1 == s.userdata)
304 {
305 continue;
306 }
307
308 if clock_info.timeout == 0 {
311 time_to_sleep = Duration::MAX;
312 } else if clock_info.timeout == 1 {
313 time_to_sleep = Duration::ZERO;
314 clock_subs.push((clock_info, s.userdata));
315 } else {
316 time_to_sleep = if clock_info
319 .flags
320 .contains(Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME)
321 {
322 let now = wasi_try_ok!(platform_clock_time_get(
323 Snapshot0Clockid::Monotonic,
324 1
325 )) as u64;
326
327 if clock_info.timeout <= now {
328 Duration::ZERO
329 } else {
330 Duration::from_nanos(clock_info.timeout) - Duration::from_nanos(now)
331 }
332 } else {
333 Duration::from_nanos(clock_info.timeout)
335 };
336
337 clock_subs.push((clock_info, s.userdata));
338 }
339 continue;
340 } else {
341 error!("polling not implemented for these clocks yet");
342 return Ok(Errno::Inval);
343 }
344 }
345 _ => {
346 continue;
347 }
348 };
349 }
350
351 let mut events_seen: u32 = 0;
352
353 let batch = {
354 let state = ctx.data().state.clone();
356 let tasks = ctx.data().tasks().clone();
357 let mut guards = {
358 let mut fd_guards = Vec::with_capacity(subs.len());
361
362 #[allow(clippy::significant_drop_in_scrutinee)]
363 for (fd, peb, s) in subs {
364 if let Some(fd) = fd {
365 let wasi_file_ref = wasi_try_ok!(poll_fd_guard(&state, peb, fd, s));
366 fd_guards.push(wasi_file_ref);
367 }
368 }
369
370 if fd_guards.len() > 10 {
371 let small_list: Vec<_> = fd_guards.iter().take(10).collect();
372 tracing::Span::current().record("fd_guards", format!("{small_list:?}..."));
373 } else {
374 tracing::Span::current().record("fd_guards", format!("{fd_guards:?}"));
375 }
376
377 fd_guards
378 };
379
380 PollBatch::new(pid, tid, guards)
382 };
383
384 let timeout = match time_to_sleep {
386 Duration::ZERO => {
387 Span::current().record("timeout_ns", "nonblocking");
388 Some(Duration::ZERO)
389 }
390 Duration::MAX => {
391 Span::current().record("timeout_ns", "infinite");
392 None
393 }
394 time => {
395 Span::current().record("timeout_ns", time.as_millis());
396 Some(time)
397 }
398 };
399
400 let process_timeout = {
402 let clock_subs = clock_subs.clone();
403 |ctx: &FunctionEnvMut<'a, WasiEnv>| {
404 if clock_subs.is_empty() {
406 tracing::warn!("triggered_timeout (without any clock subscriptions)");
407 }
408 let mut evts = Vec::new();
409 for (clock_info, userdata) in clock_subs {
410 let evt = Event {
411 userdata,
412 error: Errno::Success,
413 type_: Eventtype::Clock,
414 u: EventUnion { clock: 0 },
415 };
416 Span::current().record(
417 "seen",
418 format!(
419 "clock(id={},userdata={})",
420 clock_info.clock_id as u32, evt.userdata
421 ),
422 );
423 evts.push(evt);
424 }
425 evts
426 }
427 };
428
429 #[cfg(feature = "sys")]
430 if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
431 if let Some(timeout) = timeout {
437 std::thread::sleep(timeout);
438 process_events(&ctx, process_timeout(&ctx));
439 return Ok(Errno::Success);
440 }
441 }
442
443 let tasks = env.tasks().clone();
444 let timeout = async move {
445 if let Some(timeout) = timeout {
446 tasks.sleep_now(timeout).await;
447 } else {
448 InfiniteSleep::default().await
449 }
450 };
451
452 let trigger = async move {
454 tokio::select! {
455 res = batch => res,
456 _ = timeout => Err(Errno::Timedout)
457 }
458 };
459
460 let process_events = {
463 let clock_subs = clock_subs.clone();
464 |ctx: &FunctionEnvMut<'a, WasiEnv>, events: Result<Vec<Event>, Errno>| {
465 match events {
467 Ok(evts) => {
468 if evts.len() == 1 {
470 Span::current().record("seen", format!("{:?}", evts.first().unwrap()));
471 } else {
472 Span::current().record("seen", format!("trigger_cnt=({})", evts.len()));
473 }
474
475 process_events(ctx, evts)
477 }
478 Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
479 Err(Errno::Again) => process_events(ctx, Default::default()),
481 Err(err) => {
483 tracing::warn!("failed to poll during deep sleep - {}", err);
484 err
485 }
486 }
487 }
488 };
489
490 if let Some(events) = unsafe { handle_rewind::<M, Result<Vec<EventResult>, Errno>>(&mut ctx) } {
492 let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
493 process_events(&ctx, events);
494 return Ok(Errno::Success);
495 }
496
497 let res = __asyncify_with_deep_sleep::<M, Result<Vec<EventResult>, Errno>, _>(
499 ctx,
500 Box::pin(trigger),
501 )?;
502 if let AsyncifyAction::Finish(mut ctx, events) = res {
503 let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
504 process_events(&ctx, events);
505 }
506 Ok(Errno::Success)
507}