wasmer_wasix/syscalls/wasix/epoll_wait.rs

1use serde::{Deserialize, Serialize};
2use wasmer_wasix_types::wasi::{
3    EpollCtl, EpollData, EpollEvent, EpollType, SubscriptionClock, Userdata,
4};
5
6use super::*;
7use crate::{
8    WasiInodes,
9    fs::{EpollFd, InodeValFilePollGuard, InodeValFilePollGuardJoin, POLL_GUARD_MAX_RET},
10    state::PollEventSet,
11    syscalls::*,
12};
13
/// Sentinel `timeout` value for `epoll_wait`: when passed, no sleep timer is
/// armed and the call waits until an event of interest arrives.
const TIMEOUT_FOREVER: u64 = !0;
15
16/// ### `epoll_wait()`
17/// Wait for an I/O event on an epoll file descriptor
18#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty), ret)]
19pub fn epoll_wait<M: MemorySize + 'static>(
20    mut ctx: FunctionEnvMut<'_, WasiEnv>,
21    epfd: WasiFd,
22    events: WasmPtr<EpollEvent<M>, M>,
23    maxevents: i32,
24    timeout: Timestamp,
25    ret_nevents: WasmPtr<M::Offset, M>,
26) -> Result<Errno, WasiError> {
27    WasiEnv::do_pending_operations(&mut ctx)?;
28
29    ctx = wasi_try_ok!(maybe_backoff::<M>(ctx)?);
30    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
31
32    if timeout == TIMEOUT_FOREVER {
33        tracing::trace!(maxevents, epfd, "waiting forever on wakers");
34    } else {
35        tracing::trace!(maxevents, epfd, timeout, "waiting on wakers");
36    }
37
38    let (rx, tx, subscriptions) = {
39        let fd_entry = wasi_try_ok!(ctx.data().state.fs.get_fd(epfd));
40        let mut inode_guard = fd_entry.inode.read();
41        match inode_guard.deref() {
42            Kind::Epoll {
43                rx,
44                tx,
45                subscriptions,
46                ..
47            } => (rx.clone(), tx.clone(), subscriptions.clone()),
48            _ => return Ok(Errno::Inval),
49        }
50    };
51
52    // We enter a controlled loop that will continuously poll and react to
53    // epoll events until something of interest needs to be returned to the
54    // caller or a timeout happens
55    let work = {
56        let state = ctx.data().state.clone();
57        async move {
58            let mut ret: Vec<(EpollFd, EpollType)> = Vec::new();
59
60            // Loop until some events of interest are returned
61            loop {
62                // We wait for our turn then read the next event from the
63                let mut rx = rx.lock().await;
64
65                // We first extract all the interest that has been registered
66                // and cycle through it
67                let mut removed = Vec::new();
68                let interest: Vec<_> = rx
69                    .borrow_and_update()
70                    .interest
71                    .clone()
72                    .into_iter()
73                    .collect();
74                {
75                    let mut guard = subscriptions.lock().unwrap();
76                    for (fd, readiness) in interest {
77                        removed.push((fd, readiness));
78
79                        // Get the data for this fd
80                        let (fd, joins) = match guard.get_mut(&fd) {
81                            Some(a) => a,
82                            None => {
83                                tracing::debug!(fd, readiness=?readiness, "orphaned interest");
84                                continue;
85                            }
86                        };
87
88                        // Record the event
89                        ret.push((fd.clone(), readiness));
90                        if ret.len() + POLL_GUARD_MAX_RET >= (maxevents as usize) {
91                            break;
92                        }
93                    }
94                }
95
96                // Remove anything that was signaled
97                if !removed.is_empty() {
98                    // Now update the notification system
99                    tx.send_modify(|i| {
100                        for (fd, readiness) in removed {
101                            i.interest.remove(&(fd, readiness));
102                        }
103                    });
104                }
105
106                // If we have results then return them
107                if !ret.is_empty() {
108                    return Ok(ret);
109                }
110
111                // Otherwise we wait to be triggered again
112                rx.changed().await.ok();
113            }
114        }
115    };
116
117    // Build the trigger using the timeout
118    let trigger = {
119        let timeout = if timeout == TIMEOUT_FOREVER {
120            None
121        } else {
122            Some(ctx.data().tasks().sleep_now(Duration::from_nanos(timeout)))
123        };
124        async move {
125            if let Some(timeout) = timeout {
126                tokio::select! {
127                    res = work => res,
128                    _ = timeout => Err(Errno::Timedout)
129                }
130            } else {
131                work.await
132            }
133        }
134    };
135
136    // We replace the process events callback with another callback
137    // which will interpret the error codes
138    let process_events = {
139        let events_out = events;
140        move |ctx: &FunctionEnvMut<'_, WasiEnv>,
141              events: Result<Vec<(EpollFd, EpollType)>, Errno>| {
142            let env = ctx.data();
143            let memory = unsafe { env.memory_view(ctx) };
144
145            // Process the result
146            match events {
147                Ok(evts) => {
148                    let mut nevents = 0;
149
150                    let event_array = wasi_try_mem!(events_out.slice(
151                        &memory,
152                        wasi_try!(maxevents.try_into().map_err(|_| Errno::Overflow))
153                    ));
154                    for (event, readiness) in evts {
155                        tracing::trace!(fd = event.fd, readiness = ?readiness, "triggered");
156                        wasi_try_mem!(event_array.index(nevents as u64).write(EpollEvent {
157                            events: readiness,
158                            data: EpollData {
159                                ptr: wasi_try!(event.ptr.try_into().map_err(|_| Errno::Overflow)),
160                                fd: event.fd,
161                                data1: event.data1,
162                                data2: event.data2
163                            }
164                        }));
165                        nevents += 1;
166                        if nevents >= maxevents {
167                            break;
168                        }
169                    }
170                    tracing::trace!("{} events triggered", nevents);
171                    wasi_try_mem!(ret_nevents.write(
172                        &memory,
173                        wasi_try!(nevents.try_into().map_err(|_| Errno::Overflow))
174                    ));
175                    Errno::Success
176                }
177                Err(Errno::Timedout) => {
178                    // In a timeout scenario we return zero events
179                    wasi_try_mem!(ret_nevents.write(&memory, M::ZERO));
180                    Errno::Success
181                }
182                Err(err) => {
183                    tracing::warn!("failed to epoll during deep sleep - {}", err);
184                    err
185                }
186            }
187        }
188    };
189
190    // If we are rewound then its time to process them
191    if let Some(events) =
192        unsafe { handle_rewind::<M, Result<Vec<(EpollFd, EpollType)>, Errno>>(&mut ctx) }
193    {
194        return Ok(process_events(&ctx, events));
195    }
196
197    // We use asyncify with a deep sleep to wait on new IO events
198    let res = __asyncify_with_deep_sleep::<M, Result<Vec<(EpollFd, EpollType)>, Errno>, _>(
199        ctx,
200        Box::pin(trigger),
201    )?;
202    if let AsyncifyAction::Finish(mut ctx, events) = res {
203        Ok(process_events(&ctx, events))
204    } else {
205        Ok(Errno::Success)
206    }
207}