wasmer_wasix/syscalls/wasix/
epoll_wait.rs1use wasmer_wasix_types::wasi::{EpollData, EpollEvent, EpollType, SubscriptionClock, Userdata};
2
3use super::*;
4use crate::{
5 WasiInodes,
6 fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
7 os::epoll::{EpollFd, drain_ready_events},
8 state::PollEventSet,
9 syscalls::*,
10};
11
12const TIMEOUT_FOREVER: u64 = u64::MAX;
13
14#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
17pub fn epoll_wait<M: MemorySize + 'static>(
18 mut ctx: FunctionEnvMut<'_, WasiEnv>,
19 epfd: WasiFd,
20 events: WasmPtr<EpollEvent<M>, M>,
21 maxevents: i32,
22 timeout: Timestamp,
23 ret_nevents: WasmPtr<M::Offset, M>,
24) -> Result<Errno, WasiError> {
25 WasiEnv::do_pending_operations(&mut ctx)?;
26 if maxevents <= 0 {
27 return Ok(Errno::Inval);
28 }
29 let maxevents = maxevents as usize;
30
31 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
32 ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
33
34 if timeout == TIMEOUT_FOREVER {
35 tracing::trace!(maxevents, epfd, "waiting forever on wakers");
36 } else {
37 tracing::trace!(maxevents, epfd, timeout, "waiting on wakers");
38 }
39
40 let epoll_state = {
41 let fd_entry = wasi_try_ok!(ctx.data().state.fs.get_fd(epfd));
42 let mut inode_guard = fd_entry.inode.read();
43 match inode_guard.deref() {
44 Kind::Epoll { state } => state.clone(),
45 _ => return Ok(Errno::Inval),
46 }
47 };
48
49 let work = {
53 async move {
54 loop {
56 let ret = drain_ready_events(&epoll_state, maxevents);
57 if !ret.is_empty() {
58 return Ok(ret);
59 }
60
61 epoll_state.wait().await;
62 }
63 }
64 };
65
66 let trigger = {
68 let timeout = if timeout == TIMEOUT_FOREVER {
69 None
70 } else {
71 Some(ctx.data().tasks().sleep_now(Duration::from_nanos(timeout)))
72 };
73 async move {
74 if let Some(timeout) = timeout {
75 tokio::select! {
76 res = work => res,
77 _ = timeout => Err(Errno::Timedout)
78 }
79 } else {
80 work.await
81 }
82 }
83 };
84
85 let process_events = {
88 let events_out = events;
89 move |ctx: &FunctionEnvMut<'_, WasiEnv>,
90 events: Result<Vec<(EpollFd, EpollType)>, Errno>| {
91 let env = ctx.data();
92 let memory = unsafe { env.memory_view(ctx) };
93
94 match events {
96 Ok(evts) => {
97 let mut nevents = 0;
98
99 let event_array = wasi_try_mem!(events_out.slice(
100 &memory,
101 wasi_try!(maxevents.try_into().map_err(|_| Errno::Overflow))
102 ));
103 for (event, readiness) in evts {
104 tracing::trace!(fd = event.fd(), readiness = ?readiness, "triggered");
105 wasi_try_mem!(event_array.index(nevents as u64).write(EpollEvent {
106 events: readiness,
107 data: EpollData {
108 ptr: wasi_try!(event.ptr().try_into().map_err(|_| Errno::Overflow)),
109 fd: event.fd(),
110 data1: event.data1(),
111 data2: event.data2(),
112 }
113 }));
114 nevents += 1;
115 if nevents >= maxevents {
116 break;
117 }
118 }
119 tracing::trace!("{} events triggered", nevents);
120 wasi_try_mem!(ret_nevents.write(
121 &memory,
122 wasi_try!(nevents.try_into().map_err(|_| Errno::Overflow))
123 ));
124 Errno::Success
125 }
126 Err(Errno::Timedout) => {
127 wasi_try_mem!(ret_nevents.write(&memory, M::ZERO));
129 Errno::Success
130 }
131 Err(err) => {
132 tracing::warn!("failed to epoll during deep sleep - {}", err);
133 err
134 }
135 }
136 }
137 };
138
139 if let Some(events) =
141 unsafe { handle_rewind::<M, Result<Vec<(EpollFd, EpollType)>, Errno>>(&mut ctx) }
142 {
143 return Ok(process_events(&ctx, events));
144 }
145
146 let res = __asyncify_with_deep_sleep::<M, Result<Vec<(EpollFd, EpollType)>, Errno>, _>(
148 ctx,
149 Box::pin(trigger),
150 )?;
151 if let AsyncifyAction::Finish(mut ctx, events) = res {
152 Ok(process_events(&ctx, events))
153 } else {
154 Ok(Errno::Success)
155 }
156}