wasmer_wasix/os/epoll/
mod.rs

1//! Epoll runtime implementation for WASIX.
2//!
3//! This module centralizes epoll internals behind a small crate-internal API used
4//! by `epoll_create`, `epoll_ctl`, and `epoll_wait`.
5//!
6//! ## Architecture
7//!
8//! The implementation uses:
9//! - `EpollState`: global state for one epoll fd (`subscriptions`, ready queue, notifier).
10//! - `EpollSubState`: per-watched-fd state (`pending_bits`, `enqueued`, `generation`, metadata).
11//! - `EpollHandler`: producer-side interest handler attached to socket/pipe/notification sources.
12//!
13//! ## Important flows
14//!
15//! ### Registration (`epoll_ctl` ADD/MOD/DEL)
16//! 1. `epoll_ctl` creates/removes `EpollSubState` entries in `EpollState.subscriptions`.
17//! 2. On ADD/MOD, `register_epoll_handler()` installs an `EpollHandler` on the target fd.
18//! 3. Guard ownership is stored in `EpollSubState.joins`; dropping a subscription drops guards
19//!    and unregisters handlers.
20//!
21//! ### Producer path (`EpollHandler::push_interest`)
22//! 1. Convert `InterestType` to a readiness bit.
23//! 2. Set `pending_bits` atomically.
24//! 3. Enqueue exactly one `ReadyItem` per subscription while `enqueued == true`.
25//! 4. Wake one waiter via `Notify`.
26//!
27//! ### Consumer path (`drain_ready_events` used by `epoll_wait`)
28//! 1. Pop `ReadyItem` from the ready queue.
29//! 2. Resolve the current subscription and drop stale/missing entries.
30//! 3. Atomically take readiness bits, clear `enqueued`, and map bits to output events.
31//! 4. Repair queue state if a race set new bits during draining.
32//!
33//! ## Correctness model
34//!
35//! - `pending_bits` is the source of truth for unread readiness.
36//! - `enqueued` deduplicates queue presence per subscription.
37//! - `generation` prevents stale queued entries from emitting after DEL/MOD.
38//! - Consumer logic tolerates stale queue entries and must never await while holding locks.
39//!
40use std::{
41    collections::VecDeque,
42    sync::{
43        Arc, Mutex as StdMutex,
44        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
45    },
46};
47
48use fnv::FnvHashMap;
49use serde::{Deserialize, Serialize};
50use tokio::sync::Notify;
51use virtual_mio::{InterestHandler, InterestType};
52use virtual_net::net_error_into_io_err;
53use wasmer_wasix_types::wasi::{
54    EpollEventCtl, EpollType, Errno, Eventtype, Fd as WasiFd, Subscription,
55    SubscriptionFsReadwrite, SubscriptionUnion,
56};
57
58use crate::{
59    fs::{InodeValFilePollGuard, InodeValFilePollGuardMode},
60    state::{PollEvent, PollEventBuilder, WasiState},
61    syscalls::poll_fd_guard,
62};
63
// Internal readiness bitset layout shared by producers and the consumer.
const READABLE_BIT: u8 = 1 << 0; // EPOLLIN
const WRITABLE_BIT: u8 = 1 << 1; // EPOLLOUT
const HUP_BIT: u8 = 1 << 2; // EPOLLHUP
const ERR_BIT: u8 = 1 << 3; // EPOLLERR

// Process-wide diagnostic counters, incremented with `Ordering::Relaxed`.
static EPOLL_ENQUEUE_ATTEMPTS: AtomicU64 = AtomicU64::new(0);
static EPOLL_ENQUEUE_DEDUPE_HITS: AtomicU64 = AtomicU64::new(0);
static EPOLL_STALE_GENERATION_DROPS: AtomicU64 = AtomicU64::new(0);
static EPOLL_EMPTY_DEQUEUE_ENTRIES: AtomicU64 = AtomicU64::new(0);
73
/// User-visible metadata for one epoll subscription (event mask + payloads).
///
/// A clone of this value is returned alongside readiness flags from
/// `drain_ready_events` (the `epoll_wait` consumer path).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EpollFd {
    /// Event mask configured by the caller (`epoll_ctl`).
    events: EpollType,
    /// Opaque pointer payload from the caller.
    ptr: u64,
    /// Watched file descriptor.
    fd: WasiFd,
    /// Additional user payload.
    data1: u32,
    /// Additional user payload.
    data2: u64,
}
87
88impl EpollFd {
89    /// Creates immutable metadata for one epoll subscription.
90    pub fn new(events: EpollType, ptr: u64, fd: WasiFd, data1: u32, data2: u64) -> Self {
91        Self {
92            events,
93            ptr,
94            fd,
95            data1,
96            data2,
97        }
98    }
99
100    /// Converts the syscall control payload into subscription metadata.
101    pub fn from_event_ctl(fd: WasiFd, event: &EpollEventCtl) -> Self {
102        Self::new(event.events, event.ptr, fd, event.data1, event.data2)
103    }
104
105    /// Returns the configured event mask for this subscription.
106    pub fn events(&self) -> EpollType {
107        self.events
108    }
109
110    /// Returns the caller-supplied pointer payload.
111    pub fn ptr(&self) -> u64 {
112        self.ptr
113    }
114
115    /// Returns the watched file descriptor.
116    pub fn fd(&self) -> WasiFd {
117        self.fd
118    }
119
120    /// Returns the first user payload value.
121    pub fn data1(&self) -> u32 {
122        self.data1
123    }
124
125    /// Returns the second user payload value.
126    pub fn data2(&self) -> u64 {
127        self.data2
128    }
129}
130
/// Owns one poll registration; dropping the guard detaches the attached
/// interest handler from the watched source (see the `Drop` impl below).
#[derive(Debug)]
pub struct EpollJoinGuard {
    /// Underlying poll registration guard.
    fd_guard: InodeValFilePollGuard,
}

impl EpollJoinGuard {
    /// Wraps an acquired poll guard so handler detachment is tied to drop.
    fn new(fd_guard: InodeValFilePollGuard) -> Self {
        Self { fd_guard }
    }
}
142
impl Drop for EpollJoinGuard {
    /// Detaches this subscription's interest handler from its source.
    fn drop(&mut self) {
        // Dropping a subscription must detach its interest handler from the source.
        match &self.fd_guard.mode {
            InodeValFilePollGuardMode::File(_) => {
                // Intentionally ignored, epoll doesn't work with files
            }
            InodeValFilePollGuardMode::Socket { inner } => {
                // Handler lives behind the socket's protected state lock.
                let mut inner = inner.protected.write().unwrap();
                inner.remove_handler();
            }
            InodeValFilePollGuardMode::EventNotifications(inner) => {
                inner.remove_interest_handler();
            }
            InodeValFilePollGuardMode::DuplexPipe { pipe } => {
                let inner = pipe.write().unwrap();
                inner.remove_interest_handler();
            }
            InodeValFilePollGuardMode::PipeRx { rx } => {
                let inner = rx.write().unwrap();
                inner.remove_interest_handler();
            }
            InodeValFilePollGuardMode::PipeTx { .. } => {
                // Intentionally ignored, the sending end of a pipe can't have an interest handler
            }
        }
    }
}
171
/// Global runtime state backing a single epoll file descriptor.
#[derive(Debug)]
pub struct EpollState {
    /// Active subscriptions keyed by watched fd.
    subscriptions: StdMutex<FnvHashMap<WasiFd, Arc<EpollSubState>>>,
    /// Ready queue of subscriptions with potentially pending bits.
    ready: StdMutex<VecDeque<ReadyItem>>,
    /// Wake primitive for blocked `epoll_wait`.
    notify: Notify,
}
181
182impl Default for EpollState {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
impl EpollState {
    /// Creates a fresh epoll runtime state.
    pub fn new() -> Self {
        Self {
            subscriptions: StdMutex::new(FnvHashMap::default()),
            ready: StdMutex::new(VecDeque::new()),
            notify: Notify::new(),
        }
    }

    /// Test-only helper: installs a subscription without the `epoll_ctl` flow.
    #[cfg(test)]
    fn insert_subscription(&self, fd: WasiFd, state: Arc<EpollSubState>) {
        self.subscriptions.lock().unwrap().insert(fd, state);
    }

    /// Removes the current entry for `fd` and reinstates `previous` (if any).
    fn restore_subscription(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
        let mut subscriptions = self.subscriptions.lock().unwrap();
        subscriptions.remove(&fd);
        if let Some(previous) = previous {
            subscriptions.insert(fd, previous);
        }
    }

    /// Returns the live subscription for `fd`, if one exists.
    fn subscription(&self, fd: WasiFd) -> Option<Arc<EpollSubState>> {
        self.subscriptions.lock().unwrap().get(&fd).cloned()
    }

    /// Pushes a ready-queue entry and wakes one `epoll_wait` waiter.
    fn enqueue_ready(&self, fd: WasiFd, generation: u64) {
        self.ready
            .lock()
            .unwrap()
            .push_back(ReadyItem { fd, generation });
        self.notify.notify_one();
    }

    /// Pops the oldest ready-queue entry, if any.
    fn dequeue_ready(&self) -> Option<ReadyItem> {
        self.ready.lock().unwrap().pop_front()
    }

    /// Waits until a producer enqueues readiness and notifies.
    pub async fn wait(&self) {
        self.notify.notified().await;
    }

    /// Reserves a new subscription for EPOLL_CTL_ADD.
    ///
    /// Fails with `Errno::Exist` if `fd` is already watched. The new
    /// sub-state starts at generation 1 with no handlers attached yet.
    pub(crate) fn prepare_add(
        &self,
        fd: WasiFd,
        event: &EpollEventCtl,
    ) -> Result<(EpollFd, Arc<EpollSubState>), Errno> {
        let mut subscriptions = self.subscriptions.lock().unwrap();
        if subscriptions.contains_key(&fd) {
            return Err(Errno::Exist);
        }

        let (epoll_fd, sub_state) = self.build_pending_subscription(fd, event, 1);
        subscriptions.insert(fd, sub_state.clone());
        Ok((epoll_fd, sub_state))
    }

    /// Replaces the subscription for EPOLL_CTL_MOD.
    ///
    /// Fails with `Errno::Noent` if `fd` is not watched. The replacement's
    /// generation is seeded from the previous subscription so stale queue
    /// entries are invalidated; the displaced sub-state is returned to the
    /// caller.
    pub(crate) fn prepare_mod(
        &self,
        fd: WasiFd,
        event: &EpollEventCtl,
    ) -> Result<(EpollFd, Arc<EpollSubState>, Arc<EpollSubState>), Errno> {
        let mut subscriptions = self.subscriptions.lock().unwrap();
        let Some(previous) = subscriptions.remove(&fd) else {
            return Err(Errno::Noent);
        };
        tracing::trace!(fd, "unregistering waker");

        let (epoll_fd, sub_state) =
            self.build_pending_subscription(fd, event, previous.next_generation());
        subscriptions.insert(fd, sub_state.clone());
        Ok((epoll_fd, sub_state, previous))
    }

    /// Removes the subscription for EPOLL_CTL_DEL and detaches its handlers.
    pub(crate) fn apply_del(&self, fd: WasiFd) -> Result<(), Errno> {
        let removed = self
            .subscriptions
            .lock()
            .unwrap()
            .remove(&fd)
            .ok_or(Errno::Noent)?;
        removed.detach_joins();
        Ok(())
    }

    /// Rolls back a failed ADD/MOD registration, restoring `previous` (if any).
    pub(crate) fn rollback_registration(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
        self.restore_subscription(fd, previous);
    }

    /// Creates the metadata + sub-state pair for a pending registration.
    fn build_pending_subscription(
        &self,
        fd: WasiFd,
        event: &EpollEventCtl,
        generation: u64,
    ) -> (EpollFd, Arc<EpollSubState>) {
        let epoll_fd = EpollFd::from_event_ctl(fd, event);
        tracing::trace!(
            peb = ?event.events,
            ptr = ?event.ptr,
            data1 = event.data1,
            data2 = event.data2,
            fd,
            "registering waker"
        );
        let sub_state = Arc::new(EpollSubState::new(epoll_fd.clone(), generation));
        (epoll_fd, sub_state)
    }
}
298
/// Per-watched-fd state shared between interest producers and the consumer.
#[derive(Debug)]
pub struct EpollSubState {
    /// Snapshot of user-visible metadata.
    fd_meta: StdMutex<EpollFd>,
    /// Guard ownership for all attached handlers.
    joins: StdMutex<Vec<EpollJoinGuard>>,
    /// Atomic readiness bitset (EPOLLIN/OUT/HUP/ERR).
    pending_bits: AtomicU8,
    /// Queue dedupe flag: whether this sub already has a ready-queue entry.
    enqueued: AtomicBool,
    /// Generation used to invalidate stale queue entries after DEL/MOD.
    generation: AtomicU64,
}
312
impl EpollSubState {
    /// Creates a new subscription state with empty readiness and queue state.
    pub fn new(fd_meta: EpollFd, generation: u64) -> Self {
        Self {
            fd_meta: StdMutex::new(fd_meta),
            joins: StdMutex::new(Vec::new()),
            pending_bits: AtomicU8::new(0),
            enqueued: AtomicBool::new(false),
            generation: AtomicU64::new(generation),
        }
    }

    /// Returns `generation + 1` without mutating the current subscription.
    ///
    /// Callers use this to seed the generation of a replacement subscription.
    pub fn next_generation(&self) -> u64 {
        self.generation.load(Ordering::Acquire).saturating_add(1)
    }

    /// Adds a registration guard that will detach handlers when dropped.
    pub fn add_join(&self, join: EpollJoinGuard) {
        self.joins.lock().unwrap().push(join);
    }

    /// Detaches and drops all registered handlers for this subscription.
    pub fn detach_joins(&self) {
        self.joins.lock().unwrap().clear();
    }

    /// Current generation of this subscription.
    fn generation(&self) -> u64 {
        self.generation.load(Ordering::Acquire)
    }

    /// Returns a clone of the user-visible subscription metadata.
    pub(crate) fn fd_meta(&self) -> EpollFd {
        self.fd_meta.lock().unwrap().clone()
    }

    /// ORs `bit` into the pending set; `true` iff it was previously clear.
    fn set_pending(&self, bit: u8) -> bool {
        let old_bits = self.pending_bits.fetch_or(bit, Ordering::AcqRel);
        (old_bits & bit) == 0
    }

    /// Atomically consumes and clears all pending readiness bits.
    fn take_pending_bits(&self) -> u8 {
        self.pending_bits.swap(0, Ordering::AcqRel)
    }

    /// Reads the pending readiness bits without clearing them.
    fn pending_bits(&self) -> u8 {
        self.pending_bits.load(Ordering::Acquire)
    }

    /// Flips `enqueued` false -> true; `true` iff this call won the flip.
    fn mark_enqueued(&self) -> bool {
        self.enqueued
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Clears the queue-presence flag after the consumer dequeues this sub.
    fn clear_enqueued(&self) {
        self.enqueued.store(false, Ordering::Release);
    }
}
373
/// One ready-queue entry; re-validated against live subscriptions at drain time.
#[derive(Debug, Clone, Copy)]
struct ReadyItem {
    /// Watched fd key used to resolve current subscription state.
    fd: WasiFd,
    /// Generation snapshot captured when enqueued.
    generation: u64,
}
381
382/// Maps epoll readiness flags into internal pending-bit positions.
383pub(crate) fn epoll_type_to_pending_bit(readiness: EpollType) -> Option<u8> {
384    if readiness == EpollType::EPOLLIN {
385        Some(READABLE_BIT)
386    } else if readiness == EpollType::EPOLLOUT {
387        Some(WRITABLE_BIT)
388    } else if readiness == EpollType::EPOLLHUP {
389        Some(HUP_BIT)
390    } else if readiness == EpollType::EPOLLERR {
391        Some(ERR_BIT)
392    } else {
393        None
394    }
395}
396
397/// Maps a source interest callback variant into the internal pending-bit mask.
398fn interest_to_pending_bit(interest: InterestType) -> u8 {
399    match interest {
400        InterestType::Readable => READABLE_BIT,
401        InterestType::Writable => WRITABLE_BIT,
402        InterestType::Closed => HUP_BIT,
403        InterestType::Error => ERR_BIT,
404    }
405}
406
407/// Converts consumed pending bits into caller-visible epoll events.
408fn pending_bits_to_events(bits: u8, mask: EpollType) -> Vec<EpollType> {
409    let mut events = Vec::with_capacity(4);
410    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLIN)
411        && (bits & bit) != 0
412        && mask.contains(EpollType::EPOLLIN)
413    {
414        events.push(EpollType::EPOLLIN);
415    }
416    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLOUT)
417        && (bits & bit) != 0
418        && mask.contains(EpollType::EPOLLOUT)
419    {
420        events.push(EpollType::EPOLLOUT);
421    }
422    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLHUP)
423        && (bits & bit) != 0
424    {
425        events.push(EpollType::EPOLLHUP);
426    }
427    if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLERR)
428        && (bits & bit) != 0
429    {
430        events.push(EpollType::EPOLLERR);
431    }
432    events
433}
434
435fn epoll_mask_to_pending_bits(mask: EpollType) -> u8 {
436    let mut bits = 0;
437    if mask.contains(EpollType::EPOLLIN) {
438        bits |= READABLE_BIT;
439    }
440    if mask.contains(EpollType::EPOLLOUT) {
441        bits |= WRITABLE_BIT;
442    }
443    // EPOLLHUP/EPOLLERR are always reported by epoll when present.
444    bits |= HUP_BIT;
445    bits |= ERR_BIT;
446    bits
447}
448
449fn prime_immediate_writable_if_applicable(
450    event: &EpollFd,
451    fd_guard: &InodeValFilePollGuard,
452    epoll_state: &Arc<EpollState>,
453    sub_state: &Arc<EpollSubState>,
454) {
455    if !event.events().contains(EpollType::EPOLLOUT) {
456        return;
457    }
458
459    // Some fd kinds are effectively writable immediately (for example eventfd-like
460    // notifications and pipe write-ends), but may not emit a writable transition.
461    // Prime EPOLLOUT once at registration so level-triggered epoll can observe them.
462    let writable_now = matches!(
463        fd_guard.mode,
464        InodeValFilePollGuardMode::EventNotifications(_) | InodeValFilePollGuardMode::PipeTx { .. }
465    );
466
467    if !writable_now {
468        return;
469    }
470
471    sub_state
472        .pending_bits
473        .fetch_or(WRITABLE_BIT, Ordering::AcqRel);
474    if sub_state.mark_enqueued() {
475        epoll_state.enqueue_ready(event.fd(), sub_state.generation());
476    }
477}
478
479/// Re-enqueues a subscription if new pending bits arrived during/after consumer drain.
480fn repair_ready_queue_after_drain(
481    epoll_state: &Arc<EpollState>,
482    fd: WasiFd,
483    sub_state: &Arc<EpollSubState>,
484) {
485    if sub_state.pending_bits() != 0 && sub_state.mark_enqueued() {
486        epoll_state.enqueue_ready(fd, sub_state.generation());
487    }
488}
489
/// Drains ready items into `(EpollFd, readiness)` events up to `maxevents`.
///
/// This is the consumer hot path used by `epoll_wait`. It is designed to be:
/// - O(number of dequeued ready items)
/// - tolerant of stale queue entries
/// - race-safe with producers setting bits concurrently
pub(crate) fn drain_ready_events(
    epoll_state: &Arc<EpollState>,
    maxevents: usize,
) -> Vec<(EpollFd, EpollType)> {
    let mut ret: Vec<(EpollFd, EpollType)> = Vec::new();
    while ret.len() < maxevents {
        let Some(item) = epoll_state.dequeue_ready() else {
            break;
        };

        // The subscription may have been removed (DEL) after the entry was queued.
        let Some(sub_state) = epoll_state.subscription(item.fd) else {
            epoll_empty_dequeue_entry();
            continue;
        };

        // A generation mismatch means the entry belongs to a replaced (MOD)
        // subscription; drop it without touching the live one's bits.
        if sub_state.generation() != item.generation {
            epoll_stale_generation_drop();
            continue;
        }

        // Take the bits first, then clear the dedupe flag; the repair step
        // below re-enqueues if a producer raced into this window.
        let bits = sub_state.take_pending_bits();
        sub_state.clear_enqueued();

        if bits == 0 {
            repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);
            epoll_empty_dequeue_entry();
            continue;
        }

        let event = sub_state.fd_meta();
        // Track mask-relevant bits we fail to emit under the event budget.
        let mut undispatched_bits = bits & epoll_mask_to_pending_bits(event.events());
        for readiness in pending_bits_to_events(bits, event.events()) {
            if ret.len() >= maxevents {
                break;
            }
            ret.push((event.clone(), readiness));
            if let Some(bit) = epoll_type_to_pending_bit(readiness) {
                undispatched_bits &= !bit;
            }
        }

        // Restore bits that didn't fit so a later wait can deliver them.
        if undispatched_bits != 0 {
            sub_state
                .pending_bits
                .fetch_or(undispatched_bits, Ordering::AcqRel);
        }
        repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);

        if ret.len() >= maxevents {
            break;
        }
    }
    ret
}
550
/// Producer-side interest handler installed on a watched source
/// (socket/pipe/notification).
#[derive(Debug)]
struct EpollHandler {
    /// Watched fd associated with the subscription.
    fd: WasiFd,
    /// Parent epoll state for queueing and wakeups.
    epoll_state: Arc<EpollState>,
    /// Per-subscription state updated by interest callbacks.
    sub_state: Arc<EpollSubState>,
}
560
561impl EpollHandler {
562    fn new(fd: WasiFd, epoll_state: Arc<EpollState>, sub_state: Arc<EpollSubState>) -> Box<Self> {
563        Box::new(Self {
564            fd,
565            epoll_state,
566            sub_state,
567        })
568    }
569}
570
impl InterestHandler for EpollHandler {
    /// Producer path:
    /// set pending bits, enqueue once, and wake one waiter.
    fn push_interest(&mut self, interest: InterestType) {
        EPOLL_ENQUEUE_ATTEMPTS.fetch_add(1, Ordering::Relaxed);
        let bit = interest_to_pending_bit(interest);
        if !self.sub_state.set_pending(bit) {
            // Bit was already pending; the producer that first set it is
            // responsible for the queue entry.
            EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
            return;
        }

        if self.sub_state.mark_enqueued() {
            self.epoll_state
                .enqueue_ready(self.fd, self.sub_state.generation());
        } else {
            // A ready-queue entry already exists for this subscription.
            EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Clears one readiness bit from this subscription only.
    fn pop_interest(&mut self, interest: InterestType) -> bool {
        let bit = interest_to_pending_bit(interest);
        let old = self
            .sub_state
            .pending_bits
            .fetch_and(!bit, Ordering::AcqRel);
        // True iff the bit was actually set before clearing.
        (old & bit) != 0
    }

    /// Checks whether this subscription currently has a readiness bit set.
    fn has_interest(&self, interest: InterestType) -> bool {
        let bit = interest_to_pending_bit(interest);
        (self.sub_state.pending_bits() & bit) != 0
    }
}
606
/// Registers an epoll interest handler on the watched fd and returns a guard.
///
/// `None` means the fd kind does not support handler attachment for epoll.
pub(crate) fn register_epoll_handler(
    state: &Arc<WasiState>,
    event: &EpollFd,
    epoll_state: Arc<EpollState>,
    sub_state: Arc<EpollSubState>,
) -> Result<Option<EpollJoinGuard>, Errno> {
    // NOTE(review): when both EPOLLIN and EPOLLOUT are requested, `type_`
    // ends up `FdRead` because the EPOLLIN branch runs last; both poll
    // events are still added to `peb`.
    let mut type_ = Eventtype::FdRead;
    let mut peb = PollEventBuilder::new();
    if event.events().contains(EpollType::EPOLLOUT) {
        type_ = Eventtype::FdWrite;
        peb = peb.add(PollEvent::PollOut);
    }
    if event.events().contains(EpollType::EPOLLIN) {
        type_ = Eventtype::FdRead;
        peb = peb.add(PollEvent::PollIn);
    }
    // EPOLLERR/EPOLLHUP are always delivered by epoll regardless of requested mask.
    peb = peb.add(PollEvent::PollError);
    peb = peb.add(PollEvent::PollHangUp);

    let s = Subscription {
        userdata: event.data2(),
        type_,
        data: SubscriptionUnion {
            fd_readwrite: SubscriptionFsReadwrite {
                file_descriptor: event.fd(),
            },
        },
    };

    let fd_guard = poll_fd_guard(state, peb.build(), event.fd(), s)?;
    let handler = EpollHandler::new(event.fd(), epoll_state.clone(), sub_state.clone());

    match &fd_guard.mode {
        InodeValFilePollGuardMode::File(_) => {
            // Intentionally ignored, epoll doesn't work with files
            return Ok(None);
        }
        InodeValFilePollGuardMode::Socket { inner, .. } => {
            let mut inner = inner.protected.write().unwrap();
            inner.set_handler(handler).map_err(net_error_into_io_err)?;
            drop(inner);
        }
        InodeValFilePollGuardMode::EventNotifications(inner) => inner.set_interest_handler(handler),
        InodeValFilePollGuardMode::DuplexPipe { pipe } => {
            let inner = pipe.write().unwrap();
            inner.set_interest_handler(handler);
        }
        InodeValFilePollGuardMode::PipeRx { rx } => {
            let inner = rx.write().unwrap();
            inner.set_interest_handler(handler);
        }
        InodeValFilePollGuardMode::PipeTx { .. } => {
            // The sending end of a pipe can't have an interest handler, since we
            // only support "readable" interest on pipes; they're considered to
            // always be writable.
            // No guard is stored for this fd kind, so prime before returning.
            prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);
            return Ok(None);
        }
    }

    prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);

    Ok(Some(EpollJoinGuard::new(fd_guard)))
}
675
/// Increments stale-generation dequeue metric.
///
/// Counted when `drain_ready_events` discards a queue entry whose generation
/// snapshot no longer matches the live subscription.
pub(crate) fn epoll_stale_generation_drop() {
    EPOLL_STALE_GENERATION_DROPS.fetch_add(1, Ordering::Relaxed);
}

/// Increments empty dequeue metric.
///
/// Counted when `drain_ready_events` dequeues an entry with no subscription
/// or no pending bits to emit.
pub(crate) fn epoll_empty_dequeue_entry() {
    EPOLL_EMPTY_DEQUEUE_ENTRIES.fetch_add(1, Ordering::Relaxed);
}
685
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::RwLock;
    use virtual_fs::Pipe;

    /// Builds a canonical `EpollEventCtl` payload watching `fd` for EPOLLIN.
    fn test_epoll_event_ctl(fd: WasiFd) -> EpollEventCtl {
        EpollEventCtl {
            events: EpollType::EPOLLIN,
            ptr: 77,
            fd,
            data1: 88,
            data2: 99,
        }
    }

    /// Builds an epoll-state/sub-state/handler triple with an all-events mask.
    fn test_epoll_handler(fd: WasiFd) -> (Arc<EpollState>, Arc<EpollSubState>, Box<EpollHandler>) {
        let epoll_state = Arc::new(EpollState::new());
        let sub_state = Arc::new(EpollSubState::new(
            EpollFd::new(
                EpollType::EPOLLIN
                    | EpollType::EPOLLOUT
                    | EpollType::EPOLLERR
                    | EpollType::EPOLLHUP,
                0,
                fd,
                0,
                0,
            ),
            1,
        ));
        let handler = EpollHandler::new(fd, epoll_state.clone(), sub_state.clone());
        (epoll_state, sub_state, handler)
    }

    /// Builds a standalone sub-state with an all-events mask at `generation`.
    fn test_sub_state(fd: WasiFd, generation: u64) -> Arc<EpollSubState> {
        Arc::new(EpollSubState::new(
            EpollFd::new(
                EpollType::EPOLLIN
                    | EpollType::EPOLLOUT
                    | EpollType::EPOLLERR
                    | EpollType::EPOLLHUP,
                0,
                fd,
                0,
                0,
            ),
            generation,
        ))
    }

    // The explicit fd argument must win over the fd embedded in the ctl payload.
    #[test]
    fn epoll_fd_from_event_ctl_uses_explicit_fd() {
        let event = test_epoll_event_ctl(1234);
        let epoll_fd = EpollFd::from_event_ctl(5678, &event);
        assert_eq!(epoll_fd.fd(), 5678);
        assert_eq!(epoll_fd.ptr(), 77);
        assert_eq!(epoll_fd.data1(), 88);
        assert_eq!(epoll_fd.data2(), 99);
        assert_eq!(epoll_fd.events(), EpollType::EPOLLIN);
    }

    // Popping interest on one fd must not disturb another fd's pending bits.
    #[test]
    fn epoll_handler_pop_interest_is_scoped_to_fd() {
        let epoll_state = Arc::new(EpollState::new());
        let sub_state1 = Arc::new(EpollSubState::new(
            EpollFd::new(EpollType::EPOLLIN, 0, 10, 0, 0),
            1,
        ));
        let sub_state2 = Arc::new(EpollSubState::new(
            EpollFd::new(EpollType::EPOLLIN, 0, 11, 0, 0),
            1,
        ));
        let mut handler1 = EpollHandler::new(10, epoll_state.clone(), sub_state1.clone());
        let mut handler2 = EpollHandler::new(11, epoll_state.clone(), sub_state2.clone());

        handler1.push_interest(InterestType::Readable);
        handler2.push_interest(InterestType::Readable);

        assert!(handler1.has_interest(InterestType::Readable));
        assert!(handler2.has_interest(InterestType::Readable));

        assert!(handler1.pop_interest(InterestType::Readable));
        assert!(!handler1.has_interest(InterestType::Readable));
        assert!(
            handler2.has_interest(InterestType::Readable),
            "popping one fd interest must not clear another fd with the same readiness"
        );

        assert!(sub_state1.pending_bits() == 0);
        assert!(sub_state2.pending_bits() != 0);
        assert_eq!(epoll_state.ready.lock().unwrap().len(), 2);
    }

    // Repeated producer pushes must collapse into a single queue entry.
    #[test]
    fn epoll_handler_dedupes_queue_until_consumer_drains() {
        let (epoll_state, sub_state, mut handler) = test_epoll_handler(7);

        handler.push_interest(InterestType::Readable);
        handler.push_interest(InterestType::Readable);
        handler.push_interest(InterestType::Writable);

        assert_eq!(
            epoll_state.ready.lock().unwrap().len(),
            1,
            "multiple pushes while enqueued must keep a single queue entry"
        );
        assert!(handler.has_interest(InterestType::Readable));
        assert!(handler.has_interest(InterestType::Writable));

        // Simulate the consumer drain: dequeue, take bits, clear dedupe flag.
        epoll_state.ready.lock().unwrap().pop_front().unwrap();
        sub_state.take_pending_bits();
        sub_state.clear_enqueued();

        handler.push_interest(InterestType::Readable);
        assert_eq!(
            epoll_state.ready.lock().unwrap().len(),
            1,
            "after drain, a new event should enqueue again"
        );
    }

    #[test]
    fn epoll_type_to_pending_bit_has_stable_mapping() {
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLIN),
            Some(READABLE_BIT)
        );
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLOUT),
            Some(WRITABLE_BIT)
        );
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLHUP),
            Some(HUP_BIT)
        );
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLERR),
            Some(ERR_BIT)
        );
    }

    #[test]
    fn interest_to_pending_bit_has_stable_mapping() {
        assert_eq!(
            interest_to_pending_bit(InterestType::Readable),
            READABLE_BIT
        );
        assert_eq!(
            interest_to_pending_bit(InterestType::Writable),
            WRITABLE_BIT
        );
        assert_eq!(interest_to_pending_bit(InterestType::Closed), HUP_BIT);
        assert_eq!(interest_to_pending_bit(InterestType::Error), ERR_BIT);
    }

    // HUP/ERR must be emitted even when the mask only requested EPOLLIN.
    #[test]
    fn pending_bits_to_events_always_includes_hup_and_err() {
        let events = pending_bits_to_events(HUP_BIT | ERR_BIT, EpollType::EPOLLIN);
        assert_eq!(events, vec![EpollType::EPOLLHUP, EpollType::EPOLLERR]);
    }

    #[test]
    fn epoll_mask_to_pending_bits_always_tracks_hup_and_err() {
        let bits = epoll_mask_to_pending_bits(EpollType::empty());
        assert_eq!(bits & HUP_BIT, HUP_BIT);
        assert_eq!(bits & ERR_BIT, ERR_BIT);
    }

    // Two fds with identical readiness must each emit their own event.
    #[test]
    fn drain_ready_events_keeps_multi_fd_same_readiness_isolated() {
        let epoll_state = Arc::new(EpollState::new());

        let sub_a = test_sub_state(10, 1);
        let sub_b = test_sub_state(11, 1);
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();

        sub_a.pending_bits.store(readable_bit, Ordering::Release);
        sub_a.enqueued.store(true, Ordering::Release);
        sub_b.pending_bits.store(readable_bit, Ordering::Release);
        sub_b.enqueued.store(true, Ordering::Release);

        epoll_state.insert_subscription(10, sub_a);
        epoll_state.insert_subscription(11, sub_b);
        epoll_state.enqueue_ready(10, 1);
        epoll_state.enqueue_ready(11, 1);

        let events = drain_ready_events(&epoll_state, 8);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0.fd(), 10);
        assert_eq!(events[1].0.fd(), 11);
        assert_eq!(events[0].1, EpollType::EPOLLIN);
        assert_eq!(events[1].1, EpollType::EPOLLIN);
    }

    // When maxevents truncates output, leftover bits must survive for the
    // next wait instead of being lost.
    #[test]
    fn drain_ready_events_requeues_undispatched_bits_when_budget_exhausted() {
        let epoll_state = Arc::new(EpollState::new());
        let sub = Arc::new(EpollSubState::new(
            EpollFd::new(EpollType::EPOLLIN | EpollType::EPOLLOUT, 0, 90, 0, 0),
            1,
        ));

        sub.pending_bits
            .store(READABLE_BIT | WRITABLE_BIT, Ordering::Release);
        sub.enqueued.store(true, Ordering::Release);
        epoll_state.insert_subscription(90, sub.clone());
        epoll_state.enqueue_ready(90, 1);

        let first = drain_ready_events(&epoll_state, 1);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].1, EpollType::EPOLLIN);
        assert_eq!(sub.pending_bits(), WRITABLE_BIT);
        assert!(sub.enqueued.load(Ordering::Acquire));
        assert_eq!(epoll_state.ready.lock().unwrap().len(), 1);

        let second = drain_ready_events(&epoll_state, 1);
        assert_eq!(second.len(), 1);
        assert_eq!(second[0].1, EpollType::EPOLLOUT);
        assert_eq!(sub.pending_bits(), 0);
        assert!(!sub.enqueued.load(Ordering::Acquire));
        assert_eq!(epoll_state.ready.lock().unwrap().len(), 0);
    }

    // A queued item from generation 1 must be ignored by a generation-2 sub.
    #[test]
    fn drain_ready_events_drops_stale_generation_items() {
        let epoll_state = Arc::new(EpollState::new());

        let sub = test_sub_state(22, 2);
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();
        sub.pending_bits.store(readable_bit, Ordering::Release);
        sub.enqueued.store(true, Ordering::Release);

        epoll_state.insert_subscription(22, sub.clone());
        epoll_state.enqueue_ready(22, 1);

        let events = drain_ready_events(&epoll_state, 8);
        assert!(
            events.is_empty(),
            "stale generation items must not emit events"
        );
        assert_eq!(
            sub.pending_bits.load(Ordering::Acquire),
            readable_bit,
            "stale dequeue must not clear pending bits for current generation"
        );
    }

    #[test]
    fn repair_ready_queue_after_drain_requeues_when_new_bits_arrive() {
        let epoll_state = Arc::new(EpollState::new());
        let sub = test_sub_state(44, 3);
        let writable_bit = epoll_type_to_pending_bit(EpollType::EPOLLOUT).unwrap();
        sub.pending_bits.store(writable_bit, Ordering::Release);
        sub.enqueued.store(false, Ordering::Release);

        repair_ready_queue_after_drain(&epoll_state, 44, &sub);

        assert!(sub.enqueued.load(Ordering::Acquire));
        let queued = epoll_state.ready.lock().unwrap().pop_front().unwrap();
        assert_eq!(queued.fd, 44);
    }

    // DEL must drop the join guards even while another Arc keeps the
    // sub-state itself alive.
    #[test]
    fn apply_del_detaches_joins_even_if_subscription_stays_alive() {
        let epoll_state = Arc::new(EpollState::new());
        let event = test_epoll_event_ctl(55);
        let sub = Arc::new(EpollSubState::new(EpollFd::from_event_ctl(55, &event), 1));
        epoll_state.insert_subscription(55, sub.clone());

        let (tx, _rx) = Pipe::new().split();
        sub.add_join(EpollJoinGuard::new(InodeValFilePollGuard {
            fd: 55,
            peb: PollEventBuilder::new().build(),
            subscription: Subscription {
                userdata: 0,
                type_: Eventtype::FdRead,
                data: SubscriptionUnion {
                    fd_readwrite: SubscriptionFsReadwrite {
                        file_descriptor: 55,
                    },
                },
            },
            mode: InodeValFilePollGuardMode::PipeTx {
                tx: Arc::new(RwLock::new(Box::new(tx))),
            },
        }));

        let leaked_ref = sub.clone();
        assert_eq!(leaked_ref.joins.lock().unwrap().len(), 1);

        epoll_state.apply_del(55).unwrap();

        assert_eq!(leaked_ref.joins.lock().unwrap().len(), 0);
    }
}
981}