1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
use serde::{Deserialize, Serialize};
use wasmer_wasix_types::wasi::{
EpollCtl, EpollData, EpollEvent, EpollType, SubscriptionClock, Userdata,
};
use super::*;
use crate::{
fs::{EpollFd, InodeValFilePollGuard, InodeValFilePollGuardJoin, POLL_GUARD_MAX_RET},
state::PollEventSet,
syscalls::*,
WasiInodes,
};
/// Sentinel `timeout` value meaning "wait with no time limit".
/// All bits set, i.e. identical to `u64::MAX`.
const TIMEOUT_FOREVER: u64 = !0;
/// ### `epoll_wait()`
/// Wait for an I/O event on an epoll file descriptor
///
/// Suspends the caller until at least one registered interest on `epfd`
/// becomes ready, or until `timeout` expires. It then writes up to
/// `maxevents` ready events into `events` and stores the number written in
/// `ret_nevents`.
///
/// Inputs:
/// - `epfd`: file descriptor that must refer to an epoll instance
/// - `events`: guest buffer with room for `maxevents` entries
/// - `maxevents`: capacity of `events`; must be greater than zero
/// - `timeout`: wait limit, converted with `Duration::from_nanos`, or
///   `TIMEOUT_FOREVER` to wait without limit
///
/// Output:
/// - `ret_nevents`: number of events written (0 when the timeout elapsed)
///
/// Errors:
/// - `Errno::Inval` when `maxevents <= 0` or `epfd` is not an epoll fd
#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
pub fn epoll_wait<'a, M: MemorySize + 'static>(
    mut ctx: FunctionEnvMut<'a, WasiEnv>,
    epfd: WasiFd,
    events: WasmPtr<EpollEvent<M>, M>,
    maxevents: i32,
    timeout: Timestamp,
    ret_nevents: WasmPtr<M::Offset, M>,
) -> Result<Errno, WasiError> {
    wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);

    if timeout == TIMEOUT_FOREVER {
        tracing::trace!(maxevents, epfd, "waiting forever on wakers");
    } else {
        tracing::trace!(maxevents, epfd, timeout, "waiting on wakers");
    }

    // Like Linux `epoll_wait`, reject a non-positive capacity before doing
    // any work. Without this check the call would block, then remove the
    // ready interests from the watch channel, and only afterwards fail while
    // writing them into a zero/negative-sized guest buffer, losing them.
    // It also guarantees that `maxevents as usize` below cannot wrap.
    // A rewound call carries the same arguments, so a call that reached the
    // deep sleep never trips this check on re-entry.
    if maxevents <= 0 {
        return Ok(Errno::Inval);
    }

    // Clone the channel handles and the subscription table out of the epoll
    // inode so the inode lock is released before we start waiting.
    let (rx, tx, subscriptions) = {
        let fd_entry = wasi_try_ok!(ctx.data().state.fs.get_fd(epfd));
        let inode_guard = fd_entry.inode.read();
        match inode_guard.deref() {
            Kind::Epoll {
                rx,
                tx,
                subscriptions,
                ..
            } => (rx.clone(), tx.clone(), subscriptions.clone()),
            _ => return Ok(Errno::Inval),
        }
    };

    // We enter a controlled loop that will continuously poll and react to
    // epoll events until something of interest needs to be returned to the
    // caller or a timeout happens
    let work = {
        let state = ctx.data().state.clone();
        async move {
            let mut ret: Vec<(EpollFd, EpollType)> = Vec::new();

            // Loop until some events of interest are returned
            loop {
                // Wait for our turn on the shared receiver, then read the
                // latest set of signalled interest from it
                let mut rx = rx.lock().await;

                // We first extract all the interest that has been registered
                // and cycle through it
                let mut removed = Vec::new();
                let interest: Vec<_> = rx
                    .borrow_and_update()
                    .interest
                    .clone()
                    .into_iter()
                    .collect();
                {
                    let mut guard = subscriptions.lock().unwrap();
                    for (fd, readiness) in interest {
                        // Every visited interest is cleared below, including
                        // orphans whose fd is no longer subscribed
                        removed.push((fd, readiness));

                        // Get the data for this fd
                        let (fd, joins) = match guard.get_mut(&fd) {
                            Some(a) => a,
                            None => {
                                tracing::debug!(fd, readiness=?readiness, "orphaned interest");
                                continue;
                            }
                        };

                        // We have to renew any joins that have now been spent
                        for join in joins {
                            if join.is_spent() {
                                join.renew();
                            }
                        }

                        // Record the event
                        ret.push((fd.clone(), readiness));

                        // Stop while keeping POLL_GUARD_MAX_RET slots of
                        // headroom below `maxevents`; unvisited interest stays
                        // in the channel for the next call.
                        // NOTE(review): the reason for this reserve is defined
                        // elsewhere (crate::fs) — confirm.
                        if ret.len() + POLL_GUARD_MAX_RET >= (maxevents as usize) {
                            break;
                        }
                    }
                }

                // Remove anything that was signaled
                if !removed.is_empty() {
                    // Now update the notification system
                    tx.send_modify(|i| {
                        for (fd, readiness) in removed {
                            i.interest.remove(&(fd, readiness));
                        }
                    });
                }

                // If we have results then return them
                if !ret.is_empty() {
                    return Ok(ret);
                }

                // Otherwise we wait to be triggered again
                rx.changed().await.ok();
            }
        }
    };

    // Build the trigger using the timeout
    let trigger = {
        let timeout = if timeout == TIMEOUT_FOREVER {
            None
        } else {
            Some(ctx.data().tasks().sleep_now(Duration::from_nanos(timeout)))
        };
        async move {
            if let Some(timeout) = timeout {
                tokio::select! {
                    res = work => res,
                    _ = timeout => Err(Errno::Timedout)
                }
            } else {
                work.await
            }
        }
    };

    // We replace the process events callback with another callback
    // which will interpret the error codes
    let process_events = {
        let events_out = events;
        move |ctx: &FunctionEnvMut<'a, WasiEnv>,
              events: Result<Vec<(EpollFd, EpollType)>, Errno>| {
            let env = ctx.data();
            let memory = unsafe { env.memory_view(ctx) };

            // Process the result
            match events {
                Ok(evts) => {
                    let mut nevents = 0;

                    let event_array = wasi_try_mem!(events_out.slice(
                        &memory,
                        wasi_try!(maxevents.try_into().map_err(|_| Errno::Overflow))
                    ));
                    for (event, readiness) in evts {
                        tracing::trace!(fd = event.fd, readiness = ?readiness, "triggered");
                        wasi_try_mem!(event_array.index(nevents as u64).write(EpollEvent {
                            events: readiness,
                            data: EpollData {
                                ptr: wasi_try!(event.ptr.try_into().map_err(|_| Errno::Overflow)),
                                fd: event.fd,
                                data1: event.data1,
                                data2: event.data2
                            }
                        }));
                        nevents += 1;
                        if nevents >= maxevents {
                            break;
                        }
                    }
                    tracing::trace!("{} events triggered", nevents);
                    wasi_try_mem!(ret_nevents.write(
                        &memory,
                        wasi_try!(nevents.try_into().map_err(|_| Errno::Overflow))
                    ));
                    Errno::Success
                }
                Err(Errno::Timedout) => {
                    // In a timeout scenario we return zero events
                    wasi_try_mem!(ret_nevents.write(&memory, M::ZERO));
                    Errno::Success
                }
                Err(err) => {
                    tracing::warn!("failed to epoll during deep sleep - {}", err);
                    err
                }
            }
        }
    };

    // If we are rewound then its time to process them
    if let Some(events) =
        unsafe { handle_rewind::<M, Result<Vec<(EpollFd, EpollType)>, Errno>>(&mut ctx) }
    {
        return Ok(process_events(&ctx, events));
    }

    // We use asyncify with a deep sleep to wait on new IO events
    let res = __asyncify_with_deep_sleep::<M, Result<Vec<(EpollFd, EpollType)>, Errno>, _>(
        ctx,
        Box::pin(trigger),
    )?;
    if let AsyncifyAction::Finish(ctx, events) = res {
        Ok(process_events(&ctx, events))
    } else {
        Ok(Errno::Success)
    }
}