virtual_mio/
interest.rs

1use serde::{Deserialize, Serialize};
2use std::{
3    collections::{HashMap, HashSet},
4    sync::{Arc, Mutex},
5    task::{Context, RawWaker, RawWakerVTable, Waker},
6};
7
8#[derive(Debug, Clone, Serialize, Deserialize, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
9pub enum InterestType {
10    Readable,
11    Writable,
12    Closed,
13    Error,
14}
15
16#[derive(Debug)]
17pub struct WakerInterestHandler {
18    set: HashSet<InterestType>,
19    waker: Waker,
20}
21impl WakerInterestHandler {
22    pub fn new(waker: &Waker) -> Box<Self> {
23        Box::new(WakerInterestHandler {
24            set: Default::default(),
25            waker: waker.clone(),
26        })
27    }
28}
29impl InterestHandler for WakerInterestHandler {
30    fn push_interest(&mut self, interest: InterestType) {
31        self.set.insert(interest);
32        self.waker.wake_by_ref();
33    }
34
35    fn pop_interest(&mut self, interest: InterestType) -> bool {
36        self.set.remove(&interest)
37    }
38
39    fn has_interest(&self, interest: InterestType) -> bool {
40        self.set.contains(&interest)
41    }
42}
43
44#[derive(Debug, Clone)]
45pub struct SharedWakerInterestHandler {
46    inner: Arc<Mutex<Box<WakerInterestHandler>>>,
47}
48impl SharedWakerInterestHandler {
49    pub fn new(waker: &Waker) -> Box<Self> {
50        Box::new(Self {
51            inner: Arc::new(Mutex::new(WakerInterestHandler::new(waker))),
52        })
53    }
54}
55impl InterestHandler for SharedWakerInterestHandler {
56    fn push_interest(&mut self, interest: InterestType) {
57        let mut inner = self.inner.lock().unwrap();
58        inner.push_interest(interest);
59    }
60
61    fn pop_interest(&mut self, interest: InterestType) -> bool {
62        let mut inner = self.inner.lock().unwrap();
63        inner.pop_interest(interest)
64    }
65
66    fn has_interest(&self, interest: InterestType) -> bool {
67        let inner = self.inner.lock().unwrap();
68        inner.has_interest(interest)
69    }
70}
71
72pub trait InterestHandler: Send + Sync + std::fmt::Debug {
73    fn push_interest(&mut self, interest: InterestType);
74
75    fn pop_interest(&mut self, interest: InterestType) -> bool;
76
77    fn has_interest(&self, interest: InterestType) -> bool;
78}
79
80impl From<&Waker> for Box<dyn InterestHandler + Send + Sync> {
81    fn from(waker: &Waker) -> Self {
82        WakerInterestHandler::new(waker)
83    }
84}
85
86impl From<&Context<'_>> for Box<dyn InterestHandler + Send + Sync> {
87    fn from(cx: &Context) -> Self {
88        cx.waker().into()
89    }
90}
91
92pub fn handler_into_waker(
93    handler: Box<dyn InterestHandler + Send + Sync>,
94    interest: InterestType,
95) -> Arc<InterestHandlerWaker> {
96    Arc::new(InterestHandlerWaker {
97        handler: Arc::new(Mutex::new(handler)),
98        interest,
99    })
100}
101
102#[derive(Debug, Clone)]
103pub struct InterestHandlerWaker {
104    handler: Arc<Mutex<Box<dyn InterestHandler + Send + Sync>>>,
105    interest: InterestType,
106}
107impl InterestHandlerWaker {
108    pub fn wake_now(&self) {
109        let mut handler = self.handler.lock().unwrap();
110        handler.push_interest(self.interest);
111    }
112    pub fn set_interest(self: &Arc<Self>, interest: InterestType) -> Arc<Self> {
113        let mut next = self.as_ref().clone();
114        next.interest = interest;
115        Arc::new(next)
116    }
117    pub fn as_waker(self: &Arc<Self>) -> Waker {
118        let s: *const Self = Arc::into_raw(Arc::clone(self));
119        let raw_waker = RawWaker::new(s as *const (), &VTABLE);
120        unsafe { Waker::from_raw(raw_waker) }
121    }
122}
123
124fn handler_waker_wake(s: &InterestHandlerWaker) {
125    let waker_arc = unsafe { Arc::from_raw(s) };
126    waker_arc.wake_now();
127}
128
129fn handler_waker_clone(s: &InterestHandlerWaker) -> RawWaker {
130    let arc = unsafe { Arc::from_raw(s) };
131    std::mem::forget(arc.clone());
132    RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
133}
134
135const VTABLE: RawWakerVTable = unsafe {
136    RawWakerVTable::new(
137        |s| handler_waker_clone(&*(s as *const InterestHandlerWaker)), // clone
138        |s| handler_waker_wake(&*(s as *const InterestHandlerWaker)),  // wake
139        |s| (*(s as *const InterestHandlerWaker)).wake_now(), // wake by ref (don't decrease refcount)
140        |s| drop(Arc::from_raw(s as *const InterestHandlerWaker)), // decrease refcount
141    )
142};
143
144#[derive(Debug, Clone, Default)]
145struct InterestWakerMapState {
146    wakers: HashMap<InterestType, Vec<Waker>>,
147    triggered: HashSet<InterestType>,
148}
149
150#[derive(Debug, Clone, Default)]
151pub struct InterestWakerMap {
152    state: Arc<Mutex<InterestWakerMapState>>,
153}
154
155impl InterestWakerMap {
156    pub fn add(&self, interest: InterestType, waker: &Waker) {
157        let mut state = self.state.lock().unwrap();
158        let entries = state.wakers.entry(interest).or_default();
159        if !entries.iter().any(|w| w.will_wake(waker)) {
160            entries.push(waker.clone());
161        }
162    }
163
164    pub fn pop(&self, interest: InterestType) -> bool {
165        let mut state = self.state.lock().unwrap();
166        state.triggered.remove(&interest)
167    }
168
169    pub fn push(&self, interest: InterestType) -> bool {
170        let mut state = self.state.lock().unwrap();
171        state.triggered.insert(interest)
172    }
173}
174
175impl InterestHandler for InterestWakerMap {
176    fn push_interest(&mut self, interest: InterestType) {
177        let mut state = self.state.lock().unwrap();
178        if let Some(wakers) = state.wakers.remove(&interest) {
179            for waker in wakers {
180                waker.wake();
181            }
182        } else {
183            state.triggered.insert(interest);
184        }
185    }
186
187    fn pop_interest(&mut self, interest: InterestType) -> bool {
188        let mut state = self.state.lock().unwrap();
189        state.triggered.remove(&interest)
190    }
191
192    fn has_interest(&self, interest: InterestType) -> bool {
193        let state = self.state.lock().unwrap();
194        state.triggered.contains(&interest)
195    }
196}