1use mio::{Registry, Token};
2use std::{
3 collections::HashMap,
4 io,
5 sync::{
6 Arc, Mutex,
7 atomic::{AtomicBool, AtomicUsize, Ordering},
8 },
9};
10
11use crate::{InterestHandler, InterestType};
12
13pub enum SelectorModification {
14 Add {
15 handler: Box<dyn InterestHandler + Send + Sync>,
16 token: Token,
17 },
18 Remove {
19 token: Token,
20 },
21 Replace {
22 token: Token,
23 handler: Box<dyn InterestHandler + Send + Sync>,
24 },
25 PushInterest {
26 token: Token,
27 interest: InterestType,
28 },
29}
30
31impl SelectorModification {
32 fn apply(self, lookup: &mut HashMap<Token, Box<dyn InterestHandler + Send + Sync>>) {
36 match self {
37 SelectorModification::Add { token, handler } => {
38 lookup.insert(token, handler);
39 }
40 SelectorModification::Remove { token } => {
41 lookup.remove(&token);
42 }
43 SelectorModification::Replace { token, mut handler } => {
44 let last = lookup.remove(&token);
45
46 if let Some(last) = last {
48 let interests = vec![
49 InterestType::Readable,
50 InterestType::Writable,
51 InterestType::Closed,
52 InterestType::Error,
53 ];
54 for interest in interests {
55 if last.has_interest(interest) && !handler.has_interest(interest) {
56 handler.push_interest(interest);
57 }
58 }
59 }
60
61 lookup.insert(token, handler);
62 }
63 SelectorModification::PushInterest { token, interest } => {
64 if let Some(handler) = lookup.get_mut(&token) {
65 handler.push_interest(interest);
66 }
67 }
68 }
69 }
70}
71impl std::fmt::Debug for SelectorModification {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 match self {
74 SelectorModification::Add { token, .. } => {
75 f.debug_struct("Add").field("token", token).finish()
76 }
77 SelectorModification::Remove { token, .. } => {
78 f.debug_struct("Remove").field("token", token).finish()
79 }
80 SelectorModification::Replace { token, .. } => {
81 f.debug_struct("Replace").field("token", token).finish()
82 }
83 SelectorModification::PushInterest { token, interest } => f
84 .debug_struct("PushInterest")
85 .field("token", token)
86 .field("interest", interest)
87 .finish(),
88 }
89 }
90}
91
92#[derive(Debug)]
93pub struct Selector {
94 close_requested: AtomicBool,
96 token_wakeup: Token,
97 registry: Mutex<Registry>,
99 next_seed: AtomicUsize,
100 wakeup: mio::Waker,
102 queued_modifications: Mutex<Vec<SelectorModification>>,
106}
107
108impl Selector {
109 pub fn new() -> Arc<Self> {
110 let poll = mio::Poll::new().unwrap();
111 let registry = poll
112 .registry()
113 .try_clone()
114 .expect("the selector registry failed to clone");
115
116 let token_wakeup = Token(0);
117 let engine = Arc::new(Selector {
118 wakeup: mio::Waker::new(poll.registry(), token_wakeup).unwrap(),
119 close_requested: false.into(),
120 token_wakeup,
121 next_seed: 10.into(),
122 registry: Mutex::new(registry),
123 queued_modifications: Mutex::new(Vec::new()),
124 });
125
126 {
127 let engine = engine.clone();
128 std::thread::spawn(move || {
129 Self::run(engine, poll);
130 });
131 }
132
133 engine
134 }
135
136 pub fn shutdown(&self) {
137 self.close_requested.store(true, Ordering::SeqCst);
138 self.wakeup.wake().ok();
139 }
140
141 #[must_use = "the token must be consumed"]
142 pub fn add(
143 &self,
144 handler: Box<dyn InterestHandler + Send + Sync>,
145 source: &mut dyn mio::event::Source,
146 interests: mio::Interest,
147 ) -> io::Result<Token> {
148 let token = self.new_token();
149
150 self.queue_modification(SelectorModification::Add { handler, token });
151
152 let inner_registry = self.registry.lock().unwrap();
154 match source.register(&inner_registry, token, interests) {
155 Ok(()) => {}
156 Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {
157 source.deregister(&inner_registry).ok();
158 source.register(&inner_registry, token, interests)?;
159 }
160 Err(err) => return Err(err),
161 };
162
163 Ok(token)
164 }
165
166 pub fn remove(
167 &self,
168 token: Token,
169 source: Option<&mut dyn mio::event::Source>,
170 ) -> io::Result<()> {
171 self.queue_modification(SelectorModification::Remove { token });
172 if let Some(source) = source {
174 let inner_registry = self.registry.lock().unwrap();
175 source.deregister(&inner_registry)?;
176 }
177 Ok(())
178 }
179
180 pub fn push_interest(&self, token: Token, interest: InterestType) {
181 self.queue_modification(SelectorModification::PushInterest { token, interest });
182 }
183
184 pub fn replace(&self, token: Token, handler: Box<dyn InterestHandler + Send + Sync>) {
185 self.queue_modification(SelectorModification::Replace { token, handler });
186 }
187
188 #[must_use = "the token must be consumed"]
190 fn new_token(&self) -> Token {
191 Token(self.next_seed.fetch_add(1, Ordering::Relaxed))
192 }
193
194 fn queue_modification(&self, modification: SelectorModification) {
196 let needs_wakeup = matches!(
198 &modification,
199 SelectorModification::PushInterest { .. } | SelectorModification::Replace { .. }
200 );
201
202 self.queued_modifications.lock().unwrap().push(modification);
204
205 if needs_wakeup {
206 self.wakeup.wake().ok();
207 }
208 }
209
210 fn take_queued_modifications(&self) -> Vec<SelectorModification> {
212 self.queued_modifications
214 .lock()
215 .unwrap()
216 .drain(..)
217 .collect::<Vec<_>>()
218 }
219
220 fn run(engine: Arc<Selector>, mut poll: mio::Poll) {
221 let mut events = mio::Events::with_capacity(128);
224 let mut handler_map: HashMap<Token, Box<dyn InterestHandler + Send + Sync>> =
225 HashMap::new();
226 loop {
227 if let Err(e) = poll.poll(&mut events, None) {
229 #[cfg(debug_assertions)]
231 if e.kind() == std::io::ErrorKind::Interrupted {
232 continue;
233 }
234 panic!("Unexpected error in selector poll loop: {e:?}");
235 }
236
237 let queued_modifications = engine.take_queued_modifications();
240 for modification in queued_modifications {
241 modification.apply(&mut handler_map);
242 }
243
244 for event in events.iter() {
245 let token = event.token();
247
248 if token == engine.token_wakeup {
249 if engine.close_requested.load(Ordering::SeqCst) {
250 return;
252 }
253 continue;
255 }
256
257 let Some(handler) = handler_map.get_mut(&token) else {
259 tracing::debug!(token = token.0, "orphaned event");
260 continue;
261 };
262
263 if event.is_readable() {
265 tracing::trace!(token = ?token, interest = ?InterestType::Readable, "host epoll");
266 handler.push_interest(InterestType::Readable);
267 }
268 if event.is_writable() {
269 tracing::trace!(token = ?token, interest = ?InterestType::Writable, "host epoll");
270 handler.push_interest(InterestType::Writable);
271 }
272 if event.is_read_closed() || event.is_write_closed() {
273 tracing::trace!(token = ?token, interest = ?InterestType::Closed, "host epoll");
274 handler.push_interest(InterestType::Closed);
275 }
276 if event.is_error() {
277 tracing::trace!(token = ?token, interest = ?InterestType::Error, "host epoll");
278 handler.push_interest(InterestType::Error);
279 }
280 }
281 }
282 }
283}
284
285#[cfg(all(unix, test))]
287mod tests {
288 use super::*;
289 use std::io::Write;
290 use std::sync::mpsc;
291 use std::thread;
292 use std::time::Duration;
293
294 #[derive(Debug)]
295 struct TestHandler {
296 success_sender: mpsc::Sender<()>,
297 }
298 impl InterestHandler for TestHandler {
299 fn push_interest(&mut self, _interest: InterestType) {
300 self.success_sender.send(()).unwrap();
302 }
303
304 fn pop_interest(&mut self, _interest: InterestType) -> bool {
305 false
306 }
307
308 fn has_interest(&self, interest: InterestType) -> bool {
309 interest == InterestType::Readable
310 }
311 }
312
313 #[derive(Debug)]
314 struct DeadlockingHandler {
315 selector: Arc<Selector>,
316 token: Arc<Mutex<Option<Token>>>,
317 success_sender: mpsc::Sender<()>,
318 }
319 impl InterestHandler for DeadlockingHandler {
320 fn push_interest(&mut self, _interest: InterestType) {
321 self.selector
323 .remove(self.token.lock().unwrap().unwrap(), None)
324 .unwrap();
325 self.success_sender.send(()).unwrap();
326 }
327
328 fn pop_interest(&mut self, _interest: InterestType) -> bool {
329 false
330 }
331
332 fn has_interest(&self, interest: InterestType) -> bool {
333 interest == InterestType::Readable
334 }
335 }
336
337 #[test]
338 fn test_push_interest() {
339 let (mut sender, mut receiver) = mio::unix::pipe::new().unwrap();
340 let (success_sender, success_receiver) = std::sync::mpsc::channel();
341
342 let selector = Selector::new();
343
344 let handler = Box::new(TestHandler { success_sender });
345
346 let token = selector
347 .add(handler, &mut receiver, mio::Interest::READABLE)
348 .unwrap();
349
350 assert!(
351 success_receiver.try_recv().is_err(),
352 "Received success before sending data. Something is wrong"
353 );
354
355 thread::sleep(Duration::from_millis(10));
356
357 sender.write_all(&[1, 2, 3]).unwrap();
359 sender.flush().unwrap();
360 thread::sleep(Duration::from_millis(10));
361 assert!(
362 success_receiver.try_recv().is_ok(),
363 "Did not receive success signal from handler"
364 );
365 assert!(
366 success_receiver.try_recv().is_err(),
367 "Did receive more than once from handler"
368 );
369 thread::sleep(Duration::from_millis(10));
370
371 sender.write_all(&[1, 2, 3]).unwrap();
373 sender.flush().unwrap();
374 thread::sleep(Duration::from_millis(10));
375 assert!(
376 success_receiver.try_recv().is_ok(),
377 "Did not receive success signal from handler"
378 );
379 assert!(
380 success_receiver.try_recv().is_err(),
381 "Did receive more than once from handler"
382 );
383 thread::sleep(Duration::from_millis(10));
384
385 selector.remove(token, Some(&mut receiver)).unwrap();
387 sender.write_all(&[1, 2, 3]).unwrap();
388 sender.flush().unwrap();
389 thread::sleep(Duration::from_millis(10));
390 assert!(
391 success_receiver.try_recv().is_err(),
392 "Did receive even though the handler was removed"
393 );
394 thread::sleep(Duration::from_millis(10));
395
396 selector.shutdown();
397 }
398
399 #[test]
400 fn test_selector_no_deadlock_when_modifying_the_selector_from_push_interest() {
401 let (mut sender, mut receiver) = mio::unix::pipe::new().unwrap();
402 let (success_sender, success_receiver) = std::sync::mpsc::channel();
403
404 let selector = Selector::new();
405
406 let handler = Box::new(DeadlockingHandler {
408 selector: selector.clone(),
409 token: Default::default(),
410 success_sender,
411 });
412 let handler_token_arcmutex = handler.token.clone();
413
414 let token = selector
415 .add(handler, &mut receiver, mio::Interest::READABLE)
416 .unwrap();
417 handler_token_arcmutex.lock().unwrap().replace(token);
418
419 sender.write_all(&[1, 2, 3]).unwrap();
420 sender.flush().unwrap();
421
422 thread::sleep(Duration::from_millis(100));
423 selector.shutdown();
424 thread::sleep(Duration::from_millis(100));
425
426 let received_result = success_receiver.try_recv();
427 assert!(
428 received_result.is_ok(),
429 "Did not receive success signal from handler, deadlocked?"
430 );
431 }
432}