1use std::{
41 collections::VecDeque,
42 sync::{
43 Arc, Mutex as StdMutex,
44 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
45 },
46};
47
48use fnv::FnvHashMap;
49use serde::{Deserialize, Serialize};
50use tokio::sync::Notify;
51use virtual_mio::{InterestHandler, InterestType};
52use virtual_net::net_error_into_io_err;
53use wasmer_wasix_types::wasi::{
54 EpollEventCtl, EpollType, Errno, Eventtype, Fd as WasiFd, Subscription,
55 SubscriptionFsReadwrite, SubscriptionUnion,
56};
57
58use crate::{
59 fs::{InodeValFilePollGuard, InodeValFilePollGuardMode},
60 state::{PollEvent, PollEventBuilder, WasiState},
61 syscalls::poll_fd_guard,
62};
63
// Bit flags stored in `EpollSubState::pending_bits`; each readiness kind owns
// exactly one bit so several kinds can be pending for the same fd at once.

/// Pending bit recorded for `InterestType::Readable` / `EpollType::EPOLLIN`.
const READABLE_BIT: u8 = 0b0001;
/// Pending bit recorded for `InterestType::Writable` / `EpollType::EPOLLOUT`.
const WRITABLE_BIT: u8 = 0b0010;
/// Pending bit recorded for `InterestType::Closed` / `EpollType::EPOLLHUP`.
const HUP_BIT: u8 = 0b0100;
/// Pending bit recorded for `InterestType::Error` / `EpollType::EPOLLERR`.
const ERR_BIT: u8 = 0b1000;

// Process-wide diagnostic counters. They are only ever incremented with
// relaxed ordering and never influence control flow in this file.

/// Number of calls to `EpollHandler::push_interest`.
static EPOLL_ENQUEUE_ATTEMPTS: AtomicU64 = AtomicU64::new(0);
/// Number of `push_interest` calls that did not add a new ready-queue entry.
static EPOLL_ENQUEUE_DEDUPE_HITS: AtomicU64 = AtomicU64::new(0);
/// Number of dequeued ready items dropped because their generation was stale.
static EPOLL_STALE_GENERATION_DROPS: AtomicU64 = AtomicU64::new(0);
/// Number of dequeued ready items that had no subscription or no pending bits.
static EPOLL_EMPTY_DEQUEUE_ENTRIES: AtomicU64 = AtomicU64::new(0);
73
/// Registration record for one file descriptor watched by an epoll instance.
///
/// The values are copied from an [`EpollEventCtl`] together with the explicit
/// target fd (see [`EpollFd::from_event_ctl`]) and are handed back unchanged
/// through the accessor methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EpollFd {
    /// Event mask requested for this registration.
    events: EpollType,
    /// `ptr` value taken from the control event.
    ptr: u64,
    /// File descriptor this registration refers to.
    fd: WasiFd,
    /// `data1` value taken from the control event.
    data1: u32,
    /// `data2` value taken from the control event; also used as the poll
    /// subscription's `userdata` in `register_epoll_handler`.
    data2: u64,
}
87
88impl EpollFd {
89 pub fn new(events: EpollType, ptr: u64, fd: WasiFd, data1: u32, data2: u64) -> Self {
91 Self {
92 events,
93 ptr,
94 fd,
95 data1,
96 data2,
97 }
98 }
99
100 pub fn from_event_ctl(fd: WasiFd, event: &EpollEventCtl) -> Self {
102 Self::new(event.events, event.ptr, fd, event.data1, event.data2)
103 }
104
105 pub fn events(&self) -> EpollType {
107 self.events
108 }
109
110 pub fn ptr(&self) -> u64 {
112 self.ptr
113 }
114
115 pub fn fd(&self) -> WasiFd {
117 self.fd
118 }
119
120 pub fn data1(&self) -> u32 {
122 self.data1
123 }
124
125 pub fn data2(&self) -> u64 {
127 self.data2
128 }
129}
130
/// Keeps the poll guard of a registered fd alive.
///
/// Dropping the guard removes the interest handler from the underlying
/// socket, event-notification object or pipe (see the `Drop` impl).
#[derive(Debug)]
pub struct EpollJoinGuard {
    /// Poll guard whose `mode` identifies the object the handler was set on.
    fd_guard: InodeValFilePollGuard,
}
136
137impl EpollJoinGuard {
138 fn new(fd_guard: InodeValFilePollGuard) -> Self {
139 Self { fd_guard }
140 }
141}
142
143impl Drop for EpollJoinGuard {
144 fn drop(&mut self) {
145 match &self.fd_guard.mode {
147 InodeValFilePollGuardMode::File(_) => {
148 }
150 InodeValFilePollGuardMode::Socket { inner } => {
151 let mut inner = inner.protected.write().unwrap();
152 inner.remove_handler();
153 }
154 InodeValFilePollGuardMode::EventNotifications(inner) => {
155 inner.remove_interest_handler();
156 }
157 InodeValFilePollGuardMode::DuplexPipe { pipe } => {
158 let inner = pipe.write().unwrap();
159 inner.remove_interest_handler();
160 }
161 InodeValFilePollGuardMode::PipeRx { rx } => {
162 let inner = rx.write().unwrap();
163 inner.remove_interest_handler();
164 }
165 InodeValFilePollGuardMode::PipeTx { .. } => {
166 }
168 }
169 }
170}
171
/// Shared state of one epoll instance: the registered subscriptions, the
/// queue of fds with pending readiness, and the notifier used to wake a
/// waiter when something is enqueued.
#[derive(Debug)]
pub struct EpollState {
    /// Current subscription for every registered fd.
    subscriptions: StdMutex<FnvHashMap<WasiFd, Arc<EpollSubState>>>,
    /// FIFO of `(fd, generation)` entries waiting to be drained.
    ready: StdMutex<VecDeque<ReadyItem>>,
    /// Signalled by `enqueue_ready`; awaited by `wait`.
    notify: Notify,
}
181
182impl Default for EpollState {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188impl EpollState {
189 pub fn new() -> Self {
191 Self {
192 subscriptions: StdMutex::new(FnvHashMap::default()),
193 ready: StdMutex::new(VecDeque::new()),
194 notify: Notify::new(),
195 }
196 }
197
198 #[cfg(test)]
199 fn insert_subscription(&self, fd: WasiFd, state: Arc<EpollSubState>) {
200 self.subscriptions.lock().unwrap().insert(fd, state);
201 }
202
203 fn restore_subscription(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
204 let mut subscriptions = self.subscriptions.lock().unwrap();
205 subscriptions.remove(&fd);
206 if let Some(previous) = previous {
207 subscriptions.insert(fd, previous);
208 }
209 }
210
211 fn subscription(&self, fd: WasiFd) -> Option<Arc<EpollSubState>> {
212 self.subscriptions.lock().unwrap().get(&fd).cloned()
213 }
214
215 fn enqueue_ready(&self, fd: WasiFd, generation: u64) {
216 self.ready
217 .lock()
218 .unwrap()
219 .push_back(ReadyItem { fd, generation });
220 self.notify.notify_one();
221 }
222
223 fn dequeue_ready(&self) -> Option<ReadyItem> {
224 self.ready.lock().unwrap().pop_front()
225 }
226
227 pub async fn wait(&self) {
229 self.notify.notified().await;
230 }
231
232 pub(crate) fn prepare_add(
233 &self,
234 fd: WasiFd,
235 event: &EpollEventCtl,
236 ) -> Result<(EpollFd, Arc<EpollSubState>), Errno> {
237 let mut subscriptions = self.subscriptions.lock().unwrap();
238 if subscriptions.contains_key(&fd) {
239 return Err(Errno::Exist);
240 }
241
242 let (epoll_fd, sub_state) = self.build_pending_subscription(fd, event, 1);
243 subscriptions.insert(fd, sub_state.clone());
244 Ok((epoll_fd, sub_state))
245 }
246
247 pub(crate) fn prepare_mod(
248 &self,
249 fd: WasiFd,
250 event: &EpollEventCtl,
251 ) -> Result<(EpollFd, Arc<EpollSubState>, Arc<EpollSubState>), Errno> {
252 let mut subscriptions = self.subscriptions.lock().unwrap();
253 let Some(previous) = subscriptions.remove(&fd) else {
254 return Err(Errno::Noent);
255 };
256 tracing::trace!(fd, "unregistering waker");
257
258 let (epoll_fd, sub_state) =
259 self.build_pending_subscription(fd, event, previous.next_generation());
260 subscriptions.insert(fd, sub_state.clone());
261 Ok((epoll_fd, sub_state, previous))
262 }
263
264 pub(crate) fn apply_del(&self, fd: WasiFd) -> Result<(), Errno> {
265 let removed = self
266 .subscriptions
267 .lock()
268 .unwrap()
269 .remove(&fd)
270 .ok_or(Errno::Noent)?;
271 removed.detach_joins();
272 Ok(())
273 }
274
275 pub(crate) fn rollback_registration(&self, fd: WasiFd, previous: Option<Arc<EpollSubState>>) {
276 self.restore_subscription(fd, previous);
277 }
278
279 fn build_pending_subscription(
280 &self,
281 fd: WasiFd,
282 event: &EpollEventCtl,
283 generation: u64,
284 ) -> (EpollFd, Arc<EpollSubState>) {
285 let epoll_fd = EpollFd::from_event_ctl(fd, event);
286 tracing::trace!(
287 peb = ?event.events,
288 ptr = ?event.ptr,
289 data1 = event.data1,
290 data2 = event.data2,
291 fd,
292 "registering waker"
293 );
294 let sub_state = Arc::new(EpollSubState::new(epoll_fd.clone(), generation));
295 (epoll_fd, sub_state)
296 }
297}
298
/// Per-fd subscription state shared between the epoll instance and the
/// interest handler installed on the watched object.
#[derive(Debug)]
pub struct EpollSubState {
    /// Registration record returned (cloned) by `fd_meta()`.
    fd_meta: StdMutex<EpollFd>,
    /// Join guards kept alive for this subscription; cleared by `detach_joins`.
    joins: StdMutex<Vec<EpollJoinGuard>>,
    /// Bitwise OR of the `*_BIT` readiness flags not yet taken by a drain.
    pending_bits: AtomicU8,
    /// `true` while this subscription is marked as sitting in the ready queue.
    enqueued: AtomicBool,
    /// Generation stamped on ready-queue entries; drains discard entries whose
    /// generation differs from this value.
    generation: AtomicU64,
}
312
313impl EpollSubState {
314 pub fn new(fd_meta: EpollFd, generation: u64) -> Self {
316 Self {
317 fd_meta: StdMutex::new(fd_meta),
318 joins: StdMutex::new(Vec::new()),
319 pending_bits: AtomicU8::new(0),
320 enqueued: AtomicBool::new(false),
321 generation: AtomicU64::new(generation),
322 }
323 }
324
325 pub fn next_generation(&self) -> u64 {
329 self.generation.load(Ordering::Acquire).saturating_add(1)
330 }
331
332 pub fn add_join(&self, join: EpollJoinGuard) {
334 self.joins.lock().unwrap().push(join);
335 }
336
337 pub fn detach_joins(&self) {
339 self.joins.lock().unwrap().clear();
340 }
341
342 fn generation(&self) -> u64 {
343 self.generation.load(Ordering::Acquire)
344 }
345
346 pub(crate) fn fd_meta(&self) -> EpollFd {
347 self.fd_meta.lock().unwrap().clone()
348 }
349
350 fn set_pending(&self, bit: u8) -> bool {
351 let old_bits = self.pending_bits.fetch_or(bit, Ordering::AcqRel);
352 (old_bits & bit) == 0
353 }
354
355 fn take_pending_bits(&self) -> u8 {
356 self.pending_bits.swap(0, Ordering::AcqRel)
357 }
358
359 fn pending_bits(&self) -> u8 {
360 self.pending_bits.load(Ordering::Acquire)
361 }
362
363 fn mark_enqueued(&self) -> bool {
364 self.enqueued
365 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
366 .is_ok()
367 }
368
369 fn clear_enqueued(&self) {
370 self.enqueued.store(false, Ordering::Release);
371 }
372}
373
/// Entry of the ready queue: an fd plus the subscription generation that was
/// current when the entry was enqueued.
#[derive(Debug, Clone, Copy)]
struct ReadyItem {
    /// File descriptor with pending readiness.
    fd: WasiFd,
    /// Generation captured at enqueue time; compared against the
    /// subscription's generation when the entry is drained.
    generation: u64,
}
381
382pub(crate) fn epoll_type_to_pending_bit(readiness: EpollType) -> Option<u8> {
384 if readiness == EpollType::EPOLLIN {
385 Some(READABLE_BIT)
386 } else if readiness == EpollType::EPOLLOUT {
387 Some(WRITABLE_BIT)
388 } else if readiness == EpollType::EPOLLHUP {
389 Some(HUP_BIT)
390 } else if readiness == EpollType::EPOLLERR {
391 Some(ERR_BIT)
392 } else {
393 None
394 }
395}
396
397fn interest_to_pending_bit(interest: InterestType) -> u8 {
399 match interest {
400 InterestType::Readable => READABLE_BIT,
401 InterestType::Writable => WRITABLE_BIT,
402 InterestType::Closed => HUP_BIT,
403 InterestType::Error => ERR_BIT,
404 }
405}
406
407fn pending_bits_to_event(bits: u8, mask: EpollType) -> EpollType {
409 let mut event = EpollType::empty();
410 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLIN)
411 && (bits & bit) != 0
412 && mask.contains(EpollType::EPOLLIN)
413 {
414 event |= EpollType::EPOLLIN;
415 }
416 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLOUT)
417 && (bits & bit) != 0
418 && mask.contains(EpollType::EPOLLOUT)
419 {
420 event |= EpollType::EPOLLOUT;
421 }
422 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLHUP)
423 && (bits & bit) != 0
424 {
425 event |= EpollType::EPOLLHUP;
426 }
427 if let Some(bit) = epoll_type_to_pending_bit(EpollType::EPOLLERR)
428 && (bits & bit) != 0
429 {
430 event |= EpollType::EPOLLERR;
431 }
432 event
433}
434
435fn prime_immediate_writable_if_applicable(
436 event: &EpollFd,
437 fd_guard: &InodeValFilePollGuard,
438 epoll_state: &Arc<EpollState>,
439 sub_state: &Arc<EpollSubState>,
440) {
441 if !event.events().contains(EpollType::EPOLLOUT) {
442 return;
443 }
444
445 let writable_now = matches!(
449 fd_guard.mode,
450 InodeValFilePollGuardMode::EventNotifications(_) | InodeValFilePollGuardMode::PipeTx { .. }
451 );
452
453 if !writable_now {
454 return;
455 }
456
457 sub_state
458 .pending_bits
459 .fetch_or(WRITABLE_BIT, Ordering::AcqRel);
460 if sub_state.mark_enqueued() {
461 epoll_state.enqueue_ready(event.fd(), sub_state.generation());
462 }
463}
464
465fn repair_ready_queue_after_drain(
467 epoll_state: &Arc<EpollState>,
468 fd: WasiFd,
469 sub_state: &Arc<EpollSubState>,
470) {
471 if sub_state.pending_bits() != 0 && sub_state.mark_enqueued() {
472 epoll_state.enqueue_ready(fd, sub_state.generation());
473 }
474}
475
476pub(crate) fn drain_ready_events(
483 epoll_state: &Arc<EpollState>,
484 maxevents: usize,
485) -> Vec<(EpollFd, EpollType)> {
486 let mut ret: Vec<(EpollFd, EpollType)> = Vec::new();
487 while ret.len() < maxevents {
488 let Some(item) = epoll_state.dequeue_ready() else {
489 break;
490 };
491
492 let Some(sub_state) = epoll_state.subscription(item.fd) else {
493 epoll_empty_dequeue_entry();
494 continue;
495 };
496
497 if sub_state.generation() != item.generation {
498 epoll_stale_generation_drop();
499 continue;
500 }
501
502 let bits = sub_state.take_pending_bits();
503 sub_state.clear_enqueued();
504
505 if bits == 0 {
506 repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);
507 epoll_empty_dequeue_entry();
508 continue;
509 }
510
511 let event = sub_state.fd_meta();
512 let readiness = pending_bits_to_event(bits, event.events());
513 if readiness != EpollType::empty() {
514 ret.push((event, readiness));
515 }
516 repair_ready_queue_after_drain(epoll_state, item.fd, &sub_state);
517
518 if ret.len() >= maxevents {
519 break;
520 }
521 }
522 ret
523}
524
/// Interest handler installed on a watched object; it records readiness in
/// the subscription's pending bits and enqueues the fd on the epoll instance.
#[derive(Debug)]
struct EpollHandler {
    /// File descriptor reported in ready-queue entries.
    fd: WasiFd,
    /// Epoll instance whose ready queue receives the entries.
    epoll_state: Arc<EpollState>,
    /// Subscription whose pending bits and enqueued flag are updated.
    sub_state: Arc<EpollSubState>,
}
534
535impl EpollHandler {
536 fn new(fd: WasiFd, epoll_state: Arc<EpollState>, sub_state: Arc<EpollSubState>) -> Box<Self> {
537 Box::new(Self {
538 fd,
539 epoll_state,
540 sub_state,
541 })
542 }
543}
544
545impl InterestHandler for EpollHandler {
546 fn push_interest(&mut self, interest: InterestType) {
549 EPOLL_ENQUEUE_ATTEMPTS.fetch_add(1, Ordering::Relaxed);
550 let bit = interest_to_pending_bit(interest);
551 if !self.sub_state.set_pending(bit) {
552 EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
553 return;
554 }
555
556 if self.sub_state.mark_enqueued() {
557 self.epoll_state
558 .enqueue_ready(self.fd, self.sub_state.generation());
559 } else {
560 EPOLL_ENQUEUE_DEDUPE_HITS.fetch_add(1, Ordering::Relaxed);
561 }
562 }
563
564 fn pop_interest(&mut self, interest: InterestType) -> bool {
566 let bit = interest_to_pending_bit(interest);
567 let old = self
568 .sub_state
569 .pending_bits
570 .fetch_and(!bit, Ordering::AcqRel);
571 (old & bit) != 0
572 }
573
574 fn has_interest(&self, interest: InterestType) -> bool {
576 let bit = interest_to_pending_bit(interest);
577 (self.sub_state.pending_bits() & bit) != 0
578 }
579}
580
581pub(crate) fn register_epoll_handler(
585 state: &Arc<WasiState>,
586 event: &EpollFd,
587 epoll_state: Arc<EpollState>,
588 sub_state: Arc<EpollSubState>,
589) -> Result<Option<EpollJoinGuard>, Errno> {
590 let mut type_ = Eventtype::FdRead;
591 let mut peb = PollEventBuilder::new();
592 if event.events().contains(EpollType::EPOLLOUT) {
593 type_ = Eventtype::FdWrite;
594 peb = peb.add(PollEvent::PollOut);
595 }
596 if event.events().contains(EpollType::EPOLLIN) {
597 type_ = Eventtype::FdRead;
598 peb = peb.add(PollEvent::PollIn);
599 }
600 peb = peb.add(PollEvent::PollError);
602 peb = peb.add(PollEvent::PollHangUp);
603
604 let s = Subscription {
605 userdata: event.data2(),
606 type_,
607 data: SubscriptionUnion {
608 fd_readwrite: SubscriptionFsReadwrite {
609 file_descriptor: event.fd(),
610 },
611 },
612 };
613
614 let fd_guard = poll_fd_guard(state, peb.build(), event.fd(), s)?;
615 let handler = EpollHandler::new(event.fd(), epoll_state.clone(), sub_state.clone());
616
617 match &fd_guard.mode {
618 InodeValFilePollGuardMode::File(_) => {
619 return Ok(None);
621 }
622 InodeValFilePollGuardMode::Socket { inner, .. } => {
623 let mut inner = inner.protected.write().unwrap();
624 inner.set_handler(handler).map_err(net_error_into_io_err)?;
625 drop(inner);
626 }
627 InodeValFilePollGuardMode::EventNotifications(inner) => inner.set_interest_handler(handler),
628 InodeValFilePollGuardMode::DuplexPipe { pipe } => {
629 let inner = pipe.write().unwrap();
630 inner.set_interest_handler(handler);
631 }
632 InodeValFilePollGuardMode::PipeRx { rx } => {
633 let inner = rx.write().unwrap();
634 inner.set_interest_handler(handler);
635 }
636 InodeValFilePollGuardMode::PipeTx { .. } => {
637 prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);
641 return Ok(None);
642 }
643 }
644
645 prime_immediate_writable_if_applicable(event, &fd_guard, &epoll_state, &sub_state);
646
647 Ok(Some(EpollJoinGuard::new(fd_guard)))
648}
649
650pub(crate) fn epoll_stale_generation_drop() {
652 EPOLL_STALE_GENERATION_DROPS.fetch_add(1, Ordering::Relaxed);
653}
654
655pub(crate) fn epoll_empty_dequeue_entry() {
657 EPOLL_EMPTY_DEQUEUE_ENTRIES.fetch_add(1, Ordering::Relaxed);
658}
659
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::RwLock;
    use virtual_fs::Pipe;

    /// Event mask with all four readiness kinds enabled.
    fn all_readiness() -> EpollType {
        EpollType::EPOLLIN | EpollType::EPOLLOUT | EpollType::EPOLLERR | EpollType::EPOLLHUP
    }

    /// Number of entries currently sitting in the ready queue.
    fn queued_len(epoll_state: &EpollState) -> usize {
        epoll_state.ready.lock().unwrap().len()
    }

    /// Forces `sub` into the "bits pending and already enqueued" state.
    fn force_pending_and_enqueued(sub: &EpollSubState, bits: u8) {
        sub.pending_bits.store(bits, Ordering::Release);
        sub.enqueued.store(true, Ordering::Release);
    }

    /// Control event asking for `EPOLLIN` with recognisable payload values.
    fn test_epoll_event_ctl(fd: WasiFd) -> EpollEventCtl {
        EpollEventCtl {
            events: EpollType::EPOLLIN,
            ptr: 77,
            fd,
            data1: 88,
            data2: 99,
        }
    }

    /// Subscription for `fd` that is interested in every readiness kind.
    fn test_sub_state(fd: WasiFd, generation: u64) -> Arc<EpollSubState> {
        let meta = EpollFd::new(all_readiness(), 0, fd, 0, 0);
        Arc::new(EpollSubState::new(meta, generation))
    }

    /// Fresh epoll instance plus a generation-1 subscription and its handler.
    fn test_epoll_handler(fd: WasiFd) -> (Arc<EpollState>, Arc<EpollSubState>, Box<EpollHandler>) {
        let epoll_state = Arc::new(EpollState::new());
        let sub_state = test_sub_state(fd, 1);
        let handler = EpollHandler::new(fd, Arc::clone(&epoll_state), Arc::clone(&sub_state));
        (epoll_state, sub_state, handler)
    }

    #[test]
    fn epoll_fd_from_event_ctl_uses_explicit_fd() {
        let ctl = test_epoll_event_ctl(1234);
        let record = EpollFd::from_event_ctl(5678, &ctl);

        assert_eq!(record.fd(), 5678);
        assert_eq!(record.ptr(), 77);
        assert_eq!(record.data1(), 88);
        assert_eq!(record.data2(), 99);
        assert_eq!(record.events(), EpollType::EPOLLIN);
    }

    #[test]
    fn epoll_handler_pop_interest_is_scoped_to_fd() {
        let epoll_state = Arc::new(EpollState::new());
        let read_only_sub =
            |fd: WasiFd| Arc::new(EpollSubState::new(EpollFd::new(EpollType::EPOLLIN, 0, fd, 0, 0), 1));
        let sub_state1 = read_only_sub(10);
        let sub_state2 = read_only_sub(11);
        let mut handler1 = EpollHandler::new(10, epoll_state.clone(), sub_state1.clone());
        let mut handler2 = EpollHandler::new(11, epoll_state.clone(), sub_state2.clone());

        handler1.push_interest(InterestType::Readable);
        handler2.push_interest(InterestType::Readable);

        assert!(handler1.has_interest(InterestType::Readable));
        assert!(handler2.has_interest(InterestType::Readable));

        assert!(handler1.pop_interest(InterestType::Readable));
        assert!(!handler1.has_interest(InterestType::Readable));
        assert!(
            handler2.has_interest(InterestType::Readable),
            "popping one fd interest must not clear another fd with the same readiness"
        );

        assert_eq!(sub_state1.pending_bits(), 0);
        assert_ne!(sub_state2.pending_bits(), 0);
        assert_eq!(queued_len(&epoll_state), 2);
    }

    #[test]
    fn epoll_handler_dedupes_queue_until_consumer_drains() {
        let (epoll_state, sub_state, mut handler) = test_epoll_handler(7);

        for interest in [
            InterestType::Readable,
            InterestType::Readable,
            InterestType::Writable,
        ] {
            handler.push_interest(interest);
        }

        assert_eq!(
            queued_len(&epoll_state),
            1,
            "multiple pushes while enqueued must keep a single queue entry"
        );
        assert!(handler.has_interest(InterestType::Readable));
        assert!(handler.has_interest(InterestType::Writable));

        // Emulate a consumer: pop the entry, take the bits, lower the flag.
        epoll_state.dequeue_ready().unwrap();
        sub_state.take_pending_bits();
        sub_state.clear_enqueued();

        handler.push_interest(InterestType::Readable);
        assert_eq!(
            queued_len(&epoll_state),
            1,
            "after drain, a new event should enqueue again"
        );
    }

    #[test]
    fn epoll_type_to_pending_bit_has_stable_mapping() {
        let expected = [
            (EpollType::EPOLLIN, READABLE_BIT),
            (EpollType::EPOLLOUT, WRITABLE_BIT),
            (EpollType::EPOLLHUP, HUP_BIT),
            (EpollType::EPOLLERR, ERR_BIT),
        ];
        for (kind, bit) in expected {
            assert_eq!(epoll_type_to_pending_bit(kind), Some(bit));
        }
    }

    #[test]
    fn interest_to_pending_bit_has_stable_mapping() {
        let expected = [
            (InterestType::Readable, READABLE_BIT),
            (InterestType::Writable, WRITABLE_BIT),
            (InterestType::Closed, HUP_BIT),
            (InterestType::Error, ERR_BIT),
        ];
        for (interest, bit) in expected {
            assert_eq!(interest_to_pending_bit(interest), bit);
        }
    }

    #[test]
    fn pending_bits_to_event_always_includes_hup_and_err() {
        let reported = pending_bits_to_event(HUP_BIT | ERR_BIT, EpollType::EPOLLIN);
        assert_eq!(reported, EpollType::EPOLLHUP | EpollType::EPOLLERR);
    }

    #[test]
    fn drain_ready_events_keeps_multi_fd_same_readiness_isolated() {
        let epoll_state = Arc::new(EpollState::new());
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();

        for fd in [10, 11] {
            let sub = test_sub_state(fd, 1);
            force_pending_and_enqueued(&sub, readable_bit);
            epoll_state.insert_subscription(fd, sub);
        }
        epoll_state.enqueue_ready(10, 1);
        epoll_state.enqueue_ready(11, 1);

        let events = drain_ready_events(&epoll_state, 8);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0.fd(), 10);
        assert_eq!(events[1].0.fd(), 11);
        assert_eq!(events[0].1, EpollType::EPOLLIN);
        assert_eq!(events[1].1, EpollType::EPOLLIN);
    }

    #[test]
    fn drain_ready_events_coalesces_readiness_bits_for_one_fd() {
        let epoll_state = Arc::new(EpollState::new());
        let meta = EpollFd::new(EpollType::EPOLLIN | EpollType::EPOLLOUT, 0, 90, 0, 0);
        let sub = Arc::new(EpollSubState::new(meta, 1));

        force_pending_and_enqueued(&sub, READABLE_BIT | WRITABLE_BIT);
        epoll_state.insert_subscription(90, sub.clone());
        epoll_state.enqueue_ready(90, 1);

        let first = drain_ready_events(&epoll_state, 1);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].0.fd(), 90);
        assert_eq!(first[0].1, EpollType::EPOLLIN | EpollType::EPOLLOUT);
        assert_eq!(sub.pending_bits(), 0);
        assert!(!sub.enqueued.load(Ordering::Acquire));
        assert_eq!(queued_len(&epoll_state), 0);
    }

    #[test]
    fn drain_ready_events_drops_stale_generation_items() {
        let epoll_state = Arc::new(EpollState::new());

        // Subscription is at generation 2, the queued entry at generation 1.
        let sub = test_sub_state(22, 2);
        let readable_bit = epoll_type_to_pending_bit(EpollType::EPOLLIN).unwrap();
        force_pending_and_enqueued(&sub, readable_bit);

        epoll_state.insert_subscription(22, sub.clone());
        epoll_state.enqueue_ready(22, 1);

        let events = drain_ready_events(&epoll_state, 8);
        assert!(
            events.is_empty(),
            "stale generation items must not emit events"
        );
        assert_eq!(
            sub.pending_bits.load(Ordering::Acquire),
            readable_bit,
            "stale dequeue must not clear pending bits for current generation"
        );
    }

    #[test]
    fn repair_ready_queue_after_drain_requeues_when_new_bits_arrive() {
        let epoll_state = Arc::new(EpollState::new());
        let sub = test_sub_state(44, 3);
        let writable_bit = epoll_type_to_pending_bit(EpollType::EPOLLOUT).unwrap();
        sub.pending_bits.store(writable_bit, Ordering::Release);
        sub.enqueued.store(false, Ordering::Release);

        repair_ready_queue_after_drain(&epoll_state, 44, &sub);

        assert!(sub.enqueued.load(Ordering::Acquire));
        let queued = epoll_state.dequeue_ready().unwrap();
        assert_eq!(queued.fd, 44);
    }

    #[test]
    fn apply_del_detaches_joins_even_if_subscription_stays_alive() {
        let epoll_state = Arc::new(EpollState::new());
        let ctl = test_epoll_event_ctl(55);
        let sub = Arc::new(EpollSubState::new(EpollFd::from_event_ctl(55, &ctl), 1));
        epoll_state.insert_subscription(55, sub.clone());

        let (tx, _rx) = Pipe::new().split();
        let fd_guard = InodeValFilePollGuard {
            fd: 55,
            peb: PollEventBuilder::new().build(),
            subscription: Subscription {
                userdata: 0,
                type_: Eventtype::FdRead,
                data: SubscriptionUnion {
                    fd_readwrite: SubscriptionFsReadwrite {
                        file_descriptor: 55,
                    },
                },
            },
            mode: InodeValFilePollGuardMode::PipeTx {
                tx: Arc::new(RwLock::new(Box::new(tx))),
            },
        };
        sub.add_join(EpollJoinGuard::new(fd_guard));

        // A second handle keeps the subscription alive past `apply_del`.
        let leaked_ref = sub.clone();
        assert_eq!(leaked_ref.joins.lock().unwrap().len(), 1);

        epoll_state.apply_del(55).unwrap();

        assert_eq!(leaked_ref.joins.lock().unwrap().len(), 0);
    }
}