wasmer_wasix/syscalls/wasix/
epoll_ctl.rs1use serde::{Deserialize, Serialize};
2use tokio::sync::{mpsc::UnboundedSender, watch};
3use virtual_mio::{InterestHandler, InterestType};
4use virtual_net::net_error_into_io_err;
5use wasmer_wasix_types::wasi::{
6 EpollCtl, EpollEvent, EpollEventCtl, EpollType, SubscriptionClock, SubscriptionUnion, Userdata,
7};
8
9use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
10
11use futures::Future;
12
13use super::*;
14use crate::{
15 WasiInodes,
16 fs::{
17 EpollFd, EpollInterest, EpollJoinGuard, InodeValFilePollGuard, InodeValFilePollGuardJoin,
18 InodeValFilePollGuardMode, POLL_GUARD_MAX_RET,
19 },
20 state::PollEventSet,
21 syscalls::*,
22};
23
24#[instrument(level = "trace", skip_all, fields(timeout_ms = field::Empty, fd_guards = field::Empty, seen = field::Empty, fd), ret)]
30pub fn epoll_ctl<M: MemorySize + 'static>(
31 mut ctx: FunctionEnvMut<'_, WasiEnv>,
32 epfd: WasiFd,
33 op: EpollCtl,
34 fd: WasiFd,
35 event_ref: WasmPtr<EpollEvent<M>, M>,
36) -> Result<Errno, WasiError> {
37 WasiEnv::do_pending_operations(&mut ctx)?;
38
39 let env = ctx.data();
40
41 let memory = unsafe { env.memory_view(&ctx) };
42 let event = if event_ref.offset() != M::ZERO {
43 Some(wasi_try_mem_ok!(event_ref.read(&memory)))
44 } else {
45 None
46 };
47
48 let event_ctl = event.map(|evt| EpollEventCtl {
49 events: evt.events,
50 ptr: evt.data.ptr.into(),
51 fd: evt.data.fd,
52 data1: evt.data.data1,
53 data2: evt.data.data2,
54 });
55
56 wasi_try_ok!(epoll_ctl_internal(
57 &mut ctx,
58 epfd,
59 op,
60 fd,
61 event_ctl.as_ref()
62 )?);
63 let env = ctx.data();
64
65 #[cfg(feature = "journal")]
66 if env.enable_journal {
67 JournalEffector::save_epoll_ctl(&mut ctx, epfd, op, fd, event_ctl).map_err(|err| {
68 tracing::error!("failed to save epoll_create event - {}", err);
69 WasiError::Exit(ExitCode::from(Errno::Fault))
70 })?;
71 }
72
73 Ok(Errno::Success)
74}
75
76pub(crate) fn epoll_ctl_internal(
77 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
78 epfd: WasiFd,
79 op: EpollCtl,
80 fd: WasiFd,
81 event_ctl: Option<&EpollEventCtl>,
82) -> Result<Result<(), Errno>, WasiError> {
83 let env = ctx.data();
84 let fd_entry = wasi_try_ok_ok!(env.state.fs.get_fd(epfd));
85
86 let tasks = env.tasks().clone();
87 let mut inode_guard = fd_entry.inode.read();
88 match inode_guard.deref() {
89 Kind::Epoll {
90 subscriptions, tx, ..
91 } => {
92 if let EpollCtl::Del | EpollCtl::Mod = op {
93 let mut guard = subscriptions.lock().unwrap();
94 guard.remove(&fd);
95
96 tracing::trace!(fd, "unregistering waker");
97 }
98 if let EpollCtl::Add | EpollCtl::Mod = op {
99 if let Some(event) = event_ctl {
100 let epoll_fd = EpollFd {
101 events: event.events,
102 ptr: event.ptr,
103 fd: event.fd,
104 data1: event.data1,
105 data2: event.data2,
106 };
107
108 tracing::trace!(
110 peb = ?event.events,
111 ptr = ?event.ptr,
112 data1 = event.data1,
113 data2 = event.data2,
114 fd = event.fd,
115 "registering waker"
116 );
117
118 {
119 let mut guard = subscriptions.lock().unwrap();
122 guard.insert(event.fd, (epoll_fd.clone(), Vec::new()));
123 }
124
125 let tx = tx.clone();
127 let mut fd_guard =
128 wasi_try_ok_ok!(register_epoll_handler(&env.state, &epoll_fd, tx));
129
130 let mut guard = subscriptions.lock().unwrap();
132 if let Some(subs) = guard.get_mut(&event.fd) {
133 if let Some(fd_guard) = fd_guard {
134 subs.1.push(fd_guard);
135 }
136 }
137 }
138 }
139 Ok(Ok(()))
140 }
141 _ => Ok(Err(Errno::Inval)),
142 }
143}
144
145#[derive(Debug)]
146pub struct EpollHandler {
147 fd: WasiFd,
148 tx: Arc<watch::Sender<EpollInterest>>,
149}
150impl EpollHandler {
151 pub fn new(fd: WasiFd, tx: Arc<watch::Sender<EpollInterest>>) -> Box<Self> {
152 Box::new(Self { fd, tx })
153 }
154}
155impl InterestHandler for EpollHandler {
156 fn push_interest(&mut self, interest: InterestType) {
157 let readiness = match interest {
158 InterestType::Readable => EpollType::EPOLLIN,
159 InterestType::Writable => EpollType::EPOLLOUT,
160 InterestType::Closed => EpollType::EPOLLHUP,
161 InterestType::Error => EpollType::EPOLLERR,
162 };
163 self.tx.send_modify(|i| {
164 i.interest.insert((self.fd, readiness));
165 });
166 }
167
168 fn pop_interest(&mut self, interest: InterestType) -> bool {
169 let readiness = match interest {
170 InterestType::Readable => EpollType::EPOLLIN,
171 InterestType::Writable => EpollType::EPOLLOUT,
172 InterestType::Closed => EpollType::EPOLLHUP,
173 InterestType::Error => EpollType::EPOLLERR,
174 };
175 let mut ret = false;
176 self.tx.send_modify(move |i| {
177 ret = i.interest.iter().any(|(_, b)| *b == readiness);
178 i.interest.retain(|(_, b)| *b != readiness);
179 });
180 ret
181 }
182
183 fn has_interest(&self, interest: InterestType) -> bool {
184 let readiness = match interest {
185 InterestType::Readable => EpollType::EPOLLIN,
186 InterestType::Writable => EpollType::EPOLLOUT,
187 InterestType::Closed => EpollType::EPOLLHUP,
188 InterestType::Error => EpollType::EPOLLERR,
189 };
190 let mut ret = false;
191 self.tx.send_modify(move |i| {
192 ret = i.interest.iter().any(|(_, b)| *b == readiness);
193 });
194 ret
195 }
196}
197
198pub(super) fn register_epoll_handler(
199 state: &Arc<WasiState>,
200 event: &EpollFd,
201 tx: Arc<watch::Sender<EpollInterest>>,
202) -> Result<Option<EpollJoinGuard>, Errno> {
203 let mut type_ = Eventtype::FdRead;
204 let mut peb = PollEventBuilder::new();
205 if event.events.contains(EpollType::EPOLLOUT) {
206 type_ = Eventtype::FdWrite;
207 peb = peb.add(PollEvent::PollOut);
208 }
209 if event.events.contains(EpollType::EPOLLIN) {
210 type_ = Eventtype::FdRead;
211 peb = peb.add(PollEvent::PollIn);
212 }
213 if event.events.contains(EpollType::EPOLLERR) {
214 peb = peb.add(PollEvent::PollError);
215 }
216 if event.events.contains(EpollType::EPOLLHUP) | event.events.contains(EpollType::EPOLLRDHUP) {
217 peb = peb.add(PollEvent::PollHangUp);
218 }
219
220 let s = Subscription {
222 userdata: event.data2,
223 type_,
224 data: SubscriptionUnion {
225 fd_readwrite: SubscriptionFsReadwrite {
226 file_descriptor: event.fd,
227 },
228 },
229 };
230
231 let fd_guard = poll_fd_guard(state, peb.build(), event.fd, s)?;
233 let handler = EpollHandler::new(event.fd, tx);
234
235 match &fd_guard.mode {
236 InodeValFilePollGuardMode::File(_) => {
237 return Ok(None);
239 }
240 InodeValFilePollGuardMode::Socket { inner, .. } => {
241 let mut inner = inner.protected.write().unwrap();
242 inner.set_handler(handler).map_err(net_error_into_io_err)?;
243 drop(inner);
244 }
245 InodeValFilePollGuardMode::EventNotifications(inner) => inner.set_interest_handler(handler),
246 InodeValFilePollGuardMode::DuplexPipe { pipe } => {
247 let mut inner = pipe.write().unwrap();
248 inner.set_interest_handler(handler);
249 }
250 InodeValFilePollGuardMode::PipeRx { rx } => {
251 let mut inner = rx.write().unwrap();
252 inner.set_interest_handler(handler);
253 }
254 InodeValFilePollGuardMode::PipeTx { tx } => {
255 return Ok(None);
259 }
260 }
261
262 Ok(Some(EpollJoinGuard { fd_guard }))
263}