// wasmer_wasix/fs/notification.rs
use std::{
    collections::VecDeque,
    sync::Mutex,
    task::{Poll, Waker},
};

use virtual_mio::{InterestHandler, InterestType};

9#[derive(Debug)]
10#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
11struct NotificationState {
12 counter: u64,
15 last_poll: u64,
17 is_semaphore: bool,
19 #[cfg_attr(feature = "enable-serde", serde(skip))]
21 wakers: VecDeque<Waker>,
22 #[cfg_attr(feature = "enable-serde", serde(skip))]
24 interest_handler: Option<Box<dyn InterestHandler>>,
25}
26
27impl NotificationState {
28 fn add_waker(&mut self, waker: &Waker) {
29 if !self.wakers.iter().any(|a| a.will_wake(waker)) {
30 self.wakers.push_front(waker.clone());
31 }
32 }
33
34 fn wake_all(&mut self) {
35 self.last_poll = u64::MAX;
36 while let Some(waker) = self.wakers.pop_front() {
37 waker.wake();
38 }
39 if let Some(handler) = self.interest_handler.as_mut() {
40 handler.push_interest(InterestType::Readable);
41 }
42 }
43
44 fn inc(&mut self, val: u64) {
45 self.counter += val;
46 self.wake_all();
47 }
48
49 fn dec(&mut self) -> u64 {
50 let val = self.counter;
51 if self.is_semaphore {
52 if self.counter > 0 {
53 self.counter -= 1;
54 if self.counter > 0 {
55 self.wake_all();
56 }
57 }
58 } else {
59 self.counter = 0;
60 }
61 val
62 }
63}
64
65#[derive(Debug)]
66#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
67pub struct NotificationInner {
68 #[cfg_attr(feature = "enable-serde", serde(skip))]
70 state: Mutex<NotificationState>,
71}
72
73impl NotificationInner {
74 pub fn new(initial_val: u64, is_semaphore: bool) -> Self {
75 Self {
76 state: Mutex::new(NotificationState {
77 counter: initial_val,
78 last_poll: u64::MAX,
79 is_semaphore,
80 wakers: Default::default(),
81 interest_handler: None,
82 }),
83 }
84 }
85 pub fn poll(&self, waker: &Waker) -> Poll<usize> {
86 let mut state = self.state.lock().unwrap();
87 state.add_waker(waker);
88
89 if state.last_poll != state.counter {
90 state.last_poll = state.counter;
91 Poll::Ready(state.counter as usize)
92 } else {
93 Poll::Pending
94 }
95 }
96
97 pub fn write(&self, val: u64) {
98 let mut state = self.state.lock().unwrap();
99 state.inc(val);
100 }
101
102 pub fn read(&self, waker: &Waker) -> Poll<u64> {
103 let mut state = self.state.lock().unwrap();
104 state.add_waker(waker);
105 match state.dec() {
106 0 => Poll::Pending,
107 res => Poll::Ready(res),
108 }
109 }
110
111 pub fn try_read(&self) -> Option<u64> {
112 let mut state = self.state.lock().unwrap();
113 match state.dec() {
114 0 => None,
115 res => Some(res),
116 }
117 }
118
119 pub fn reset(&self) {
120 let mut state = self.state.lock().unwrap();
121 state.last_poll = u64::MAX;
122 }
123
124 pub fn set_interest_handler(&self, handler: Box<dyn InterestHandler>) {
125 let mut state = self.state.lock().unwrap();
126 state.interest_handler.replace(handler);
127 }
128
129 pub fn remove_interest_handler(&self) -> Option<Box<dyn InterestHandler>> {
130 let mut state = self.state.lock().unwrap();
131 state.interest_handler.take()
132 }
133}