wasmer_wasix/fs/
notification.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::{
    collections::VecDeque,
    sync::Mutex,
    task::{Poll, Waker},
};

use virtual_mio::{InterestHandler, InterestType};

#[derive(Debug)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
struct NotificationState {
    /// Used for event notifications by the user application or operating system
    /// (positive number means there are events waiting to be processed)
    counter: u64,
    /// Counter used to prevent duplicate notification events
    last_poll: u64,
    /// Flag that indicates if this is operating
    is_semaphore: bool,
    /// All the registered wakers
    #[cfg_attr(feature = "enable-serde", serde(skip))]
    wakers: VecDeque<Waker>,
    /// InterestHandler for use with epoll
    #[cfg_attr(feature = "enable-serde", serde(skip))]
    interest_handler: Option<Box<dyn InterestHandler>>,
}

impl NotificationState {
    fn add_waker(&mut self, waker: &Waker) {
        if !self.wakers.iter().any(|a| a.will_wake(waker)) {
            self.wakers.push_front(waker.clone());
        }
    }

    fn wake_all(&mut self) {
        self.last_poll = u64::MAX;
        while let Some(waker) = self.wakers.pop_front() {
            waker.wake();
        }
        if let Some(handler) = self.interest_handler.as_mut() {
            handler.push_interest(InterestType::Readable);
        }
    }

    fn inc(&mut self, val: u64) {
        self.counter += val;
        self.wake_all();
    }

    fn dec(&mut self) -> u64 {
        let val = self.counter;
        if self.is_semaphore {
            if self.counter > 0 {
                self.counter -= 1;
                if self.counter > 0 {
                    self.wake_all();
                }
            }
        } else {
            self.counter = 0;
        }
        val
    }
}

#[derive(Debug)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub struct NotificationInner {
    /// Receiver that wakes sleeping threads
    #[cfg_attr(feature = "enable-serde", serde(skip))]
    state: Mutex<NotificationState>,
}

impl NotificationInner {
    pub fn new(initial_val: u64, is_semaphore: bool) -> Self {
        Self {
            state: Mutex::new(NotificationState {
                counter: initial_val,
                last_poll: u64::MAX,
                is_semaphore,
                wakers: Default::default(),
                interest_handler: None,
            }),
        }
    }
    pub fn poll(&self, waker: &Waker) -> Poll<usize> {
        let mut state = self.state.lock().unwrap();
        state.add_waker(waker);

        if state.last_poll != state.counter {
            state.last_poll = state.counter;
            Poll::Ready(state.counter as usize)
        } else {
            Poll::Pending
        }
    }

    pub fn write(&self, val: u64) {
        let mut state = self.state.lock().unwrap();
        state.inc(val);
    }

    pub fn read(&self, waker: &Waker) -> Poll<u64> {
        let mut state = self.state.lock().unwrap();
        state.add_waker(waker);
        match state.dec() {
            0 => Poll::Pending,
            res => Poll::Ready(res),
        }
    }

    pub fn try_read(&self) -> Option<u64> {
        let mut state = self.state.lock().unwrap();
        match state.dec() {
            0 => None,
            res => Some(res),
        }
    }

    pub fn reset(&self) {
        let mut state = self.state.lock().unwrap();
        state.last_poll = u64::MAX;
    }

    pub fn set_interest_handler(&self, handler: Box<dyn InterestHandler>) {
        let mut state = self.state.lock().unwrap();
        state.interest_handler.replace(handler);
    }

    pub fn remove_interest_handler(&self) -> Option<Box<dyn InterestHandler>> {
        let mut state = self.state.lock().unwrap();
        state.interest_handler.take()
    }
}