1use std::{
41 collections::VecDeque,
42 sync::{
43 Arc, Mutex as StdMutex,
44 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
45 },
46};
47
48use fnv::FnvHashMap;
49use serde::{Deserialize, Serialize};
50use tokio::sync::Notify;
51use virtual_mio::{InterestHandler, InterestType};
52use virtual_net::net_error_into_io_err;
53use wasmer_wasix_types::wasi::{
54 EpollEventCtl, EpollType, Errno, Eventtype, Fd as WasiFd, Subscription,
55 SubscriptionFsReadwrite, SubscriptionUnion,
56};
57
58use crate::{
59 fs::{InodeValFilePollGuard, InodeValFilePollGuardMode},
60 state::{PollEvent, PollEventBuilder, WasiState},
61 syscalls::poll_fd_guard,
62};
63
// Bit positions used in `EpollSubState::pending_bits` to record which
// readiness conditions have been observed but not yet delivered.
const READABLE_BIT: u8 = 1 << 0;
const WRITABLE_BIT: u8 = 1 << 1;
const HUP_BIT: u8 = 1 << 2;
const ERR_BIT: u8 = 1 << 3;

// Process-wide diagnostic counters (Relaxed updates; observability only).
// Attempts to enqueue a ready item from an interest callback.
static EPOLL_ENQUEUE_ATTEMPTS: AtomicU64 = AtomicU64::new(0);
// Enqueues suppressed because the bit was already pending or an entry was queued.
static EPOLL_ENQUEUE_DEDUPE_HITS: AtomicU64 = AtomicU64::new(0);
// Dequeued items dropped because the fd was re-registered (generation mismatch).
static EPOLL_STALE_GENERATION_DROPS: AtomicU64 = AtomicU64::new(0);
// Dequeued items that carried no deliverable bits (removed fd or consumed race).
static EPOLL_EMPTY_DEQUEUE_ENTRIES: AtomicU64 = AtomicU64::new(0);
73
/// A single epoll registration: the interest mask plus the opaque user
/// payload copied from the guest's `EpollEventCtl`, echoed back to the
/// caller when readiness is reported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EpollFd {
    // Interest mask (EPOLLIN / EPOLLOUT / ...) requested at registration.
    events: EpollType,
    // Copied from `EpollEventCtl::ptr`; returned verbatim with events.
    ptr: u64,
    // The watched file descriptor.
    fd: WasiFd,
    // Copied from `EpollEventCtl::data1`; opaque to this module.
    data1: u32,
    // Copied from `EpollEventCtl::data2`; opaque to this module.
    data2: u64,
}
87
impl EpollFd {
    /// Builds a registration record from its raw components.
    pub fn new(events: EpollType, ptr: u64, fd: WasiFd, data1: u32, data2: u64) -> Self {
        Self {
            events,
            ptr,
            fd,
            data1,
            data2,
        }
    }

    /// Builds a registration from a guest control event. Note the watched
    /// fd is the explicit `fd` argument, not `event.fd` (see the
    /// `epoll_fd_from_event_ctl_uses_explicit_fd` test).
    pub fn from_event_ctl(fd: WasiFd, event: &EpollEventCtl) -> Self {
        Self::new(event.events, event.ptr, fd, event.data1, event.data2)
    }

    /// The interest mask requested at registration.
    pub fn events(&self) -> EpollType {
        self.events
    }

    /// The opaque guest pointer from the control event.
    pub fn ptr(&self) -> u64 {
        self.ptr
    }

    /// The watched file descriptor.
    pub fn fd(&self) -> WasiFd {
        self.fd
    }

    /// First opaque user payload word.
    pub fn data1(&self) -> u32 {
        self.data1
    }

    /// Second opaque user payload word.
    pub fn data2(&self) -> u64 {
        self.data2
    }
}
130
/// RAII guard tying an epoll registration to its poll guard; dropping it
/// detaches the interest handler that was installed on the underlying
/// object (see the `Drop` impl below).
#[derive(Debug)]
pub struct EpollJoinGuard {
    // Keeps the polled inode alive and records which mode (socket, pipe,
    // event object, ...) a handler was installed on.
    fd_guard: InodeValFilePollGuard,
}
136
impl EpollJoinGuard {
    /// Wraps a poll guard; all cleanup happens in `Drop`.
    fn new(fd_guard: InodeValFilePollGuard) -> Self {
        Self { fd_guard }
    }
}
142
impl Drop for EpollJoinGuard {
    /// Detaches whatever interest handler `register_epoll_handler`
    /// installed, so readiness callbacks stop firing after unregistration.
    fn drop(&mut self) {
        match &self.fd_guard.mode {
            InodeValFilePollGuardMode::File(_) => {
                // Plain files never get a handler (register_epoll_handler
                // returns early for this mode) — nothing to remove.
            }
            InodeValFilePollGuardMode::Socket { inner } => {
                let mut inner = inner.protected.write().unwrap();
                inner.remove_handler();
            }
            InodeValFilePollGuardMode::EventNotifications(inner) => {
                inner.remove_interest_handler();
            }
            InodeValFilePollGuardMode::DuplexPipe { pipe } => {
                let inner = pipe.write().unwrap();
                inner.remove_interest_handler();
            }
            InodeValFilePollGuardMode::PipeRx { rx } => {
                let inner = rx.write().unwrap();
                inner.remove_interest_handler();
            }
            InodeValFilePollGuardMode::PipeTx { .. } => {
                // Transmit ends are primed as immediately writable instead
                // of getting a handler — nothing to remove.
            }
        }
    }
}
171
/// Shared state of one epoll instance: the registration table, the FIFO
/// of ready fds, and a notifier that wakes `wait()`ers on enqueue.
#[derive(Debug)]
pub struct EpollState {
    // Live registrations keyed by watched fd.
    subscriptions: StdMutex<FnvHashMap<WasiFd, Arc<EpollSubState>>>,
    // FIFO of fds with published readiness; at most one entry per
    // subscription, enforced via `EpollSubState::enqueued`.
    ready: StdMutex<VecDeque<ReadyItem>>,
    // Signalled by `enqueue_ready`; awaited in `wait`.
    notify: Notify,
}
181
impl Default for EpollState {
    /// Equivalent to [`EpollState::new`].
    fn default() -> Self {
        Self::new()
    }
}
187
impl EpollState {
    /// Creates an empty epoll instance with no subscriptions and an
    /// empty ready queue.
    pub fn new() -> Self {
        Self {
            subscriptions: StdMutex::new(FnvHashMap::default()),
            ready: StdMutex::new(VecDeque::new()),
            notify: Notify::new(),
        }
    }

    // Test-only backdoor to seed the subscription table directly.
    #[cfg(test)]
    fn insert_subscription(&self, fd: WasiFd, state: Arc<EpollSubState>) {
        self.subscriptions.lock().unwrap().insert(fd, state);
    }

    /// Reverts `fd`'s table slot: drops whatever occupies it now and, if
    /// `previous` is given, reinstates that earlier subscription.
    fn restore_subscription(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
        let mut subscriptions = self.subscriptions.lock().unwrap();
        subscriptions.remove(&fd);
        if let Some(previous) = previous {
            subscriptions.insert(fd, previous);
        }
    }

    /// Looks up the current subscription for `fd`, if any.
    fn subscription(&self, fd: WasiFd) -> Option<Arc<EpollSubState>> {
        self.subscriptions.lock().unwrap().get(&fd).cloned()
    }

    /// Pushes a ready item and wakes one waiter. The queue lock is a
    /// statement-scoped temporary, so `notify_one` runs after it drops.
    fn enqueue_ready(&self, fd: WasiFd, generation: u64) {
        self.ready
            .lock()
            .unwrap()
            .push_back(ReadyItem { fd, generation });
        self.notify.notify_one();
    }

    /// Pops the oldest ready item, if any.
    fn dequeue_ready(&self) -> Option<ReadyItem> {
        self.ready.lock().unwrap().pop_front()
    }

    /// Waits until `enqueue_ready` signals. `Notify` stores a permit, so
    /// a signal sent just before this call is not lost.
    pub async fn wait(&self) {
        self.notify.notified().await;
    }

    /// EPOLL_CTL_ADD: registers `fd`. Fails with `Exist` when already
    /// registered. The new subscription starts at generation 1.
    pub(crate) fn prepare_add(
        &self,
        fd: WasiFd,
        event: &EpollEventCtl,
    ) -> Result<(EpollFd, Arc<EpollSubState>), Errno> {
        let mut subscriptions = self.subscriptions.lock().unwrap();
        if subscriptions.contains_key(&fd) {
            return Err(Errno::Exist);
        }

        let (epoll_fd, sub_state) = self.build_pending_subscription(fd, event, 1);
        subscriptions.insert(fd, sub_state.clone());
        Ok((epoll_fd, sub_state))
    }

    /// EPOLL_CTL_MOD: replaces `fd`'s subscription with a fresh one at
    /// the next generation so queued items from the old registration are
    /// recognized as stale. Returns the previous subscription so the
    /// caller can roll back on failure. Fails with `Noent` when absent.
    pub(crate) fn prepare_mod(
        &self,
        fd: WasiFd,
        event: &EpollEventCtl,
    ) -> Result<(EpollFd, Arc<EpollSubState>, Arc<EpollSubState>), Errno> {
        let mut subscriptions = self.subscriptions.lock().unwrap();
        let Some(previous) = subscriptions.remove(&fd) else {
            return Err(Errno::Noent);
        };
        tracing::trace!(fd, "unregistering waker");

        let (epoll_fd, sub_state) =
            self.build_pending_subscription(fd, event, previous.next_generation());
        subscriptions.insert(fd, sub_state.clone());
        Ok((epoll_fd, sub_state, previous))
    }

    /// EPOLL_CTL_DEL: removes `fd` and detaches its join guards, which
    /// uninstalls the interest handlers. Fails with `Noent` when absent.
    pub(crate) fn apply_del(&self, fd: WasiFd) -> Result<(), Errno> {
        let removed = self
            .subscriptions
            .lock()
            .unwrap()
            .remove(&fd)
            .ok_or(Errno::Noent)?;
        removed.detach_joins();
        Ok(())
    }

    /// Undoes a `prepare_add`/`prepare_mod` after a later step failed,
    /// restoring the prior subscription (if any).
    pub(crate) fn rollback_registration(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
        self.restore_subscription(fd, previous);
    }

    /// Builds the (metadata, subscription) pair for a registration at the
    /// given generation. Does not touch the subscription table.
    fn build_pending_subscription(
        &self,
        fd: WasiFd,
        event: &EpollEventCtl,
        generation: u64,
    ) -> (EpollFd, Arc<EpollSubState>) {
        let epoll_fd = EpollFd::from_event_ctl(fd, event);
        tracing::trace!(
            peb = ?event.events,
            ptr = ?event.ptr,
            data1 = event.data1,
            data2 = event.data2,
            fd,
            "registering waker"
        );
        let sub_state = Arc::new(EpollSubState::new(epoll_fd.clone(), generation));
        (epoll_fd, sub_state)
    }
}
298
/// Per-fd subscription bookkeeping shared between interest handlers
/// (producers) and the drain path (consumer).
#[derive(Debug)]
pub struct EpollSubState {
    // Registration metadata (interest mask + user payload) for this fd.
    fd_meta: StdMutex<EpollFd>,
    // Guards whose Drop detaches the installed interest handlers.
    joins: StdMutex<Vec<EpollJoinGuard>>,
    // Readiness observed but not yet delivered (READABLE_BIT etc.).
    pending_bits: AtomicU8,
    // True while a ReadyItem for this subscription is sitting in the
    // queue — enforces at most one queue entry per subscription.
    enqueued: AtomicBool,
    // Registration generation; bumped by `prepare_mod` so queue items
    // from a superseded registration can be detected and dropped.
    generation: AtomicU64,
}
312
impl EpollSubState {
    /// Creates a subscription in its initial state: no pending bits, not
    /// enqueued, tagged with `generation`.
    pub fn new(fd_meta: EpollFd, generation: u64) -> Self {
        Self {
            fd_meta: StdMutex::new(fd_meta),
            joins: StdMutex::new(Vec::new()),
            pending_bits: AtomicU8::new(0),
            enqueued: AtomicBool::new(false),
            generation: AtomicU64::new(generation),
        }
    }

    /// Generation a replacement subscription should use: current + 1,
    /// saturating so it can never wrap back to a previously-used value.
    pub fn next_generation(&self) -> u64 {
        self.generation.load(Ordering::Acquire).saturating_add(1)
    }

    /// Attaches a join guard whose Drop detaches its interest handler.
    pub fn add_join(&self, join: EpollJoinGuard) {
        self.joins.lock().unwrap().push(join);
    }

    /// Drops all join guards, uninstalling any interest handlers
    /// (see `EpollJoinGuard::drop`).
    pub fn detach_joins(&self) {
        self.joins.lock().unwrap().clear();
    }

    fn generation(&self) -> u64 {
        self.generation.load(Ordering::Acquire)
    }

    /// Snapshot of the registration metadata (mask + user payload).
    pub(crate) fn fd_meta(&self) -> EpollFd {
        self.fd_meta.lock().unwrap().clone()
    }

    /// Sets `bit` in the pending mask; returns true only when the bit
    /// was not already set, i.e. this call published new readiness.
    fn set_pending(&self, bit: u8) -> bool {
        let old_bits = self.pending_bits.fetch_or(bit, Ordering::AcqRel);
        (old_bits & bit) == 0
    }

    /// Atomically consumes and returns all pending readiness bits.
    fn take_pending_bits(&self) -> u8 {
        self.pending_bits.swap(0, Ordering::AcqRel)
    }

    /// Non-consuming read of the pending mask.
    fn pending_bits(&self) -> u8 {
        self.pending_bits.load(Ordering::Acquire)
    }

    /// Claims the single ready-queue slot; returns true for the one
    /// caller that flipped `enqueued` false→true and must enqueue.
    fn mark_enqueued(&self) -> bool {
        self.enqueued
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Releases the ready-queue slot once the entry has been consumed.
    fn clear_enqueued(&self) {
        self.enqueued.store(false, Ordering::Release);
    }
}
373
/// One ready-queue entry: the fd plus the subscription generation it was
/// queued under, so items queued before a re-registration can be
/// detected as stale and dropped.
#[derive(Debug, Clone, Copy)]
struct ReadyItem {
    // The fd whose subscription published readiness.
    fd: WasiFd,
    // Generation of the subscription at enqueue time.
    generation: u64,
}
381
382pub(crate) fn epoll_type_to_pending_bit(readiness: EpollType) -> Option<u8> {
384 if readiness == EpollType::EPOLLIN {
385 Some(READABLE_BIT)
386 } else if readiness == EpollType::EPOLLOUT {
387 Some(WRITABLE_BIT)
388 } else if readiness == EpollType::EPOLLHUP {
389 Some(HUP_BIT)
390 } else if readiness == EpollType::EPOLLERR {
391 Some(ERR_BIT)
392 } else {
393 None
394 }
395}
396
397fn interest_to_pending_bit(interest: InterestType) -> u8 {
399 match interest {
400 InterestType::Readable => READABLE_BIT,
401 InterestType::Writable => WRITABLE_BIT,
402 InterestType::Closed => HUP_BIT,
403 InterestType::Error => ERR_BIT,
404 }
405}
406
407fn pending_bits_to_events(bits: u8, mask: EpollType) -> Vec<EpollType> {
409 let mut events = Vec::with_capacity(4);
410 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLIN)
411 && (bits & bit) != 0
412 && mask.contains(EpollType::EPOLLIN)
413 {
414 events.push(EpollType::EPOLLIN);
415 }
416 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLOUT)
417 && (bits & bit) != 0
418 && mask.contains(EpollType::EPOLLOUT)
419 {
420 events.push(EpollType::EPOLLOUT);
421 }
422 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLHUP)
423 && (bits & bit) != 0
424 {
425 events.push(EpollType::EPOLLHUP);
426 }
427 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLERR)
428 && (bits & bit) != 0
429 {
430 events.push(EpollType::EPOLLERR);
431 }
432 events
433}
434
435fn epoll_mask_to_pending_bits(mask: EpollType) -> u8 {
436 let mut bits = 0;
437 if mask.contains(EpollType::EPOLLIN) {
438 bits |= READABLE_BIT;
439 }
440 if mask.contains(EpollType::EPOLLOUT) {
441 bits |= WRITABLE_BIT;
442 }
443 bits |= HUP_BIT;
445 bits |= ERR_BIT;
446 bits
447}
448
/// Seeds the WRITABLE pending bit for fd modes that are treated as
/// immediately writable (event notifications and pipe transmit ends),
/// so an EPOLLOUT subscriber gets an event without waiting for a
/// readiness callback that may never come.
fn prime_immediate_writable_if_applicable(
    event: &EpollFd,
    fd_guard: &InodeValFilePollGuard,
    epoll_state: &Arc<EpollState>,
    sub_state: &Arc<EpollSubState>,
) {
    // Only relevant when the guest actually asked for writability.
    if !event.events().contains(EpollType::EPOLLOUT) {
        return;
    }

    let writable_now = matches!(
        fd_guard.mode,
        InodeValFilePollGuardMode::EventNotifications(_) | InodeValFilePollGuardMode::PipeTx { .. }
    );

    if !writable_now {
        return;
    }

    // Publish the bit first, then race for the single queue slot — same
    // protocol as EpollHandler::push_interest.
    sub_state
        .pending_bits
        .fetch_or(WRITABLE_BIT, Ordering::AcqRel);
    if sub_state.mark_enqueued() {
        epoll_state.enqueue_ready(event.fd(), sub_state.generation());
    }
}
478
479fn repair_ready_queue_after_drain(
481 epoll_state: &Arc<EpollState>,
482 fd: WasiFd,
483 sub_state: &Arc<EpollSubState>,
484) {
485 if sub_state.pending_bits() != 0 && sub_state.mark_enqueued() {
486 epoll_state.enqueue_ready(fd, sub_state.generation());
487 }
488}
489
/// Drains up to `maxevents` (fd, readiness) pairs from the ready queue.
///
/// Each dequeued item is validated against the live subscription table:
/// items for removed fds and items whose generation no longer matches
/// (the fd was re-registered via `prepare_mod`) are dropped and counted.
/// Bits that could not be dispatched within the budget are returned to
/// the subscription's pending mask and the fd is re-queued.
pub(crate) fn drain_ready_events(
    epoll_state: &Arc<EpollState>,
    maxevents: usize,
) -> Vec<(EpollFd, EpollType)> {
    let mut ret: Vec<(EpollFd, EpollType)> = Vec::new();
    while ret.len() < maxevents {
        let Some(item) = epoll_state.dequeue_ready() else {
            break;
        };

        // The fd may have been deleted after this item was queued.
        let Some(sub_state) = epoll_state.subscription(item.fd) else {
            epoll_empty_dequeue_entry();
            continue;
        };

        // Generation mismatch: the item belongs to a superseded
        // registration and must not emit events (or clear current bits).
        if sub_state.generation() != item.generation {
            epoll_stale_generation_drop();
            continue;
        }

        // Consume the bits first, then free the queue slot; any bits
        // published after the swap will re-enqueue via the repair below.
        let bits = sub_state.take_pending_bits();
        sub_state.clear_enqueued();

        if bits == 0 {
            // Raced with a concurrent drain: nothing to report here, but
            // re-queue if new bits arrived between swap and clear.
            repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);
            epoll_empty_dequeue_entry();
            continue;
        }

        let event = sub_state.fd_meta();
        // Bits relevant to this subscription that we still owe the caller.
        let mut undispatched_bits = bits & epoll_mask_to_pending_bits(event.events());
        for readiness in pending_bits_to_events(bits, event.events()) {
            if ret.len() >= maxevents {
                break;
            }
            ret.push((event.clone(), readiness));
            if let Some(bit) = epoll_type_to_pending_bit(readiness) {
                undispatched_bits &= !bit;
            }
        }

        // Budget ran out mid-subscription: return the leftovers so the
        // next drain call delivers them.
        if undispatched_bits != 0 {
            sub_state
                .pending_bits
                .fetch_or(undispatched_bits, Ordering::AcqRel);
        }
        repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);

        if ret.len() >= maxevents {
            break;
        }
    }
    ret
}
550
/// Interest handler installed on sockets / pipes / event objects; it
/// forwards readiness callbacks into the shared epoll ready queue.
#[derive(Debug)]
struct EpollHandler {
    // The watched fd, used when enqueueing ready items.
    fd: WasiFd,
    // Ready queue + notifier shared with the epoll instance.
    epoll_state: Arc<EpollState>,
    // Per-fd pending bits, enqueued flag, and generation.
    sub_state: Arc<EpollSubState>,
}
560
impl EpollHandler {
    /// Builds a boxed handler — boxed because it is handed to the
    /// `set_handler`/`set_interest_handler` sinks in
    /// `register_epoll_handler` (presumably as a trait object — see those
    /// APIs).
    fn new(fd: WasiFd, epoll_state: Arc<EpollState>, sub_state: Arc<EpollSubState>) -> Box<Self> {
        Box::new(Self {
            fd,
            epoll_state,
            sub_state,
        })
    }
}
570
impl InterestHandler for EpollHandler {
    /// Records a readiness event and enqueues the fd at most once.
    fn push_interest(&mut self, interest: InterestType) {
        EPOLL_ENQUEUE_ATTEMPTS.fetch_add(1, Ordering::Relaxed);
        let bit = interest_to_pending_bit(interest);
        // Bit already pending: an earlier push is still flowing through
        // the queue, so no new entry is needed.
        if !self.sub_state.set_pending(bit) {
            EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
            return;
        }

        // Only the winner of the enqueued flag pushes, keeping at most
        // one queue entry per subscription.
        if self.sub_state.mark_enqueued() {
            self.epoll_state
                .enqueue_ready(self.fd, self.sub_state.generation());
        } else {
            EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Clears the readiness bit; returns true if it was previously set.
    /// Note: does not touch the queue — a now-empty entry is absorbed by
    /// the drain path's empty-dequeue handling.
    fn pop_interest(&mut self, interest: InterestType) -> bool {
        let bit = interest_to_pending_bit(interest);
        let old = self
            .sub_state
            .pending_bits
            .fetch_and(!bit, Ordering::AcqRel);
        (old & bit) != 0
    }

    /// Reports whether the readiness bit is currently pending.
    fn has_interest(&self, interest: InterestType) -> bool {
        let bit = interest_to_pending_bit(interest);
        (self.sub_state.pending_bits() & bit) != 0
    }
}
606
/// Builds a poll subscription for `event.fd()` and installs an
/// `EpollHandler` on the underlying object so readiness flows into the
/// epoll ready queue.
///
/// Returns `Ok(None)` for modes that take no handler: plain files, and
/// pipe transmit ends (which are primed as immediately writable
/// instead). Otherwise returns a join guard whose Drop detaches the
/// handler again.
pub(crate) fn register_epoll_handler(
    state: &Arc<WasiState>,
    event: &EpollFd,
    epoll_state: Arc<EpollState>,
    sub_state: Arc<EpollSubState>,
) -> Result<Option<EpollJoinGuard>, Errno> {
    // Translate the epoll mask into poll-style subscription inputs.
    // When both IN and OUT are set, FdRead wins the single `type_` slot.
    let mut type_ = Eventtype::FdRead;
    let mut peb = PollEventBuilder::new();
    if event.events().contains(EpollType::EPOLLOUT) {
        type_ = Eventtype::FdWrite;
        peb = peb.add(PollEvent::PollOut);
    }
    if event.events().contains(EpollType::EPOLLIN) {
        type_ = Eventtype::FdRead;
        peb = peb.add(PollEvent::PollIn);
    }
    // Error and hang-up are always watched, matching epoll semantics.
    peb = peb.add(PollEvent::PollError);
    peb = peb.add(PollEvent::PollHangUp);

    let s = Subscription {
        userdata: event.data2(),
        type_,
        data: SubscriptionUnion {
            fd_readwrite: SubscriptionFsReadwrite {
                file_descriptor: event.fd(),
            },
        },
    };

    let fd_guard = poll_fd_guard(state, peb.build(), event.fd(), s)?;
    let handler = EpollHandler::new(event.fd(), epoll_state.clone(), sub_state.clone());

    match &fd_guard.mode {
        InodeValFilePollGuardMode::File(_) => {
            // Plain files deliver no readiness callbacks; nothing to install.
            return Ok(None);
        }
        InodeValFilePollGuardMode::Socket { inner, .. } => {
            let mut inner = inner.protected.write().unwrap();
            inner.set_handler(handler).map_err(net_error_into_io_err)?;
            drop(inner);
        }
        InodeValFilePollGuardMode::EventNotifications(inner) => inner.set_interest_handler(handler),
        InodeValFilePollGuardMode::DuplexPipe { pipe } => {
            let inner = pipe.write().unwrap();
            inner.set_interest_handler(handler);
        }
        InodeValFilePollGuardMode::PipeRx { rx } => {
            let inner = rx.write().unwrap();
            inner.set_interest_handler(handler);
        }
        InodeValFilePollGuardMode::PipeTx { .. } => {
            // No handler for transmit ends: seed the writable bit so an
            // EPOLLOUT subscriber is woken immediately.
            prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);
            return Ok(None);
        }
    }

    // Seed immediate writability for handler-bearing modes that qualify
    // (currently EventNotifications — see the helper's matches!).
    prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);

    Ok(Some(EpollJoinGuard::new(fd_guard)))
}
675
/// Diagnostic counter: a dequeued ready item referenced a superseded
/// subscription generation and was dropped.
pub(crate) fn epoll_stale_generation_drop() {
    EPOLL_STALE_GENERATION_DROPS.fetch_add(1, Ordering::Relaxed);
}
680
/// Diagnostic counter: a dequeued ready item carried no deliverable bits
/// (fd removed, or bits consumed by a racing drain).
pub(crate) fn epoll_empty_dequeue_entry() {
    EPOLL_EMPTY_DEQUEUE_ENTRIES.fetch_add(1, Ordering::Relaxed);
}
685
#[cfg(test)]
mod tests {
    //! Unit tests for the pending-bit mappings, the enqueue/dedupe
    //! protocol, drain semantics (budget, stale generations), and
    //! registration teardown.
    use super::*;
    use std::sync::RwLock;
    use virtual_fs::Pipe;

    // Canonical EPOLLIN control event with recognizable payload values.
    fn test_epoll_event_ctl(fd: WasiFd) -> EpollEventCtl {
        EpollEventCtl {
            events: EpollType::EPOLLIN,
            ptr: 77,
            fd,
            data1: 88,
            data2: 99,
        }
    }

    // Builds a state/subscription/handler triple watching all four events.
    fn test_epoll_handler(fd: WasiFd) -> (Arc<EpollState>, Arc<EpollSubState>, Box<EpollHandler>) {
        let epoll_state = Arc::new(EpollState::new());
        let sub_state = Arc::new(EpollSubState::new(
            EpollFd::new(
                EpollType::EPOLLIN
                    | EpollType::EPOLLOUT
                    | EpollType::EPOLLERR
                    | EpollType::EPOLLHUP,
                0,
                fd,
                0,
                0,
            ),
            1,
        ));
        let handler = EpollHandler::new(fd, epoll_state.clone(), sub_state.clone());
        (epoll_state, sub_state, handler)
    }

    // Subscription watching all events at a caller-chosen generation.
    fn test_sub_state(fd: WasiFd, generation: u64) -> Arc<EpollSubState> {
        Arc::new(EpollSubState::new(
            EpollFd::new(
                EpollType::EPOLLIN
                    | EpollType::EPOLLOUT
                    | EpollType::EPOLLERR
                    | EpollType::EPOLLHUP,
                0,
                fd,
                0,
                0,
            ),
            generation,
        ))
    }

    // from_event_ctl must use the fd argument, not the one inside the ctl.
    #[test]
    fn epoll_fd_from_event_ctl_uses_explicit_fd() {
        let event = test_epoll_event_ctl(1234);
        let epoll_fd = EpollFd::from_event_ctl(5678, &event);
        assert_eq!(epoll_fd.fd(), 5678);
        assert_eq!(epoll_fd.ptr(), 77);
        assert_eq!(epoll_fd.data1(), 88);
        assert_eq!(epoll_fd.data2(), 99);
        assert_eq!(epoll_fd.events(), EpollType::EPOLLIN);
    }

    // Pending bits live per subscription, so popping one fd's interest
    // must not leak into another fd with the same readiness kind.
    #[test]
    fn epoll_handler_pop_interest_is_scoped_to_fd() {
        let epoll_state = Arc::new(EpollState::new());
        let sub_state1 = Arc::new(EpollSubState::new(
            EpollFd::new(EpollType::EPOLLIN, 0, 10, 0, 0),
            1,
        ));
        let sub_state2 = Arc::new(EpollSubState::new(
            EpollFd::new(EpollType::EPOLLIN, 0, 11, 0, 0),
            1,
        ));
        let mut handler1 = EpollHandler::new(10, epoll_state.clone(), sub_state1.clone());
        let mut handler2 = EpollHandler::new(11, epoll_state.clone(), sub_state2.clone());

        handler1.push_interest(InterestType::Readable);
        handler2.push_interest(InterestType::Readable);

        assert!(handler1.has_interest(InterestType::Readable));
        assert!(handler2.has_interest(InterestType::Readable));

        assert!(handler1.pop_interest(InterestType::Readable));
        assert!(!handler1.has_interest(InterestType::Readable));
        assert!(
            handler2.has_interest(InterestType::Readable),
            "popping one fd interest must not clear another fd with the same readiness"
        );

        assert!(sub_state1.pending_bits() == 0);
        assert!(sub_state2.pending_bits() != 0);
        assert_eq!(epoll_state.ready.lock().unwrap().len(), 2);
    }

    // The enqueued flag must collapse repeated pushes into one queue
    // entry until a consumer drains and clears it.
    #[test]
    fn epoll_handler_dedupes_queue_until_consumer_drains() {
        let (epoll_state, sub_state, mut handler) = test_epoll_handler(7);

        handler.push_interest(InterestType::Readable);
        handler.push_interest(InterestType::Readable);
        handler.push_interest(InterestType::Writable);

        assert_eq!(
            epoll_state.ready.lock().unwrap().len(),
            1,
            "multiple pushes while enqueued must keep a single queue entry"
        );
        assert!(handler.has_interest(InterestType::Readable));
        assert!(handler.has_interest(InterestType::Writable));

        // Simulate a consumer: take the entry, consume bits, free the slot.
        epoll_state.ready.lock().unwrap().pop_front().unwrap();
        sub_state.take_pending_bits();
        sub_state.clear_enqueued();

        handler.push_interest(InterestType::Readable);
        assert_eq!(
            epoll_state.ready.lock().unwrap().len(),
            1,
            "after drain, a new event should enqueue again"
        );
    }

    // Pin the EpollType -> pending-bit mapping.
    #[test]
    fn epoll_type_to_pending_bit_has_stable_mapping() {
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLIN),
            Some(READABLE_BIT)
        );
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLOUT),
            Some(WRITABLE_BIT)
        );
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLHUP),
            Some(HUP_BIT)
        );
        assert_eq!(
            epoll_type_to_pending_bit(EpollType::EPOLLERR),
            Some(ERR_BIT)
        );
    }

    // Pin the InterestType -> pending-bit mapping.
    #[test]
    fn interest_to_pending_bit_has_stable_mapping() {
        assert_eq!(
            interest_to_pending_bit(InterestType::Readable),
            READABLE_BIT
        );
        assert_eq!(
            interest_to_pending_bit(InterestType::Writable),
            WRITABLE_BIT
        );
        assert_eq!(interest_to_pending_bit(InterestType::Closed), HUP_BIT);
        assert_eq!(interest_to_pending_bit(InterestType::Error), ERR_BIT);
    }

    // HUP/ERR must be reported even when the mask only asked for EPOLLIN.
    #[test]
    fn pending_bits_to_events_always_includes_hup_and_err() {
        let events = pending_bits_to_events(HUP_BIT | ERR_BIT, EpollType::EPOLLIN);
        assert_eq!(events, vec![EpollType::EPOLLHUP, EpollType::EPOLLERR]);
    }

    // HUP/ERR bits must be tracked even with an empty interest mask.
    #[test]
    fn epoll_mask_to_pending_bits_always_tracks_hup_and_err() {
        let bits = epoll_mask_to_pending_bits(EpollType::empty());
        assert_eq!(bits & HUP_BIT, HUP_BIT);
        assert_eq!(bits & ERR_BIT, ERR_BIT);
    }

    // Two fds with the same readiness must each yield their own event.
    #[test]
    fn drain_ready_events_keeps_multi_fd_same_readiness_isolated() {
        let epoll_state = Arc::new(EpollState::new());

        let sub_a = test_sub_state(10, 1);
        let sub_b = test_sub_state(11, 1);
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();

        sub_a.pending_bits.store(readable_bit, Ordering::Release);
        sub_a.enqueued.store(true, Ordering::Release);
        sub_b.pending_bits.store(readable_bit, Ordering::Release);
        sub_b.enqueued.store(true, Ordering::Release);

        epoll_state.insert_subscription(10, sub_a);
        epoll_state.insert_subscription(11, sub_b);
        epoll_state.enqueue_ready(10, 1);
        epoll_state.enqueue_ready(11, 1);

        let events = drain_ready_events(&epoll_state, 8);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0.fd(), 10);
        assert_eq!(events[1].0.fd(), 11);
        assert_eq!(events[0].1, EpollType::EPOLLIN);
        assert_eq!(events[1].1, EpollType::EPOLLIN);
    }

    // A maxevents budget smaller than the pending bits must re-queue the
    // leftovers so the next drain delivers them.
    #[test]
    fn drain_ready_events_requeues_undispatched_bits_when_budget_exhausted() {
        let epoll_state = Arc::new(EpollState::new());
        let sub = Arc::new(EpollSubState::new(
            EpollFd::new(EpollType::EPOLLIN | EpollType::EPOLLOUT, 0, 90, 0, 0),
            1,
        ));

        sub.pending_bits
            .store(READABLE_BIT | WRITABLE_BIT, Ordering::Release);
        sub.enqueued.store(true, Ordering::Release);
        epoll_state.insert_subscription(90, sub.clone());
        epoll_state.enqueue_ready(90, 1);

        let first = drain_ready_events(&epoll_state, 1);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].1, EpollType::EPOLLIN);
        assert_eq!(sub.pending_bits(), WRITABLE_BIT);
        assert!(sub.enqueued.load(Ordering::Acquire));
        assert_eq!(epoll_state.ready.lock().unwrap().len(), 1);

        let second = drain_ready_events(&epoll_state, 1);
        assert_eq!(second.len(), 1);
        assert_eq!(second[0].1, EpollType::EPOLLOUT);
        assert_eq!(sub.pending_bits(), 0);
        assert!(!sub.enqueued.load(Ordering::Acquire));
        assert_eq!(epoll_state.ready.lock().unwrap().len(), 0);
    }

    // A queue item carrying an old generation (subscription is at gen 2,
    // item at gen 1) must be dropped without touching current bits.
    #[test]
    fn drain_ready_events_drops_stale_generation_items() {
        let epoll_state = Arc::new(EpollState::new());

        let sub = test_sub_state(22, 2);
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();
        sub.pending_bits.store(readable_bit, Ordering::Release);
        sub.enqueued.store(true, Ordering::Release);

        epoll_state.insert_subscription(22, sub.clone());
        epoll_state.enqueue_ready(22, 1);

        let events = drain_ready_events(&epoll_state, 8);
        assert!(
            events.is_empty(),
            "stale generation items must not emit events"
        );
        assert_eq!(
            sub.pending_bits.load(Ordering::Acquire),
            readable_bit,
            "stale dequeue must not clear pending bits for current generation"
        );
    }

    // Bits present while the enqueued slot is free must trigger a re-queue.
    #[test]
    fn repair_ready_queue_after_drain_requeues_when_new_bits_arrive() {
        let epoll_state = Arc::new(EpollState::new());
        let sub = test_sub_state(44, 3);
        let writable_bit = epoll_type_to_pending_bit(EpollType::EPOLLOUT).unwrap();
        sub.pending_bits.store(writable_bit, Ordering::Release);
        sub.enqueued.store(false, Ordering::Release);

        repair_ready_queue_after_drain(&epoll_state, 44, &sub);

        assert!(sub.enqueued.load(Ordering::Acquire));
        let queued = epoll_state.ready.lock().unwrap().pop_front().unwrap();
        assert_eq!(queued.fd, 44);
    }

    // apply_del must clear join guards (detaching handlers) even while an
    // outside Arc keeps the subscription object itself alive.
    #[test]
    fn apply_del_detaches_joins_even_if_subscription_stays_alive() {
        let epoll_state = Arc::new(EpollState::new());
        let event = test_epoll_event_ctl(55);
        let sub = Arc::new(EpollSubState::new(EpollFd::from_event_ctl(55, &event), 1));
        epoll_state.insert_subscription(55, sub.clone());

        let (tx, _rx) = Pipe::new().split();
        sub.add_join(EpollJoinGuard::new(InodeValFilePollGuard {
            fd: 55,
            peb: PollEventBuilder::new().build(),
            subscription: Subscription {
                userdata: 0,
                type_: Eventtype::FdRead,
                data: SubscriptionUnion {
                    fd_readwrite: SubscriptionFsReadwrite {
                        file_descriptor: 55,
                    },
                },
            },
            mode: InodeValFilePollGuardMode::PipeTx {
                tx: Arc::new(RwLock::new(Box::new(tx))),
            },
        }));

        let leaked_ref = sub.clone();
        assert_eq!(leaked_ref.joins.lock().unwrap().len(), 1);

        epoll_state.apply_del(55).unwrap();

        assert_eq!(leaked_ref.joins.lock().unwrap().len(), 0);
    }
}