wasmer_wasix/syscalls/wasix/
epoll_wait.rs1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{
3 EpollCtl, EpollData, EpollEvent, EpollType, SubscriptionClock, Userdata,
4};
5
6use super::*;
7use crate::{
8 WasiInodes,
9 fs::{EpollFd, InodeValFilePollGuard, InodeValFilePollGuardJoin, POLL_GUARD_MAX_RET},
10 state::PollEventSet,
11 syscalls::*,
12};
13
14const TIMEOUT_FOREVER: u64 = u64::MAX;
15
16#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
19pub fn epoll_wait<M: MemorySize + 'static>(
20 mut ctx: FunctionEnvMut<'_, WasiEnv>,
21 epfd: WasiFd,
22 events: WasmPtr<EpollEvent<M>, M>,
23 maxevents: i32,
24 timeout: Timestamp,
25 ret_nevents: WasmPtr<M::Offset, M>,
26) -> Result<Errno, WasiError> {
27 WasiEnv::do_pending_operations(&mut ctx)?;
28
29 ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
30 ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
31
32 if timeout == TIMEOUT_FOREVER {
33 tracing::trace!(maxevents, epfd, "waiting forever on wakers");
34 } else {
35 tracing::trace!(maxevents, epfd, timeout, "waiting on wakers");
36 }
37
38 let (rx, tx, subscriptions) = {
39 let fd_entry = wasi_try_ok!(ctx.data().state.fs.get_fd(epfd));
40 let mut inode_guard = fd_entry.inode.read();
41 match inode_guard.deref() {
42 Kind::Epoll {
43 rx,
44 tx,
45 subscriptions,
46 ..
47 } => (rx.clone(), tx.clone(), subscriptions.clone()),
48 _ => return Ok(Errno::Inval),
49 }
50 };
51
52 let work = {
56 let state = ctx.data().state.clone();
57 async move {
58 let mut ret: Vec<(EpollFd, EpollType)> = Vec::new();
59
60 loop {
62 let mut rx = rx.lock().await;
64
65 let mut removed = Vec::new();
68 let interest: Vec<_> = rx
69 .borrow_and_update()
70 .interest
71 .clone()
72 .into_iter()
73 .collect();
74 {
75 let mut guard = subscriptions.lock().unwrap();
76 for (fd, readiness) in interest {
77 removed.push((fd, readiness));
78
79 let (fd, joins) = match guard.get_mut(&fd) {
81 Some(a) => a,
82 None => {
83 tracing::debug!(fd, readiness=?readiness, "orphaned interest");
84 continue;
85 }
86 };
87
88 ret.push((fd.clone(), readiness));
90 if ret.len() + POLL_GUARD_MAX_RET >= (maxevents as usize) {
91 break;
92 }
93 }
94 }
95
96 if !removed.is_empty() {
98 tx.send_modify(|i| {
100 for (fd, readiness) in removed {
101 i.interest.remove(&(fd, readiness));
102 }
103 });
104 }
105
106 if !ret.is_empty() {
108 return Ok(ret);
109 }
110
111 rx.changed().await.ok();
113 }
114 }
115 };
116
117 let trigger = {
119 let timeout = if timeout == TIMEOUT_FOREVER {
120 None
121 } else {
122 Some(ctx.data().tasks().sleep_now(Duration::from_nanos(timeout)))
123 };
124 async move {
125 if let Some(timeout) = timeout {
126 tokio::select! {
127 res = work => res,
128 _ = timeout => Err(Errno::Timedout)
129 }
130 } else {
131 work.await
132 }
133 }
134 };
135
136 let process_events = {
139 let events_out = events;
140 move |ctx: &FunctionEnvMut<'_, WasiEnv>,
141 events: Result<Vec<(EpollFd, EpollType)>, Errno>| {
142 let env = ctx.data();
143 let memory = unsafe { env.memory_view(ctx) };
144
145 match events {
147 Ok(evts) => {
148 let mut nevents = 0;
149
150 let event_array = wasi_try_mem!(events_out.slice(
151 &memory,
152 wasi_try!(maxevents.try_into().map_err(|_| Errno::Overflow))
153 ));
154 for (event, readiness) in evts {
155 tracing::trace!(fd = event.fd, readiness = ?readiness, "triggered");
156 wasi_try_mem!(event_array.index(nevents as u64).write(EpollEvent {
157 events: readiness,
158 data: EpollData {
159 ptr: wasi_try!(event.ptr.try_into().map_err(|_| Errno::Overflow)),
160 fd: event.fd,
161 data1: event.data1,
162 data2: event.data2
163 }
164 }));
165 nevents += 1;
166 if nevents >= maxevents {
167 break;
168 }
169 }
170 tracing::trace!("{} events triggered", nevents);
171 wasi_try_mem!(ret_nevents.write(
172 &memory,
173 wasi_try!(nevents.try_into().map_err(|_| Errno::Overflow))
174 ));
175 Errno::Success
176 }
177 Err(Errno::Timedout) => {
178 wasi_try_mem!(ret_nevents.write(&memory, M::ZERO));
180 Errno::Success
181 }
182 Err(err) => {
183 tracing::warn!("failed to epoll during deep sleep - {}", err);
184 err
185 }
186 }
187 }
188 };
189
190 if let Some(events) =
192 unsafe { handle_rewind::<M, Result<Vec<(EpollFd, EpollType)>, Errno>>(&mut ctx) }
193 {
194 return Ok(process_events(&ctx, events));
195 }
196
197 let res = __asyncify_with_deep_sleep::<M, Result<Vec<(EpollFd, EpollType)>, Errno>, _>(
199 ctx,
200 Box::pin(trigger),
201 )?;
202 if let AsyncifyAction::Finish(mut ctx, events) = res {
203 Ok(process_events(&ctx, events))
204 } else {
205 Ok(Errno::Success)
206 }
207}