wasmer_wasix/syscalls/wasi/
poll_oneoff.rs1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{Subclockflags, SubscriptionClock, Userdata};
3
4use super::*;
5use crate::{
6 WasiInodes,
7 fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
8 state::PollEventSet,
9 syscalls::*,
10};
11
12#[derive(Serialize, Deserialize)]
14pub enum EventResultType {
15 Clock(u8),
16 Fd(EventFdReadwrite),
17}
18
19#[derive(Serialize, Deserialize)]
21pub struct EventResult {
22 pub userdata: Userdata,
24 pub error: Errno,
26 pub type_: Eventtype,
28 pub inner: EventResultType,
30}
31impl EventResult {
32 pub fn into_event(self) -> Event {
33 Event {
34 userdata: self.userdata,
35 error: self.error,
36 type_: self.type_,
37 u: match self.inner {
38 EventResultType::Clock(id) => EventUnion { clock: id },
39 EventResultType::Fd(fd) => EventUnion { fd_readwrite: fd },
40 },
41 }
42 }
43}
44
45#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
60pub fn poll_oneoff<M: MemorySize + 'static>(
61 mut ctx: FunctionEnvMut<'_, WasiEnv>,
62 in_: WasmPtr<Subscription, M>,
63 out_: WasmPtr<Event, M>,
64 nsubscriptions: M::Offset,
65 nevents: WasmPtr<M::Offset, M>,
66) -> Result<Errno, WasiError> {
67 wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
68
69 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
70 ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
71
72 ctx.data_mut().poll_seed += 1;
73 let mut env = ctx.data();
74 let mut memory = unsafe { env.memory_view(&ctx) };
75
76 let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
77 let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
78 for n in 0..subscription_array.len() {
79 let n = (n + env.poll_seed) % subscription_array.len();
80 let sub = subscription_array.index(n);
81 let s = wasi_try_mem_ok!(sub.read());
82 subscriptions.push((None, PollEventSet::default(), s));
83 }
84
85 wasi_try_mem_ok!(nevents.write(&memory, M::ZERO));
87
88 let process_events = |ctx: &FunctionEnvMut<'_, WasiEnv>, triggered_events: Vec<Event>| {
90 let mut env = ctx.data();
91 let mut memory = unsafe { env.memory_view(&ctx) };
92
93 let mut events_seen: u32 = 0;
95 let event_array = wasi_try_mem!(out_.slice(&memory, nsubscriptions));
96 for event in triggered_events {
97 wasi_try_mem!(event_array.index(events_seen as u64).write(event));
98 events_seen += 1;
99 }
100 let events_seen: M::Offset = events_seen.into();
101 let out_ptr = nevents.deref(&memory);
102 wasi_try_mem!(out_ptr.write(events_seen));
103 Errno::Success
104 };
105
106 poll_oneoff_internal::<M, _>(ctx, subscriptions, process_events)
108}
109
110struct PollBatch {
111 pid: WasiProcessId,
112 tid: WasiThreadId,
113 evts: Vec<Event>,
114 joins: Vec<InodeValFilePollGuardJoin>,
115}
116impl PollBatch {
117 fn new(pid: WasiProcessId, tid: WasiThreadId, fds: Vec<InodeValFilePollGuard>) -> Self {
118 Self {
119 pid,
120 tid,
121 evts: Vec::new(),
122 joins: fds
123 .into_iter()
124 .map(InodeValFilePollGuardJoin::new)
125 .collect(),
126 }
127 }
128}
129impl Future for PollBatch {
130 type Output = Result<Vec<EventResult>, Errno>;
131 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132 let pid = self.pid;
133 let tid = self.tid;
134 let mut done = false;
135
136 let mut evts = Vec::new();
137 for mut join in self.joins.iter_mut() {
138 let fd = join.fd();
139 let peb = join.peb();
140 let mut guard = Pin::new(join);
141 match guard.poll(cx) {
142 Poll::Pending => {}
143 Poll::Ready(e) => {
144 for (evt, readiness) in e {
145 tracing::trace!(
146 fd,
147 readiness = ?readiness,
148 userdata = evt.userdata,
149 ty = evt.type_ as u8,
150 peb,
151 "triggered"
152 );
153 evts.push(evt);
154 }
155 }
156 }
157 }
158
159 if !evts.is_empty() {
160 return Poll::Ready(Ok(evts));
161 }
162
163 Poll::Pending
164 }
165}
166
167pub(crate) fn poll_fd_guard(
168 state: &Arc<WasiState>,
169 peb: PollEventSet,
170 fd: WasiFd,
171 s: Subscription,
172) -> Result<InodeValFilePollGuard, Errno> {
173 Ok(match fd {
174 __WASI_STDERR_FILENO => WasiInodes::stderr(&state.fs.fd_map)
175 .map(|g| g.into_poll_guard(fd, peb, s))
176 .map_err(fs_error_into_wasi_err)?,
177 __WASI_STDOUT_FILENO => WasiInodes::stdout(&state.fs.fd_map)
178 .map(|g| g.into_poll_guard(fd, peb, s))
179 .map_err(fs_error_into_wasi_err)?,
180 _ => {
181 let fd_entry = state.fs.get_fd(fd)?;
182 if !fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE) {
183 return Err(Errno::Access);
184 }
185 let inode = fd_entry.inode;
186
187 {
188 let guard = inode.read();
189 if let Some(guard) =
190 crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref())
191 {
192 guard
193 } else {
194 return Err(Errno::Badf);
195 }
196 }
197 }
198 })
199}
200
201pub(crate) fn poll_oneoff_internal<'a, M: MemorySize, After>(
216 mut ctx: FunctionEnvMut<'a, WasiEnv>,
217 mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
218 process_events: After,
219) -> Result<Errno, WasiError>
220where
221 After: FnOnce(&FunctionEnvMut<'a, WasiEnv>, Vec<Event>) -> Errno,
222{
223 wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
224
225 let pid = ctx.data().pid();
226 let tid = ctx.data().tid();
227 let subs_len = subs.len();
228
229 let mut env = ctx.data();
231 let state = ctx.data().state.deref();
232 let memory = unsafe { env.memory_view(&ctx) };
233
234 let clock_cnt = subs
237 .iter()
238 .filter(|a| a.2.type_ == Eventtype::Clock)
239 .count();
240 let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
241 let mut time_to_sleep = Duration::MAX;
242
243 let mut env = ctx.data();
246 let state = ctx.data().state.deref();
247 let mut memory = unsafe { env.memory_view(&ctx) };
248 for (fd, peb, s) in subs.iter_mut() {
249 let fd = match s.type_ {
250 Eventtype::FdRead => {
251 let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
252 match file_descriptor {
253 __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
254 fd => {
255 let fd_entry = match state.fs.get_fd(fd) {
256 Ok(a) => a,
257 Err(err) => return Ok(err),
258 };
259 if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
260 && fd_entry.inner.rights.contains(Rights::FD_READ))
261 {
262 return Ok(Errno::Access);
263 }
264 }
265 }
266 *fd = Some(file_descriptor);
267 *peb |= (PollEvent::PollIn as PollEventSet);
268 file_descriptor
269 }
270 Eventtype::FdWrite => {
271 let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
272 match file_descriptor {
273 __WASI_STDIN_FILENO | __WASI_STDOUT_FILENO | __WASI_STDERR_FILENO => (),
274 fd => {
275 let fd_entry = match state.fs.get_fd(fd) {
276 Ok(a) => a,
277 Err(err) => return Ok(err),
278 };
279 if !(fd_entry.inner.rights.contains(Rights::POLL_FD_READWRITE)
280 && fd_entry.inner.rights.contains(Rights::FD_WRITE))
281 {
282 return Ok(Errno::Access);
283 }
284 }
285 }
286 *fd = Some(file_descriptor);
287 *peb |= (PollEvent::PollOut as PollEventSet);
288 file_descriptor
289 }
290 Eventtype::Clock => {
291 let clock_info = unsafe { s.data.clock };
292 if clock_info.clock_id == Clockid::Realtime
293 || clock_info.clock_id == Clockid::Monotonic
294 {
295 if clock_subs
297 .iter()
298 .any(|c| c.0.clock_id == clock_info.clock_id && c.1 == s.userdata)
299 {
300 continue;
301 }
302
303 if clock_info.timeout == 0 {
306 time_to_sleep = Duration::MAX;
307 } else if clock_info.timeout == 1 {
308 time_to_sleep = Duration::ZERO;
309 clock_subs.push((clock_info, s.userdata));
310 } else {
311 time_to_sleep = if clock_info
314 .flags
315 .contains(Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME)
316 {
317 let now = wasi_try_ok!(platform_clock_time_get(
318 Snapshot0Clockid::Monotonic,
319 1
320 )) as u64;
321
322 if clock_info.timeout <= now {
323 Duration::ZERO
324 } else {
325 Duration::from_nanos(clock_info.timeout) - Duration::from_nanos(now)
326 }
327 } else {
328 Duration::from_nanos(clock_info.timeout)
330 };
331
332 clock_subs.push((clock_info, s.userdata));
333 }
334 continue;
335 } else {
336 error!("polling not implemented for these clocks yet");
337 return Ok(Errno::Inval);
338 }
339 }
340 Eventtype::Unknown => {
341 continue;
342 }
343 };
344 }
345
346 let mut events_seen: u32 = 0;
347
348 let batch = {
349 let state = ctx.data().state.clone();
351 let tasks = ctx.data().tasks().clone();
352 let mut guards = {
353 let mut fd_guards = Vec::with_capacity(subs.len());
356
357 #[allow(clippy::significant_drop_in_scrutinee)]
358 for (fd, peb, s) in subs {
359 if let Some(fd) = fd {
360 let wasi_file_ref = wasi_try_ok!(poll_fd_guard(&state, peb, fd, s));
361 fd_guards.push(wasi_file_ref);
362 }
363 }
364
365 if fd_guards.len() > 10 {
366 let small_list: Vec<_> = fd_guards.iter().take(10).collect();
367 tracing::Span::current().record("fd_guards", format!("{small_list:?}..."));
368 } else {
369 tracing::Span::current().record("fd_guards", format!("{fd_guards:?}"));
370 }
371
372 fd_guards
373 };
374
375 PollBatch::new(pid, tid, guards)
377 };
378
379 let timeout = match time_to_sleep {
381 Duration::ZERO => {
382 Span::current().record("timeout_ns", "nonblocking");
383 Some(Duration::ZERO)
384 }
385 Duration::MAX => {
386 Span::current().record("timeout_ns", "infinite");
387 None
388 }
389 time => {
390 Span::current().record("timeout_ns", time.as_millis());
391 Some(time)
392 }
393 };
394
395 let process_timeout = {
397 let clock_subs = clock_subs.clone();
398 |ctx: &FunctionEnvMut<'a, WasiEnv>| {
399 if clock_subs.is_empty() {
401 tracing::warn!("triggered_timeout (without any clock subscriptions)");
402 }
403 let mut evts = Vec::new();
404 for (clock_info, userdata) in clock_subs {
405 let evt = Event {
406 userdata,
407 error: Errno::Success,
408 type_: Eventtype::Clock,
409 u: EventUnion { clock: 0 },
410 };
411 Span::current().record(
412 "seen",
413 format!(
414 "clock(id={},userdata={})",
415 clock_info.clock_id as u32, evt.userdata
416 ),
417 );
418 evts.push(evt);
419 }
420 evts
421 }
422 };
423
424 #[cfg(feature = "sys")]
425 if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
426 if let Some(timeout) = timeout {
432 std::thread::sleep(timeout);
433 process_events(&ctx, process_timeout(&ctx));
434 return Ok(Errno::Success);
435 }
436 }
437
438 let tasks = env.tasks().clone();
439 let timeout = async move {
440 if let Some(timeout) = timeout {
441 tasks.sleep_now(timeout).await;
442 } else {
443 InfiniteSleep::default().await
444 }
445 };
446
447 let trigger = async move {
449 tokio::select! {
450 res = batch => res,
451 _ = timeout => Err(Errno::Timedout)
452 }
453 };
454
455 let process_events = {
458 let clock_subs = clock_subs.clone();
459 |ctx: &FunctionEnvMut<'a, WasiEnv>, events: Result<Vec<Event>, Errno>| {
460 match events {
462 Ok(evts) => {
463 if evts.len() == 1 {
465 Span::current().record("seen", format!("{:?}", evts.first().unwrap()));
466 } else {
467 Span::current().record("seen", format!("trigger_cnt=({})", evts.len()));
468 }
469
470 process_events(ctx, evts)
472 }
473 Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
474 Err(Errno::Again) => process_events(ctx, Default::default()),
476 Err(err) => {
478 tracing::warn!("failed to poll during deep sleep - {}", err);
479 err
480 }
481 }
482 }
483 };
484
485 if let Some(events) = unsafe { handle_rewind::<M, Result<Vec<EventResult>, Errno>>(&mut ctx) } {
487 let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
488 process_events(&ctx, events);
489 return Ok(Errno::Success);
490 }
491
492 let res = __asyncify_with_deep_sleep::<M, Result<Vec<EventResult>, Errno>, _>(
494 ctx,
495 Box::pin(trigger),
496 )?;
497 if let AsyncifyAction::Finish(mut ctx, events) = res {
498 let events = events.map(|events| events.into_iter().map(EventResult::into_event).collect());
499 process_events(&ctx, events);
500 }
501 Ok(Errno::Success)
502}