wasmer_wasix/os/epoll/
mod.rs

1//! Epoll runtime implementation for WASIX.
2//!
3//! This module centralizes epoll internals behind a small crate-internal API used
4//! by `epoll_create`, `epoll_ctl`, and `epoll_wait`.
5//!
6//! ## Architecture
7//!
8//! The implementation uses:
9//! - `EpollState`: global state for one epoll fd (`subscriptions`, ready queue, notifier).
10//! - `EpollSubState`: per-watched-fd state (`pending_bits`, `enqueued`, `generation`, metadata).
11//! - `EpollHandler`: producer-side interest handler attached to socket/pipe/notification sources.
12//!
13//! ## Important flows
14//!
15//! ### Registration (`epoll_ctl` ADD/MOD/DEL)
16//! 1. `epoll_ctl` creates/removes `EpollSubState` entries in `EpollState.subscriptions`.
17//! 2. On ADD/MOD, `register_epoll_handler()` installs an `EpollHandler` on the target fd.
18//! 3. Guard ownership is stored in `EpollSubState.joins`; dropping a subscription drops guards
19//!    and unregisters handlers.
20//!
21//! ### Producer path (`EpollHandler::push_interest`)
22//! 1. Convert `InterestType` to a readiness bit.
23//! 2. Set `pending_bits` atomically.
//! 3. Enqueue at most one `ReadyItem` per subscription: only the producer that flips
//!    `enqueued` from `false` to `true` pushes onto the ready queue.
25//! 4. Wake one waiter via `Notify`.
26//!
27//! ### Consumer path (`drain_ready_events` used by `epoll_wait`)
28//! 1. Pop `ReadyItem` from the ready queue.
29//! 2. Resolve the current subscription and drop stale/missing entries.
30//! 3. Atomically take readiness bits, clear `enqueued`, and map bits to output events.
31//! 4. Repair queue state if a race set new bits during draining.
32//!
33//! ## Correctness model
34//!
35//! - `pending_bits` is the source of truth for unread readiness.
36//! - `enqueued` deduplicates queue presence per subscription.
37//! - `generation` prevents stale queued entries from emitting after DEL/MOD.
38//! - Consumer logic tolerates stale queue entries and must never await while holding locks.
39//!
40use std::{
41    collections::VecDeque,
42    sync::{
43        Arc, Mutex as StdMutex,
44        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
45    },
46};
47
48use fnv::FnvHashMap;
49use serde::{Deserialize, Serialize};
50use tokio::sync::Notify;
51use virtual_mio::{InterestHandler, InterestType};
52use virtual_net::net_error_into_io_err;
53use wasmer_wasix_types::wasi::{
54    EpollEventCtl, EpollType, Errno, Eventtype, Fd as WasiFd, Subscription,
55    SubscriptionFsReadwrite, SubscriptionUnion,
56};
57
58use crate::{
59    fs::{InodeValFilePollGuard, InodeValFilePollGuardMode},
60    state::{PollEvent, PollEventBuilder, WasiState},
61    syscalls::poll_fd_guard,
62};
63
/// Pending bit recorded for readable readiness (`EPOLLIN`).
const READABLE_BIT: u8 = 0b0001;
/// Pending bit recorded for writable readiness (`EPOLLOUT`).
const WRITABLE_BIT: u8 = 0b0010;
/// Pending bit recorded for hang-up / closed readiness (`EPOLLHUP`).
const HUP_BIT: u8 = 0b0100;
/// Pending bit recorded for error readiness (`EPOLLERR`).
const ERR_BIT: u8 = 0b1000;

/// Counts every `push_interest` call made on an epoll handler.
static EPOLL_ENQUEUE_ATTEMPTS: AtomicU64 = AtomicU64::new(0);
/// Counts pushes that did not add a ready-queue entry (bit already pending or
/// subscription already enqueued).
static EPOLL_ENQUEUE_DEDUPE_HITS: AtomicU64 = AtomicU64::new(0);
/// Counts dequeued ready items dropped because their generation did not match
/// the current subscription.
static EPOLL_STALE_GENERATION_DROPS: AtomicU64 = AtomicU64::new(0);
/// Counts dequeued ready items that produced nothing (no subscription found or
/// no pending bits left).
static EPOLL_EMPTY_DEQUEUE_ENTRIES: AtomicU64 = AtomicU64::new(0);
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct EpollFd {
76    /// Event mask configured by the caller (`epoll_ctl`).
77    events: EpollType,
78    /// Opaque pointer payload from the caller.
79    ptr: u64,
80    /// Watched file descriptor.
81    fd: WasiFd,
82    /// Additional user payload.
83    data1: u32,
84    /// Additional user payload.
85    data2: u64,
86}
87
88impl EpollFd {
89    /// Creates immutable metadata for one epoll subscription.
90    pub fn new(events: EpollType, ptr: u64, fd: WasiFd, data1: u32, data2: u64) -> Self {
91        Self {
92            events,
93            ptr,
94            fd,
95            data1,
96            data2,
97        }
98    }
99
100    /// Converts the syscall control payload into subscription metadata.
101    pub fn from_event_ctl(fd: WasiFd, event: &EpollEventCtl) -> Self {
102        Self::new(event.events, event.ptr, fd, event.data1, event.data2)
103    }
104
105    /// Returns the configured event mask for this subscription.
106    pub fn events(&self) -> EpollType {
107        self.events
108    }
109
110    /// Returns the caller-supplied pointer payload.
111    pub fn ptr(&self) -> u64 {
112        self.ptr
113    }
114
115    /// Returns the watched file descriptor.
116    pub fn fd(&self) -> WasiFd {
117        self.fd
118    }
119
120    /// Returns the first user payload value.
121    pub fn data1(&self) -> u32 {
122        self.data1
123    }
124
125    /// Returns the second user payload value.
126    pub fn data2(&self) -> u64 {
127        self.data2
128    }
129}
130
131#[derive(Debug)]
132pub struct EpollJoinGuard {
133    /// Underlying poll registration guard.
134    fd_guard: InodeValFilePollGuard,
135}
136
137impl EpollJoinGuard {
138    fn new(fd_guard: InodeValFilePollGuard) -> Self {
139        Self { fd_guard }
140    }
141}
142
143impl Drop for EpollJoinGuard {
144    fn drop(&mut self) {
145        // Dropping a subscription must detach its interest handler from the source.
146        match &self.fd_guard.mode {
147            InodeValFilePollGuardMode::File(_) => {
148                // Intentionally ignored, epoll doesn't work with files
149            }
150            InodeValFilePollGuardMode::Socket { inner } => {
151                let mut inner = inner.protected.write().unwrap();
152                inner.remove_handler();
153            }
154            InodeValFilePollGuardMode::EventNotifications(inner) => {
155                inner.remove_interest_handler();
156            }
157            InodeValFilePollGuardMode::DuplexPipe { pipe } => {
158                let inner = pipe.write().unwrap();
159                inner.remove_interest_handler();
160            }
161            InodeValFilePollGuardMode::PipeRx { rx } => {
162                let inner = rx.write().unwrap();
163                inner.remove_interest_handler();
164            }
165            InodeValFilePollGuardMode::PipeTx { .. } => {
166                // Intentionally ignored, the sending end of a pipe can't have an interest handler
167            }
168        }
169    }
170}
171
172#[derive(Debug)]
173pub struct EpollState {
174    /// Active subscriptions keyed by watched fd.
175    subscriptions: StdMutex<FnvHashMap<WasiFd, Arc<EpollSubState>>>,
176    /// Ready queue of subscriptions with potentially pending bits.
177    ready: StdMutex<VecDeque<ReadyItem>>,
178    /// Wake primitive for blocked `epoll_wait`.
179    notify: Notify,
180}
181
182impl Default for EpollState {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188impl EpollState {
189    /// Creates a fresh epoll runtime state.
190    pub fn new() -> Self {
191        Self {
192            subscriptions: StdMutex::new(FnvHashMap::default()),
193            ready: StdMutex::new(VecDeque::new()),
194            notify: Notify::new(),
195        }
196    }
197
198    #[cfg(test)]
199    fn insert_subscription(&self, fd: WasiFd, state: Arc<EpollSubState>) {
200        self.subscriptions.lock().unwrap().insert(fd, state);
201    }
202
203    fn restore_subscription(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
204        let mut subscriptions = self.subscriptions.lock().unwrap();
205        subscriptions.remove(&fd);
206        if let Some(previous) = previous {
207            subscriptions.insert(fd, previous);
208        }
209    }
210
211    fn subscription(&self, fd: WasiFd) -> Option<Arc<EpollSubState>> {
212        self.subscriptions.lock().unwrap().get(&fd).cloned()
213    }
214
215    fn enqueue_ready(&self, fd: WasiFd, generation: u64) {
216        self.ready
217            .lock()
218            .unwrap()
219            .push_back(ReadyItem { fd, generation });
220        self.notify.notify_one();
221    }
222
223    fn dequeue_ready(&self) -> Option<ReadyItem> {
224        self.ready.lock().unwrap().pop_front()
225    }
226
227    /// Waits until a producer enqueues readiness and notifies.
228    pub async fn wait(&self) {
229        self.notify.notified().await;
230    }
231
232    pub(crate) fn prepare_add(
233        &self,
234        fd: WasiFd,
235        event: &EpollEventCtl,
236    ) -> Result<(EpollFd, Arc<EpollSubState>), Errno> {
237        let mut subscriptions = self.subscriptions.lock().unwrap();
238        if subscriptions.contains_key(&fd) {
239            return Err(Errno::Exist);
240        }
241
242        let (epoll_fd, sub_state) = self.build_pending_subscription(fd, event, 1);
243        subscriptions.insert(fd, sub_state.clone());
244        Ok((epoll_fd, sub_state))
245    }
246
247    pub(crate) fn prepare_mod(
248        &self,
249        fd: WasiFd,
250        event: &EpollEventCtl,
251    ) -> Result<(EpollFd, Arc<EpollSubState>, Arc<EpollSubState>), Errno> {
252        let mut subscriptions = self.subscriptions.lock().unwrap();
253        let Some(previous) = subscriptions.remove(&fd) else {
254            return Err(Errno::Noent);
255        };
256        tracing::trace!(fd, "unregistering waker");
257
258        let (epoll_fd, sub_state) =
259            self.build_pending_subscription(fd, event, previous.next_generation());
260        subscriptions.insert(fd, sub_state.clone());
261        Ok((epoll_fd, sub_state, previous))
262    }
263
264    pub(crate) fn apply_del(&self, fd: WasiFd) -> Result<(), Errno> {
265        let removed = self
266            .subscriptions
267            .lock()
268            .unwrap()
269            .remove(&fd)
270            .ok_or(Errno::Noent)?;
271        removed.detach_joins();
272        Ok(())
273    }
274
275    pub(crate) fn rollback_registration(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
276        self.restore_subscription(fd, previous);
277    }
278
279    fn build_pending_subscription(
280        &self,
281        fd: WasiFd,
282        event: &EpollEventCtl,
283        generation: u64,
284    ) -> (EpollFd, Arc<EpollSubState>) {
285        let epoll_fd = EpollFd::from_event_ctl(fd, event);
286        tracing::trace!(
287            peb = ?event.events,
288            ptr = ?event.ptr,
289            data1 = event.data1,
290            data2 = event.data2,
291            fd,
292            "registering waker"
293        );
294        let sub_state = Arc::new(EpollSubState::new(epoll_fd.clone(), generation));
295        (epoll_fd, sub_state)
296    }
297}
298
299#[derive(Debug)]
300pub struct EpollSubState {
301    /// Snapshot of user-visible metadata.
302    fd_meta: StdMutex<EpollFd>,
303    /// Guard ownership for all attached handlers.
304    joins: StdMutex<Vec<EpollJoinGuard>>,
305    /// Atomic readiness bitset (EPOLLIN/OUT/HUP/ERR).
306    pending_bits: AtomicU8,
307    /// Queue dedupe flag: whether this sub already has a ready-queue entry.
308    enqueued: AtomicBool,
309    /// Generation used to invalidate stale queue entries after DEL/MOD.
310    generation: AtomicU64,
311}
312
313impl EpollSubState {
314    /// Creates a new subscription state with empty readiness and queue state.
315    pub fn new(fd_meta: EpollFd, generation: u64) -> Self {
316        Self {
317            fd_meta: StdMutex::new(fd_meta),
318            joins: StdMutex::new(Vec::new()),
319            pending_bits: AtomicU8::new(0),
320            enqueued: AtomicBool::new(false),
321            generation: AtomicU64::new(generation),
322        }
323    }
324
325    /// Returns `generation + 1` without mutating the current subscription.
326    ///
327    /// Callers use this to seed the generation of a replacement subscription.
328    pub fn next_generation(&self) -> u64 {
329        self.generation.load(Ordering::Acquire).saturating_add(1)
330    }
331
332    /// Adds a registration guard that will detach handlers when dropped.
333    pub fn add_join(&self, join: EpollJoinGuard) {
334        self.joins.lock().unwrap().push(join);
335    }
336
337    /// Detaches and drops all registered handlers for this subscription.
338    pub fn detach_joins(&self) {
339        self.joins.lock().unwrap().clear();
340    }
341
342    fn generation(&self) -> u64 {
343        self.generation.load(Ordering::Acquire)
344    }
345
346    pub(crate) fn fd_meta(&self) -> EpollFd {
347        self.fd_meta.lock().unwrap().clone()
348    }
349
350    fn set_pending(&self, bit: u8) -> bool {
351        let old_bits = self.pending_bits.fetch_or(bit, Ordering::AcqRel);
352        (old_bits & bit) == 0
353    }
354
355    fn take_pending_bits(&self) -> u8 {
356        self.pending_bits.swap(0, Ordering::AcqRel)
357    }
358
359    fn pending_bits(&self) -> u8 {
360        self.pending_bits.load(Ordering::Acquire)
361    }
362
363    fn mark_enqueued(&self) -> bool {
364        self.enqueued
365            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
366            .is_ok()
367    }
368
369    fn clear_enqueued(&self) {
370        self.enqueued.store(false, Ordering::Release);
371    }
372}
373
374#[derive(Debug, Clone, Copy)]
375struct ReadyItem {
376    /// Watched fd key used to resolve current subscription state.
377    fd: WasiFd,
378    /// Generation snapshot captured when enqueued.
379    generation: u64,
380}
381
382/// Maps epoll readiness flags into internal pending-bit positions.
383pub(crate) fn epoll_type_to_pending_bit(readiness: EpollType) -> Option<u8> {
384    if readiness == EpollType::EPOLLIN {
385        Some(READABLE_BIT)
386    } else if readiness == EpollType::EPOLLOUT {
387        Some(WRITABLE_BIT)
388    } else if readiness == EpollType::EPOLLHUP {
389        Some(HUP_BIT)
390    } else if readiness == EpollType::EPOLLERR {
391        Some(ERR_BIT)
392    } else {
393        None
394    }
395}
396
397/// Maps a source interest callback variant into the internal pending-bit mask.
398fn interest_to_pending_bit(interest: InterestType) -> u8 {
399    match interest {
400        InterestType::Readable => READABLE_BIT,
401        InterestType::Writable => WRITABLE_BIT,
402        InterestType::Closed => HUP_BIT,
403        InterestType::Error => ERR_BIT,
404    }
405}
406
407/// Converts consumed pending bits into caller-visible epoll events.
408fn pending_bits_to_event(bits: u8, mask: EpollType) -> EpollType {
409    let mut event = EpollType::empty();
410    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLIN)
411        && (bits & bit) != 0
412        && mask.contains(EpollType::EPOLLIN)
413    {
414        event |= EpollType::EPOLLIN;
415    }
416    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLOUT)
417        && (bits & bit) != 0
418        && mask.contains(EpollType::EPOLLOUT)
419    {
420        event |= EpollType::EPOLLOUT;
421    }
422    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLHUP)
423        && (bits & bit) != 0
424    {
425        event |= EpollType::EPOLLHUP;
426    }
427    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLERR)
428        && (bits & bit) != 0
429    {
430        event |= EpollType::EPOLLERR;
431    }
432    event
433}
434
435fn prime_immediate_writable_if_applicable(
436    event: &EpollFd,
437    fd_guard: &InodeValFilePollGuard,
438    epoll_state: &Arc<EpollState>,
439    sub_state: &Arc<EpollSubState>,
440) {
441    if !event.events().contains(EpollType::EPOLLOUT) {
442        return;
443    }
444
445    // Some fd kinds are effectively writable immediately (for example eventfd-like
446    // notifications and pipe write-ends), but may not emit a writable transition.
447    // Prime EPOLLOUT once at registration so level-triggered epoll can observe them.
448    let writable_now = matches!(
449        fd_guard.mode,
450        InodeValFilePollGuardMode::EventNotifications(_) | InodeValFilePollGuardMode::PipeTx { .. }
451    );
452
453    if !writable_now {
454        return;
455    }
456
457    sub_state
458        .pending_bits
459        .fetch_or(WRITABLE_BIT, Ordering::AcqRel);
460    if sub_state.mark_enqueued() {
461        epoll_state.enqueue_ready(event.fd(), sub_state.generation());
462    }
463}
464
465/// Re-enqueues a subscription if new pending bits arrived during/after consumer drain.
466fn repair_ready_queue_after_drain(
467    epoll_state: &Arc<EpollState>,
468    fd: WasiFd,
469    sub_state: &Arc<EpollSubState>,
470) {
471    if sub_state.pending_bits() != 0 && sub_state.mark_enqueued() {
472        epoll_state.enqueue_ready(fd, sub_state.generation());
473    }
474}
475
476/// Drains ready items into `(EpollFd, readiness)` events up to `maxevents`.
477///
478/// This is the consumer hot path used by `epoll_wait`. It is designed to be:
479/// - O(number of dequeued ready items)
480/// - tolerant of stale queue entries
481/// - race-safe with producers setting bits concurrently
482pub(crate) fn drain_ready_events(
483    epoll_state: &Arc<EpollState>,
484    maxevents: usize,
485) -> Vec<(EpollFd, EpollType)> {
486    let mut ret: Vec<(EpollFd, EpollType)> = Vec::new();
487    while ret.len() < maxevents {
488        let Some(item) = epoll_state.dequeue_ready() else {
489            break;
490        };
491
492        let Some(sub_state) = epoll_state.subscription(item.fd) else {
493            epoll_empty_dequeue_entry();
494            continue;
495        };
496
497        if sub_state.generation() != item.generation {
498            epoll_stale_generation_drop();
499            continue;
500        }
501
502        let bits = sub_state.take_pending_bits();
503        sub_state.clear_enqueued();
504
505        if bits == 0 {
506            repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);
507            epoll_empty_dequeue_entry();
508            continue;
509        }
510
511        let event = sub_state.fd_meta();
512        let readiness = pending_bits_to_event(bits, event.events());
513        if readiness != EpollType::empty() {
514            ret.push((event, readiness));
515        }
516        repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);
517
518        if ret.len() >= maxevents {
519            break;
520        }
521    }
522    ret
523}
524
525#[derive(Debug)]
526struct EpollHandler {
527    /// Watched fd associated with the subscription.
528    fd: WasiFd,
529    /// Parent epoll state for queueing and wakeups.
530    epoll_state: Arc<EpollState>,
531    /// Per-subscription state updated by interest callbacks.
532    sub_state: Arc<EpollSubState>,
533}
534
535impl EpollHandler {
536    fn new(fd: WasiFd, epoll_state: Arc<EpollState>, sub_state: Arc<EpollSubState>) -> Box<Self> {
537        Box::new(Self {
538            fd,
539            epoll_state,
540            sub_state,
541        })
542    }
543}
544
545impl InterestHandler for EpollHandler {
546    /// Producer path:
547    /// set pending bits, enqueue once, and wake one waiter.
548    fn push_interest(&mut self, interest: InterestType) {
549        EPOLL_ENQUEUE_ATTEMPTS.fetch_add(1, Ordering::Relaxed);
550        let bit = interest_to_pending_bit(interest);
551        if !self.sub_state.set_pending(bit) {
552            EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
553            return;
554        }
555
556        if self.sub_state.mark_enqueued() {
557            self.epoll_state
558                .enqueue_ready(self.fd, self.sub_state.generation());
559        } else {
560            EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
561        }
562    }
563
564    /// Clears one readiness bit from this subscription only.
565    fn pop_interest(&mut self, interest: InterestType) -> bool {
566        let bit = interest_to_pending_bit(interest);
567        let old = self
568            .sub_state
569            .pending_bits
570            .fetch_and(!bit, Ordering::AcqRel);
571        (old & bit) != 0
572    }
573
574    /// Checks whether this subscription currently has a readiness bit set.
575    fn has_interest(&self, interest: InterestType) -> bool {
576        let bit = interest_to_pending_bit(interest);
577        (self.sub_state.pending_bits() & bit) != 0
578    }
579}
580
581/// Registers an epoll interest handler on the watched fd and returns a guard.
582///
583/// `None` means the fd kind does not support handler attachment for epoll.
584pub(crate) fn register_epoll_handler(
585    state: &Arc<WasiState>,
586    event: &EpollFd,
587    epoll_state: Arc<EpollState>,
588    sub_state: Arc<EpollSubState>,
589) -> Result<Option<EpollJoinGuard>, Errno> {
590    let mut type_ = Eventtype::FdRead;
591    let mut peb = PollEventBuilder::new();
592    if event.events().contains(EpollType::EPOLLOUT) {
593        type_ = Eventtype::FdWrite;
594        peb = peb.add(PollEvent::PollOut);
595    }
596    if event.events().contains(EpollType::EPOLLIN) {
597        type_ = Eventtype::FdRead;
598        peb = peb.add(PollEvent::PollIn);
599    }
600    // EPOLLERR/EPOLLHUP are always delivered by epoll regardless of requested mask.
601    peb = peb.add(PollEvent::PollError);
602    peb = peb.add(PollEvent::PollHangUp);
603
604    let s = Subscription {
605        userdata: event.data2(),
606        type_,
607        data: SubscriptionUnion {
608            fd_readwrite: SubscriptionFsReadwrite {
609                file_descriptor: event.fd(),
610            },
611        },
612    };
613
614    let fd_guard = poll_fd_guard(state, peb.build(), event.fd(), s)?;
615    let handler = EpollHandler::new(event.fd(), epoll_state.clone(), sub_state.clone());
616
617    match &fd_guard.mode {
618        InodeValFilePollGuardMode::File(_) => {
619            // Intentionally ignored, epoll doesn't work with files
620            return Ok(None);
621        }
622        InodeValFilePollGuardMode::Socket { inner, .. } => {
623            let mut inner = inner.protected.write().unwrap();
624            inner.set_handler(handler).map_err(net_error_into_io_err)?;
625            drop(inner);
626        }
627        InodeValFilePollGuardMode::EventNotifications(inner) => inner.set_interest_handler(handler),
628        InodeValFilePollGuardMode::DuplexPipe { pipe } => {
629            let inner = pipe.write().unwrap();
630            inner.set_interest_handler(handler);
631        }
632        InodeValFilePollGuardMode::PipeRx { rx } => {
633            let inner = rx.write().unwrap();
634            inner.set_interest_handler(handler);
635        }
636        InodeValFilePollGuardMode::PipeTx { .. } => {
637            // The sending end of a pipe can't have an interest handler, since we
638            // only support "readable" interest on pipes; they're considered to
639            // always be writable.
640            prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);
641            return Ok(None);
642        }
643    }
644
645    prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);
646
647    Ok(Some(EpollJoinGuard::new(fd_guard)))
648}
649
650/// Increments stale-generation dequeue metric.
651pub(crate) fn epoll_stale_generation_drop() {
652    EPOLL_STALE_GENERATION_DROPS.fetch_add(1, Ordering::Relaxed);
653}
654
655/// Increments empty dequeue metric.
656pub(crate) fn epoll_empty_dequeue_entry() {
657    EPOLL_EMPTY_DEQUEUE_ENTRIES.fetch_add(1, Ordering::Relaxed);
658}
659
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::RwLock;
    use virtual_fs::Pipe;

    /// Event mask with all four readiness kinds requested.
    fn all_events() -> EpollType {
        EpollType::EPOLLIN | EpollType::EPOLLOUT | EpollType::EPOLLERR | EpollType::EPOLLHUP
    }

    /// Number of entries currently sitting in the ready queue.
    fn ready_len(epoll_state: &EpollState) -> usize {
        epoll_state.ready.lock().unwrap().len()
    }

    /// `epoll_ctl` payload with an `EPOLLIN` mask and recognisable payload values.
    fn test_epoll_event_ctl(fd: WasiFd) -> EpollEventCtl {
        EpollEventCtl {
            events: EpollType::EPOLLIN,
            ptr: 77,
            fd,
            data1: 88,
            data2: 99,
        }
    }

    /// Subscription state for `fd` that requests every readiness kind.
    fn test_sub_state(fd: WasiFd, generation: u64) -> Arc<EpollSubState> {
        let meta = EpollFd::new(all_events(), 0, fd, 0, 0);
        Arc::new(EpollSubState::new(meta, generation))
    }

    /// Fresh epoll state plus a generation-1 subscription and its handler.
    fn test_epoll_handler(fd: WasiFd) -> (Arc<EpollState>, Arc<EpollSubState>, Box<EpollHandler>) {
        let epoll_state = Arc::new(EpollState::new());
        let sub_state = test_sub_state(fd, 1);
        let handler = EpollHandler::new(fd, Arc::clone(&epoll_state), Arc::clone(&sub_state));
        (epoll_state, sub_state, handler)
    }

    #[test]
    fn epoll_fd_from_event_ctl_uses_explicit_fd() {
        // The payload says fd 1234, the explicit argument says 5678.
        let ctl = test_epoll_event_ctl(1234);
        let meta = EpollFd::from_event_ctl(5678, &ctl);

        assert_eq!(meta.fd(), 5678);
        assert_eq!(meta.ptr(), 77);
        assert_eq!(meta.data1(), 88);
        assert_eq!(meta.data2(), 99);
        assert_eq!(meta.events(), EpollType::EPOLLIN);
    }

    #[test]
    fn epoll_handler_pop_interest_is_scoped_to_fd() {
        let epoll_state = Arc::new(EpollState::new());
        let readable_sub = |fd: WasiFd| {
            Arc::new(EpollSubState::new(
                EpollFd::new(EpollType::EPOLLIN, 0, fd, 0, 0),
                1,
            ))
        };
        let first_sub = readable_sub(10);
        let second_sub = readable_sub(11);
        let mut first = EpollHandler::new(10, Arc::clone(&epoll_state), Arc::clone(&first_sub));
        let mut second = EpollHandler::new(11, Arc::clone(&epoll_state), Arc::clone(&second_sub));

        first.push_interest(InterestType::Readable);
        second.push_interest(InterestType::Readable);

        assert!(first.has_interest(InterestType::Readable));
        assert!(second.has_interest(InterestType::Readable));

        assert!(first.pop_interest(InterestType::Readable));
        assert!(!first.has_interest(InterestType::Readable));
        assert!(
            second.has_interest(InterestType::Readable),
            "popping one fd interest must not clear another fd with the same readiness"
        );

        assert_eq!(first_sub.pending_bits(), 0);
        assert_ne!(second_sub.pending_bits(), 0);
        // Popping interest does not remove the queue entry itself.
        assert_eq!(ready_len(&epoll_state), 2);
    }

    #[test]
    fn epoll_handler_dedupes_queue_until_consumer_drains() {
        let (epoll_state, sub_state, mut handler) = test_epoll_handler(7);

        for interest in [
            InterestType::Readable,
            InterestType::Readable,
            InterestType::Writable,
        ] {
            handler.push_interest(interest);
        }

        assert_eq!(
            ready_len(&epoll_state),
            1,
            "multiple pushes while enqueued must keep a single queue entry"
        );
        assert!(handler.has_interest(InterestType::Readable));
        assert!(handler.has_interest(InterestType::Writable));

        // Emulate the consumer: pop the entry, take the bits, clear the flag.
        epoll_state.dequeue_ready().unwrap();
        sub_state.take_pending_bits();
        sub_state.clear_enqueued();

        handler.push_interest(InterestType::Readable);
        assert_eq!(
            ready_len(&epoll_state),
            1,
            "after drain, a new event should enqueue again"
        );
    }

    #[test]
    fn epoll_type_to_pending_bit_has_stable_mapping() {
        let expected = [
            (EpollType::EPOLLIN, READABLE_BIT),
            (EpollType::EPOLLOUT, WRITABLE_BIT),
            (EpollType::EPOLLHUP, HUP_BIT),
            (EpollType::EPOLLERR, ERR_BIT),
        ];
        for (flag, bit) in expected {
            assert_eq!(epoll_type_to_pending_bit(flag), Some(bit));
        }
    }

    #[test]
    fn interest_to_pending_bit_has_stable_mapping() {
        let expected = [
            (InterestType::Readable, READABLE_BIT),
            (InterestType::Writable, WRITABLE_BIT),
            (InterestType::Closed, HUP_BIT),
            (InterestType::Error, ERR_BIT),
        ];
        for (interest, bit) in expected {
            assert_eq!(interest_to_pending_bit(interest), bit);
        }
    }

    #[test]
    fn pending_bits_to_event_always_includes_hup_and_err() {
        // The mask only asks for EPOLLIN, yet HUP and ERR must come through.
        let readiness = pending_bits_to_event(HUP_BIT | ERR_BIT, EpollType::EPOLLIN);
        assert_eq!(readiness, EpollType::EPOLLHUP | EpollType::EPOLLERR);
    }

    #[test]
    fn drain_ready_events_keeps_multi_fd_same_readiness_isolated() {
        let epoll_state = Arc::new(EpollState::new());
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();

        for fd in [10, 11] {
            let sub = test_sub_state(fd, 1);
            sub.pending_bits.store(readable_bit, Ordering::Release);
            sub.enqueued.store(true, Ordering::Release);
            epoll_state.insert_subscription(fd, sub);
            epoll_state.enqueue_ready(fd, 1);
        }

        let events = drain_ready_events(&epoll_state, 8);
        assert_eq!(events.len(), 2);
        for ((meta, readiness), expected_fd) in events.iter().zip([10, 11]) {
            assert_eq!(meta.fd(), expected_fd);
            assert_eq!(*readiness, EpollType::EPOLLIN);
        }
    }

    #[test]
    fn drain_ready_events_coalesces_readiness_bits_for_one_fd() {
        let epoll_state = Arc::new(EpollState::new());
        let meta = EpollFd::new(EpollType::EPOLLIN | EpollType::EPOLLOUT, 0, 90, 0, 0);
        let sub = Arc::new(EpollSubState::new(meta, 1));

        sub.pending_bits
            .store(READABLE_BIT | WRITABLE_BIT, Ordering::Release);
        sub.enqueued.store(true, Ordering::Release);
        epoll_state.insert_subscription(90, Arc::clone(&sub));
        epoll_state.enqueue_ready(90, 1);

        let drained = drain_ready_events(&epoll_state, 1);
        assert_eq!(drained.len(), 1);
        let (drained_meta, drained_readiness) = &drained[0];
        assert_eq!(drained_meta.fd(), 90);
        assert_eq!(
            *drained_readiness,
            EpollType::EPOLLIN | EpollType::EPOLLOUT
        );

        // Everything was consumed in one go; nothing is left behind.
        assert_eq!(sub.pending_bits(), 0);
        assert!(!sub.enqueued.load(Ordering::Acquire));
        assert_eq!(ready_len(&epoll_state), 0);
    }

    #[test]
    fn drain_ready_events_drops_stale_generation_items() {
        let epoll_state = Arc::new(EpollState::new());
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();

        // Subscription is at generation 2, the queued item still says 1.
        let sub = test_sub_state(22, 2);
        sub.pending_bits.store(readable_bit, Ordering::Release);
        sub.enqueued.store(true, Ordering::Release);
        epoll_state.insert_subscription(22, Arc::clone(&sub));
        epoll_state.enqueue_ready(22, 1);

        let events = drain_ready_events(&epoll_state, 8);
        assert!(
            events.is_empty(),
            "stale generation items must not emit events"
        );
        assert_eq!(
            sub.pending_bits.load(Ordering::Acquire),
            readable_bit,
            "stale dequeue must not clear pending bits for current generation"
        );
    }

    #[test]
    fn repair_ready_queue_after_drain_requeues_when_new_bits_arrive() {
        let epoll_state = Arc::new(EpollState::new());
        let writable_bit = epoll_type_to_pending_bit(EpollType::EPOLLOUT).unwrap();

        // Pending bits present, but no queue entry recorded.
        let sub = test_sub_state(44, 3);
        sub.pending_bits.store(writable_bit, Ordering::Release);
        sub.enqueued.store(false, Ordering::Release);

        repair_ready_queue_after_drain(&epoll_state, 44, &sub);

        assert!(sub.enqueued.load(Ordering::Acquire));
        let requeued = epoll_state.dequeue_ready().unwrap();
        assert_eq!(requeued.fd, 44);
    }

    #[test]
    fn apply_del_detaches_joins_even_if_subscription_stays_alive() {
        let epoll_state = Arc::new(EpollState::new());
        let ctl = test_epoll_event_ctl(55);
        let sub = Arc::new(EpollSubState::new(EpollFd::from_event_ctl(55, &ctl), 1));
        epoll_state.insert_subscription(55, Arc::clone(&sub));

        let (tx, _rx) = Pipe::new().split();
        let fd_guard = InodeValFilePollGuard {
            fd: 55,
            peb: PollEventBuilder::new().build(),
            subscription: Subscription {
                userdata: 0,
                type_: Eventtype::FdRead,
                data: SubscriptionUnion {
                    fd_readwrite: SubscriptionFsReadwrite {
                        file_descriptor: 55,
                    },
                },
            },
            mode: InodeValFilePollGuardMode::PipeTx {
                tx: Arc::new(RwLock::new(Box::new(tx))),
            },
        };
        sub.add_join(EpollJoinGuard::new(fd_guard));

        // Keep an extra reference so the subscription outlives the DEL.
        let survivor = Arc::clone(&sub);
        assert_eq!(survivor.joins.lock().unwrap().len(), 1);

        epoll_state.apply_del(55).unwrap();

        assert_eq!(survivor.joins.lock().unwrap().len(), 0);
    }
}