wasmer_wasix/syscalls/wasix/
epoll_ctl.rs

1use serde::{Deserialize, Serialize};
2use tokio::sync::{mpsc::UnboundedSender, watch};
3use virtual_mio::{InterestHandler, InterestType};
4use virtual_net::net_error_into_io_err;
5use wasmer_wasix_types::wasi::{
6    EpollCtl, EpollEvent, EpollEventCtl, EpollType, SubscriptionClock, SubscriptionUnion, Userdata,
7};
8
9use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
10
11use futures::Future;
12
13use super::*;
14use crate::{
15    WasiInodes,
16    fs::{
17        EpollFd, EpollInterest, EpollJoinGuard, InodeValFilePollGuard, InodeValFilePollGuardJoin,
18        InodeValFilePollGuardMode, POLL_GUARD_MAX_RET,
19    },
20    state::PollEventSet,
21    syscalls::*,
22};
23
24/// ### `epoll_ctl()`
25/// Modifies an epoll interest list
26/// Output:
27/// - `Fd fd`
28///   The new file handle that is used to modify or wait on the interest list
29#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty, fd), ret)]
30pub fn epoll_ctl<M: MemorySize + 'static>(
31    mut ctx: FunctionEnvMut<'_, WasiEnv>,
32    epfd: WasiFd,
33    op: EpollCtl,
34    fd: WasiFd,
35    event_ref: WasmPtr<EpollEvent<M>, M>,
36) -> Result<Errno, WasiError> {
37    WasiEnv::do_pending_operations(&mut ctx)?;
38
39    let env = ctx.data();
40
41    let memory = unsafe { env.memory_view(&ctx) };
42    let event = if event_ref.offset() != M::ZERO {
43        Some(wasi_try_mem_ok!(event_ref.read(&memory)))
44    } else {
45        None
46    };
47
48    let event_ctl = event.map(|evt| EpollEventCtl {
49        events: evt.events,
50        ptr: evt.data.ptr.into(),
51        fd: evt.data.fd,
52        data1: evt.data.data1,
53        data2: evt.data.data2,
54    });
55
56    wasi_try_ok!(epoll_ctl_internal(
57        &mut ctx,
58        epfd,
59        op,
60        fd,
61        event_ctl.as_ref()
62    )?);
63    let env = ctx.data();
64
65    #[cfg(feature = "journal")]
66    if env.enable_journal {
67        JournalEffector::save_epoll_ctl(&mut ctx, epfd, op, fd, event_ctl).map_err(|err| {
68            tracing::error!("failed to save epoll_create event - {}", err);
69            WasiError::Exit(ExitCode::from(Errno::Fault))
70        })?;
71    }
72
73    Ok(Errno::Success)
74}
75
76pub(crate) fn epoll_ctl_internal(
77    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
78    epfd: WasiFd,
79    op: EpollCtl,
80    fd: WasiFd,
81    event_ctl: Option<&EpollEventCtl>,
82) -> Result<Result<(), Errno>, WasiError> {
83    let env = ctx.data();
84    let fd_entry = wasi_try_ok_ok!(env.state.fs.get_fd(epfd));
85
86    let tasks = env.tasks().clone();
87    let mut inode_guard = fd_entry.inode.read();
88    match inode_guard.deref() {
89        Kind::Epoll {
90            subscriptions, tx, ..
91        } => {
92            if let EpollCtl::Del | EpollCtl::Mod = op {
93                let mut guard = subscriptions.lock().unwrap();
94                guard.remove(&fd);
95
96                tracing::trace!(fd, "unregistering waker");
97            }
98            if let EpollCtl::Add | EpollCtl::Mod = op {
99                if let Some(event) = event_ctl {
100                    let epoll_fd = EpollFd {
101                        events: event.events,
102                        ptr: event.ptr,
103                        fd: event.fd,
104                        data1: event.data1,
105                        data2: event.data2,
106                    };
107
108                    // Output debug
109                    tracing::trace!(
110                        peb = ?event.events,
111                        ptr = ?event.ptr,
112                        data1 = event.data1,
113                        data2 = event.data2,
114                        fd = event.fd,
115                        "registering waker"
116                    );
117
118                    {
119                        // We have to register the subscription before we register the waker
120                        // as otherwise there is a race condition
121                        let mut guard = subscriptions.lock().unwrap();
122                        guard.insert(event.fd, (epoll_fd.clone(), Vec::new()));
123                    }
124
125                    // Now we register the epoll waker
126                    let tx = tx.clone();
127                    let mut fd_guard =
128                        wasi_try_ok_ok!(register_epoll_handler(&env.state, &epoll_fd, tx));
129
130                    // After the guards are created we need to attach them to the subscription
131                    let mut guard = subscriptions.lock().unwrap();
132                    if let Some(subs) = guard.get_mut(&event.fd) {
133                        if let Some(fd_guard) = fd_guard {
134                            subs.1.push(fd_guard);
135                        }
136                    }
137                }
138            }
139            Ok(Ok(()))
140        }
141        _ => Ok(Err(Errno::Inval)),
142    }
143}
144
145#[derive(Debug)]
146pub struct EpollHandler {
147    fd: WasiFd,
148    tx: Arc<watch::Sender<EpollInterest>>,
149}
150impl EpollHandler {
151    pub fn new(fd: WasiFd, tx: Arc<watch::Sender<EpollInterest>>) -> Box<Self> {
152        Box::new(Self { fd, tx })
153    }
154}
155impl InterestHandler for EpollHandler {
156    fn push_interest(&mut self, interest: InterestType) {
157        let readiness = match interest {
158            InterestType::Readable => EpollType::EPOLLIN,
159            InterestType::Writable => EpollType::EPOLLOUT,
160            InterestType::Closed => EpollType::EPOLLHUP,
161            InterestType::Error => EpollType::EPOLLERR,
162        };
163        self.tx.send_modify(|i| {
164            i.interest.insert((self.fd, readiness));
165        });
166    }
167
168    fn pop_interest(&mut self, interest: InterestType) -> bool {
169        let readiness = match interest {
170            InterestType::Readable => EpollType::EPOLLIN,
171            InterestType::Writable => EpollType::EPOLLOUT,
172            InterestType::Closed => EpollType::EPOLLHUP,
173            InterestType::Error => EpollType::EPOLLERR,
174        };
175        let mut ret = false;
176        self.tx.send_modify(move |i| {
177            ret = i.interest.iter().any(|(_, b)| *b == readiness);
178            i.interest.retain(|(_, b)| *b != readiness);
179        });
180        ret
181    }
182
183    fn has_interest(&self, interest: InterestType) -> bool {
184        let readiness = match interest {
185            InterestType::Readable => EpollType::EPOLLIN,
186            InterestType::Writable => EpollType::EPOLLOUT,
187            InterestType::Closed => EpollType::EPOLLHUP,
188            InterestType::Error => EpollType::EPOLLERR,
189        };
190        let mut ret = false;
191        self.tx.send_modify(move |i| {
192            ret = i.interest.iter().any(|(_, b)| *b == readiness);
193        });
194        ret
195    }
196}
197
198pub(super) fn register_epoll_handler(
199    state: &Arc<WasiState>,
200    event: &EpollFd,
201    tx: Arc<watch::Sender<EpollInterest>>,
202) -> Result<Option<EpollJoinGuard>, Errno> {
203    let mut type_ = Eventtype::FdRead;
204    let mut peb = PollEventBuilder::new();
205    if event.events.contains(EpollType::EPOLLOUT) {
206        type_ = Eventtype::FdWrite;
207        peb = peb.add(PollEvent::PollOut);
208    }
209    if event.events.contains(EpollType::EPOLLIN) {
210        type_ = Eventtype::FdRead;
211        peb = peb.add(PollEvent::PollIn);
212    }
213    if event.events.contains(EpollType::EPOLLERR) {
214        peb = peb.add(PollEvent::PollError);
215    }
216    if event.events.contains(EpollType::EPOLLHUP) | event.events.contains(EpollType::EPOLLRDHUP) {
217        peb = peb.add(PollEvent::PollHangUp);
218    }
219
220    // Create a dummy subscription
221    let s = Subscription {
222        userdata: event.data2,
223        type_,
224        data: SubscriptionUnion {
225            fd_readwrite: SubscriptionFsReadwrite {
226                file_descriptor: event.fd,
227            },
228        },
229    };
230
231    // Get guard object which we will register the waker against
232    let fd_guard = poll_fd_guard(state, peb.build(), event.fd, s)?;
233    let handler = EpollHandler::new(event.fd, tx);
234
235    match &fd_guard.mode {
236        InodeValFilePollGuardMode::File(_) => {
237            // Intentionally ignored, epoll doesn't work with files
238            return Ok(None);
239        }
240        InodeValFilePollGuardMode::Socket { inner, .. } => {
241            let mut inner = inner.protected.write().unwrap();
242            inner.set_handler(handler).map_err(net_error_into_io_err)?;
243            drop(inner);
244        }
245        InodeValFilePollGuardMode::EventNotifications(inner) => inner.set_interest_handler(handler),
246        InodeValFilePollGuardMode::DuplexPipe { pipe } => {
247            let mut inner = pipe.write().unwrap();
248            inner.set_interest_handler(handler);
249        }
250        InodeValFilePollGuardMode::PipeRx { rx } => {
251            let mut inner = rx.write().unwrap();
252            inner.set_interest_handler(handler);
253        }
254        InodeValFilePollGuardMode::PipeTx { tx } => {
255            // The sending end of a pipe can't have an interest handler, since we
256            // only support "readable" interest on pipes; they're considered to
257            // always be writable.
258            return Ok(None);
259        }
260    }
261
262    Ok(Some(EpollJoinGuard { fd_guard }))
263}