wasmer_wasix/syscalls/wasix/
epoll_wait.rs

1use wasmer_wasix_types::wasi::{EpollData, EpollEvent, EpollType, SubscriptionClock, Userdata};
2
3use super::*;
4use crate::{
5    WasiInodes,
6    fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
7    os::epoll::{EpollFd, drain_ready_events},
8    state::PollEventSet,
9    syscalls::*,
10};
11
/// Sentinel `timeout` value for `epoll_wait`: when the caller passes it, no
/// timeout future is armed and the wait only ends once events are ready.
const TIMEOUT_FOREVER: u64 = u64::MAX;
13
14/// ### `epoll_wait()`
15/// Wait for an I/O event on an epoll file descriptor
16#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
17pub fn epoll_wait<M: MemorySize + 'static>(
18    mut ctx: FunctionEnvMut<'_, WasiEnv>,
19    epfd: WasiFd,
20    events: WasmPtr<EpollEvent<M>, M>,
21    maxevents: i32,
22    timeout: Timestamp,
23    ret_nevents: WasmPtr<M::Offset, M>,
24) -> Result<Errno, WasiError> {
25    WasiEnv::do_pending_operations(&mut ctx)?;
26    if maxevents <= 0 {
27        return Ok(Errno::Inval);
28    }
29    let maxevents = maxevents as usize;
30
31    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
32    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
33
34    if timeout == TIMEOUT_FOREVER {
35        tracing::trace!(maxevents, epfd, "waiting forever on wakers");
36    } else {
37        tracing::trace!(maxevents, epfd, timeout, "waiting on wakers");
38    }
39
40    let epoll_state = {
41        let fd_entry = wasi_try_ok!(ctx.data().state.fs.get_fd(epfd));
42        let mut inode_guard = fd_entry.inode.read();
43        match inode_guard.deref() {
44            Kind::Epoll { state } => state.clone(),
45            _ => return Ok(Errno::Inval),
46        }
47    };
48
49    // We enter a controlled loop that will continuously poll and react to
50    // epoll events until something of interest needs to be returned to the
51    // caller or a timeout happens
52    let work = {
53        async move {
54            // Loop until some events of interest are returned
55            loop {
56                let ret = drain_ready_events(&epoll_state, maxevents);
57                if !ret.is_empty() {
58                    return Ok(ret);
59                }
60
61                epoll_state.wait().await;
62            }
63        }
64    };
65
66    // Build the trigger using the timeout
67    let trigger = {
68        let timeout = if timeout == TIMEOUT_FOREVER {
69            None
70        } else {
71            Some(ctx.data().tasks().sleep_now(Duration::from_nanos(timeout)))
72        };
73        async move {
74            if let Some(timeout) = timeout {
75                tokio::select! {
76                    res = work => res,
77                    _ = timeout => Err(Errno::Timedout)
78                }
79            } else {
80                work.await
81            }
82        }
83    };
84
85    // We replace the process events callback with another callback
86    // which will interpret the error codes
87    let process_events = {
88        let events_out = events;
89        move |ctx: &FunctionEnvMut<'_, WasiEnv>,
90              events: Result<Vec<(EpollFd, EpollType)>, Errno>| {
91            let env = ctx.data();
92            let memory = unsafe { env.memory_view(ctx) };
93
94            // Process the result
95            match events {
96                Ok(evts) => {
97                    let mut nevents = 0;
98
99                    let event_array = wasi_try_mem!(events_out.slice(
100                        &memory,
101                        wasi_try!(maxevents.try_into().map_err(|_| Errno::Overflow))
102                    ));
103                    for (event, readiness) in evts {
104                        tracing::trace!(fd = event.fd(), readiness = ?readiness, "triggered");
105                        wasi_try_mem!(event_array.index(nevents as u64).write(EpollEvent {
106                            events: readiness,
107                            data: EpollData {
108                                ptr: wasi_try!(event.ptr().try_into().map_err(|_| Errno::Overflow)),
109                                fd: event.fd(),
110                                data1: event.data1(),
111                                data2: event.data2(),
112                            }
113                        }));
114                        nevents += 1;
115                        if nevents >= maxevents {
116                            break;
117                        }
118                    }
119                    tracing::trace!("{} events triggered", nevents);
120                    wasi_try_mem!(ret_nevents.write(
121                        &memory,
122                        wasi_try!(nevents.try_into().map_err(|_| Errno::Overflow))
123                    ));
124                    Errno::Success
125                }
126                Err(Errno::Timedout) => {
127                    // In a timeout scenario we return zero events
128                    wasi_try_mem!(ret_nevents.write(&memory, M::ZERO));
129                    Errno::Success
130                }
131                Err(err) => {
132                    tracing::warn!("failed to epoll during deep sleep - {}", err);
133                    err
134                }
135            }
136        }
137    };
138
139    // If we are rewound then its time to process them
140    if let Some(events) =
141        unsafe { handle_rewind::<M, Result<Vec<(EpollFd, EpollType)>, Errno>>(&mut ctx) }
142    {
143        return Ok(process_events(&ctx, events));
144    }
145
146    // We use asyncify with a deep sleep to wait on new IO events
147    let res = __asyncify_with_deep_sleep::<M, Result<Vec<(EpollFd, EpollType)>, Errno>, _>(
148        ctx,
149        Box::pin(trigger),
150    )?;
151    if let AsyncifyAction::Finish(mut ctx, events) = res {
152        Ok(process_events(&ctx, events))
153    } else {
154        Ok(Errno::Success)
155    }
156}