virtual_mio/
selector.rs

1use mio::{Registry, Token};
2use std::{
3    collections::HashMap,
4    io,
5    sync::{
6        Arc, Mutex,
7        atomic::{AtomicBool, AtomicUsize, Ordering},
8    },
9};
10
11use crate::{InterestHandler, InterestType};
12
13pub enum SelectorModification {
14    Add {
15        handler: Box<dyn InterestHandler + Send + Sync>,
16        token: Token,
17    },
18    Remove {
19        token: Token,
20    },
21    Replace {
22        token: Token,
23        handler: Box<dyn InterestHandler + Send + Sync>,
24    },
25    PushInterest {
26        token: Token,
27        interest: InterestType,
28    },
29}
30
31impl SelectorModification {
32    /// Apply the modification to a handler lookup table
33    ///
34    /// This function must be called with care, as `SelectorModification::PushInterest` may trigger handler code.
35    fn apply(self, lookup: &mut HashMap<Token, Box<dyn InterestHandler + Send + Sync>>) {
36        match self {
37            SelectorModification::Add { token, handler } => {
38                lookup.insert(token, handler);
39            }
40            SelectorModification::Remove { token } => {
41                lookup.remove(&token);
42            }
43            SelectorModification::Replace { token, mut handler } => {
44                let last = lookup.remove(&token);
45
46                // If there was a previous handler, copy over its active interests
47                if let Some(last) = last {
48                    let interests = vec![
49                        InterestType::Readable,
50                        InterestType::Writable,
51                        InterestType::Closed,
52                        InterestType::Error,
53                    ];
54                    for interest in interests {
55                        if last.has_interest(interest) && !handler.has_interest(interest) {
56                            handler.push_interest(interest);
57                        }
58                    }
59                }
60
61                lookup.insert(token, handler);
62            }
63            SelectorModification::PushInterest { token, interest } => {
64                if let Some(handler) = lookup.get_mut(&token) {
65                    handler.push_interest(interest);
66                }
67            }
68        }
69    }
70}
71impl std::fmt::Debug for SelectorModification {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        match self {
74            SelectorModification::Add { token, .. } => {
75                f.debug_struct("Add").field("token", token).finish()
76            }
77            SelectorModification::Remove { token, .. } => {
78                f.debug_struct("Remove").field("token", token).finish()
79            }
80            SelectorModification::Replace { token, .. } => {
81                f.debug_struct("Replace").field("token", token).finish()
82            }
83            SelectorModification::PushInterest { token, interest } => f
84                .debug_struct("PushInterest")
85                .field("token", token)
86                .field("interest", interest)
87                .finish(),
88        }
89    }
90}
91
92#[derive(Debug)]
93pub struct Selector {
94    /// Set to true when exiting the poll loop
95    close_requested: AtomicBool,
96    token_wakeup: Token,
97    /// The core assumption here is that this will always be the innermost lock, so we will never deadlock
98    registry: Mutex<Registry>,
99    next_seed: AtomicUsize,
100    /// Waker to wake up the selectors own poll loop
101    wakeup: mio::Waker,
102    /// Queued up modifications that will be processed immediately after we get new events
103    ///
104    /// The core assumption here is that this will always be the innermost lock, so we will never deadlock
105    queued_modifications: Mutex<Vec<SelectorModification>>,
106}
107
108impl Selector {
109    pub fn new() -> Arc<Self> {
110        let poll = mio::Poll::new().unwrap();
111        let registry = poll
112            .registry()
113            .try_clone()
114            .expect("the selector registry failed to clone");
115
116        let token_wakeup = Token(0);
117        let engine = Arc::new(Selector {
118            wakeup: mio::Waker::new(poll.registry(), token_wakeup).unwrap(),
119            close_requested: false.into(),
120            token_wakeup,
121            next_seed: 10.into(),
122            registry: Mutex::new(registry),
123            queued_modifications: Mutex::new(Vec::new()),
124        });
125
126        {
127            let engine = engine.clone();
128            std::thread::spawn(move || {
129                Self::run(engine, poll);
130            });
131        }
132
133        engine
134    }
135
136    pub fn shutdown(&self) {
137        self.close_requested.store(true, Ordering::SeqCst);
138        self.wakeup.wake().ok();
139    }
140
141    #[must_use = "the token must be consumed"]
142    pub fn add(
143        &self,
144        handler: Box<dyn InterestHandler + Send + Sync>,
145        source: &mut dyn mio::event::Source,
146        interests: mio::Interest,
147    ) -> io::Result<Token> {
148        let token = self.new_token();
149
150        self.queue_modification(SelectorModification::Add { handler, token });
151
152        // CONCURRENCY: This should never result in a deadlock, as long as source.deregister does not call remove or add again.
153        let inner_registry = self.registry.lock().unwrap();
154        match source.register(&inner_registry, token, interests) {
155            Ok(()) => {}
156            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {
157                source.deregister(&inner_registry).ok();
158                source.register(&inner_registry, token, interests)?;
159            }
160            Err(err) => return Err(err),
161        };
162
163        Ok(token)
164    }
165
166    pub fn remove(
167        &self,
168        token: Token,
169        source: Option<&mut dyn mio::event::Source>,
170    ) -> io::Result<()> {
171        self.queue_modification(SelectorModification::Remove { token });
172        // CONCURRENCY: This should never result in a deadlock, as long as source.deregister does not call remove or add again.
173        if let Some(source) = source {
174            let inner_registry = self.registry.lock().unwrap();
175            source.deregister(&inner_registry)?;
176        }
177        Ok(())
178    }
179
180    pub fn push_interest(&self, token: Token, interest: InterestType) {
181        self.queue_modification(SelectorModification::PushInterest { token, interest });
182    }
183
184    pub fn replace(&self, token: Token, handler: Box<dyn InterestHandler + Send + Sync>) {
185        self.queue_modification(SelectorModification::Replace { token, handler });
186    }
187
188    /// Generate a new unique token
189    #[must_use = "the token must be consumed"]
190    fn new_token(&self) -> Token {
191        Token(self.next_seed.fetch_add(1, Ordering::Relaxed))
192    }
193
194    /// Try to process a modification immediately, otherwise queue it up
195    fn queue_modification(&self, modification: SelectorModification) {
196        // Replace and PushInterest can cause external code to be called so it is a good idea to process them asap so they don't get delayed too long
197        let needs_wakeup = matches!(
198            &modification,
199            SelectorModification::PushInterest { .. } | SelectorModification::Replace { .. }
200        );
201
202        // CONCURRENCY: This will never deadlock as queued_modifications is always the innermost lock and we don't call any potentially blocking functions while holding the lock.
203        self.queued_modifications.lock().unwrap().push(modification);
204
205        if needs_wakeup {
206            self.wakeup.wake().ok();
207        }
208    }
209
210    /// Drain the queued modifications queue and return the modifications
211    fn take_queued_modifications(&self) -> Vec<SelectorModification> {
212        // CONCURRENCY: This will never deadlock as queued_modifications is always the innermost lock and we don't call any potentially blocking functions while holding the lock.
213        self.queued_modifications
214            .lock()
215            .unwrap()
216            .drain(..)
217            .collect::<Vec<_>>()
218    }
219
220    fn run(engine: Arc<Selector>, mut poll: mio::Poll) {
221        // The outer loop is used to release the scope of the
222        // read lock whenever it needs to do so
223        let mut events = mio::Events::with_capacity(128);
224        let mut handler_map: HashMap<Token, Box<dyn InterestHandler + Send + Sync>> =
225            HashMap::new();
226        loop {
227            // Wait for an event to trigger
228            if let Err(e) = poll.poll(&mut events, None) {
229                // This can happen when a debugger is attached
230                #[cfg(debug_assertions)]
231                if e.kind() == std::io::ErrorKind::Interrupted {
232                    continue;
233                }
234                panic!("Unexpected error in selector poll loop: {e:?}");
235            }
236
237            // Handler changes that may be queued up between the poll completing and taking the queued modifications can be a problem but we can not eliminate that fully.
238
239            let queued_modifications = engine.take_queued_modifications();
240            for modification in queued_modifications {
241                modification.apply(&mut handler_map);
242            }
243
244            for event in events.iter() {
245                // If the event is already dropped then ignore it
246                let token = event.token();
247
248                if token == engine.token_wakeup {
249                    if engine.close_requested.load(Ordering::SeqCst) {
250                        // If exiting was requested, exit the loop
251                        return;
252                    }
253                    // Just a wake up call, continue to process queued modifications
254                    continue;
255                }
256
257                // Get the handler
258                let Some(handler) = handler_map.get_mut(&token) else {
259                    tracing::debug!(token = token.0, "orphaned event");
260                    continue;
261                };
262
263                // Otherwise this is a waker we need to wake
264                if event.is_readable() {
265                    tracing::trace!(token = ?token, interest = ?InterestType::Readable, "host epoll");
266                    handler.push_interest(InterestType::Readable);
267                }
268                if event.is_writable() {
269                    tracing::trace!(token = ?token, interest = ?InterestType::Writable, "host epoll");
270                    handler.push_interest(InterestType::Writable);
271                }
272                if event.is_read_closed() || event.is_write_closed() {
273                    tracing::trace!(token = ?token, interest = ?InterestType::Closed, "host epoll");
274                    handler.push_interest(InterestType::Closed);
275                }
276                if event.is_error() {
277                    tracing::trace!(token = ?token, interest = ?InterestType::Error, "host epoll");
278                    handler.push_interest(InterestType::Error);
279                }
280            }
281        }
282    }
283}
284
285// Tests only run on unix because they depend on mio's unix pipe implementation
286#[cfg(all(unix, test))]
287mod tests {
288    use super::*;
289    use std::io::Write;
290    use std::sync::mpsc;
291    use std::thread;
292    use std::time::Duration;
293
294    #[derive(Debug)]
295    struct TestHandler {
296        success_sender: mpsc::Sender<()>,
297    }
298    impl InterestHandler for TestHandler {
299        fn push_interest(&mut self, _interest: InterestType) {
300            // Send if we received an interest
301            self.success_sender.send(()).unwrap();
302        }
303
304        fn pop_interest(&mut self, _interest: InterestType) -> bool {
305            false
306        }
307
308        fn has_interest(&self, interest: InterestType) -> bool {
309            interest == InterestType::Readable
310        }
311    }
312
313    #[derive(Debug)]
314    struct DeadlockingHandler {
315        selector: Arc<Selector>,
316        token: Arc<Mutex<Option<Token>>>,
317        success_sender: mpsc::Sender<()>,
318    }
319    impl InterestHandler for DeadlockingHandler {
320        fn push_interest(&mut self, _interest: InterestType) {
321            // This would deadlock without a queue
322            self.selector
323                .remove(self.token.lock().unwrap().unwrap(), None)
324                .unwrap();
325            self.success_sender.send(()).unwrap();
326        }
327
328        fn pop_interest(&mut self, _interest: InterestType) -> bool {
329            false
330        }
331
332        fn has_interest(&self, interest: InterestType) -> bool {
333            interest == InterestType::Readable
334        }
335    }
336
337    #[test]
338    fn test_push_interest() {
339        let (mut sender, mut receiver) = mio::unix::pipe::new().unwrap();
340        let (success_sender, success_receiver) = std::sync::mpsc::channel();
341
342        let selector = Selector::new();
343
344        let handler = Box::new(TestHandler { success_sender });
345
346        let token = selector
347            .add(handler, &mut receiver, mio::Interest::READABLE)
348            .unwrap();
349
350        assert!(
351            success_receiver.try_recv().is_err(),
352            "Received success before sending data. Something is wrong"
353        );
354
355        thread::sleep(Duration::from_millis(10));
356
357        // Works once
358        sender.write_all(&[1, 2, 3]).unwrap();
359        sender.flush().unwrap();
360        thread::sleep(Duration::from_millis(10));
361        assert!(
362            success_receiver.try_recv().is_ok(),
363            "Did not receive success signal from handler"
364        );
365        assert!(
366            success_receiver.try_recv().is_err(),
367            "Did receive more than once from handler"
368        );
369        thread::sleep(Duration::from_millis(10));
370
371        // Works multiple times
372        sender.write_all(&[1, 2, 3]).unwrap();
373        sender.flush().unwrap();
374        thread::sleep(Duration::from_millis(10));
375        assert!(
376            success_receiver.try_recv().is_ok(),
377            "Did not receive success signal from handler"
378        );
379        assert!(
380            success_receiver.try_recv().is_err(),
381            "Did receive more than once from handler"
382        );
383        thread::sleep(Duration::from_millis(10));
384
385        // No signal after removing the handler
386        selector.remove(token, Some(&mut receiver)).unwrap();
387        sender.write_all(&[1, 2, 3]).unwrap();
388        sender.flush().unwrap();
389        thread::sleep(Duration::from_millis(10));
390        assert!(
391            success_receiver.try_recv().is_err(),
392            "Did receive even though the handler was removed"
393        );
394        thread::sleep(Duration::from_millis(10));
395
396        selector.shutdown();
397    }
398
399    #[test]
400    fn test_selector_no_deadlock_when_modifying_the_selector_from_push_interest() {
401        let (mut sender, mut receiver) = mio::unix::pipe::new().unwrap();
402        let (success_sender, success_receiver) = std::sync::mpsc::channel();
403
404        let selector = Selector::new();
405
406        // The deadlocking handler will try to remove itself from the selector when it receives an interest
407        let handler = Box::new(DeadlockingHandler {
408            selector: selector.clone(),
409            token: Default::default(),
410            success_sender,
411        });
412        let handler_token_arcmutex = handler.token.clone();
413
414        let token = selector
415            .add(handler, &mut receiver, mio::Interest::READABLE)
416            .unwrap();
417        handler_token_arcmutex.lock().unwrap().replace(token);
418
419        sender.write_all(&[1, 2, 3]).unwrap();
420        sender.flush().unwrap();
421
422        thread::sleep(Duration::from_millis(100));
423        selector.shutdown();
424        thread::sleep(Duration::from_millis(100));
425
426        let received_result = success_receiver.try_recv();
427        assert!(
428            received_result.is_ok(),
429            "Did not receive success signal from handler, deadlocked?"
430        );
431    }
432}