wasmer_wasix/os/task/
control_plane.rs

1use std::{
2    collections::HashMap,
3    sync::{
4        Arc, RwLock,
5        atomic::{AtomicUsize, Ordering},
6    },
7    time::Duration,
8};
9
10use crate::{WasiProcess, WasiProcessId};
11use wasmer_types::ModuleHash;
12
13#[derive(Debug, Clone)]
14pub struct WasiControlPlane {
15    state: Arc<State>,
16}
17
18#[derive(Debug, Clone)]
19pub struct WasiControlPlaneHandle {
20    inner: std::sync::Weak<State>,
21}
22
23impl WasiControlPlaneHandle {
24    fn new(inner: &Arc<State>) -> Self {
25        Self {
26            inner: Arc::downgrade(inner),
27        }
28    }
29
30    pub fn upgrade(&self) -> Option<WasiControlPlane> {
31        self.inner.upgrade().map(|state| WasiControlPlane { state })
32    }
33
34    pub fn must_upgrade(&self) -> WasiControlPlane {
35        let state = self.inner.upgrade().expect("control plane unavailable");
36        WasiControlPlane { state }
37    }
38}
39
40#[derive(Debug, Clone)]
41pub struct ControlPlaneConfig {
42    /// Total number of tasks (processes + threads) that can be spawned.
43    pub max_task_count: Option<usize>,
44    /// Flag that indicates if asynchronous threading is enables (opt-in)
45    pub enable_asynchronous_threading: bool,
46    /// Enables an exponential backoff of the process CPU usage when there
47    /// are no active run tokens (when set holds the maximum amount of
48    /// time that it will pause the CPU)
49    /// (default = off)
50    pub enable_exponential_cpu_backoff: Option<Duration>,
51}
52
53impl ControlPlaneConfig {
54    pub fn new() -> Self {
55        Self {
56            max_task_count: None,
57            enable_asynchronous_threading: false,
58            enable_exponential_cpu_backoff: None,
59        }
60    }
61}
62
63impl Default for ControlPlaneConfig {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69#[derive(Debug)]
70struct State {
71    config: ControlPlaneConfig,
72
73    /// Total number of active tasks (threads) across all processes.
74    task_count: Arc<AtomicUsize>,
75
76    /// Mutable state.
77    mutable: RwLock<MutableState>,
78}
79
80#[derive(Debug)]
81struct MutableState {
82    /// Seed used to generate process ID's
83    process_seed: u32,
84    /// The processes running on this machine
85    processes: HashMap<WasiProcessId, WasiProcess>,
86    // TODO: keep a queue of terminated process ids for id reuse.
87}
88
89impl WasiControlPlane {
90    pub fn new(config: ControlPlaneConfig) -> Self {
91        Self {
92            state: Arc::new(State {
93                config,
94                task_count: Arc::new(AtomicUsize::new(0)),
95                mutable: RwLock::new(MutableState {
96                    process_seed: 0,
97                    processes: Default::default(),
98                }),
99            }),
100        }
101    }
102
103    pub fn handle(&self) -> WasiControlPlaneHandle {
104        WasiControlPlaneHandle::new(&self.state)
105    }
106
107    /// Get the current count of active tasks (threads).
108    fn active_task_count(&self) -> usize {
109        self.state.task_count.load(Ordering::SeqCst)
110    }
111
112    /// Returns the configuration for this control plane
113    pub(crate) fn config(&self) -> &ControlPlaneConfig {
114        &self.state.config
115    }
116
117    /// Register a new task.
118    ///
119    // Currently just increments the task counter.
120    pub(crate) fn register_task(&self) -> Result<TaskCountGuard, ControlPlaneError> {
121        let count = self.state.task_count.fetch_add(1, Ordering::SeqCst);
122        if let Some(max) = self.state.config.max_task_count {
123            if count > max {
124                self.state.task_count.fetch_sub(1, Ordering::SeqCst);
125                return Err(ControlPlaneError::TaskLimitReached { max: count });
126            }
127        }
128        Ok(TaskCountGuard(self.state.task_count.clone()))
129    }
130
131    /// Creates a new process
132    // FIXME: De-register terminated processes!
133    // Currently they just accumulate.
134    pub fn new_process(&self, module_hash: ModuleHash) -> Result<WasiProcess, ControlPlaneError> {
135        if let Some(max) = self.state.config.max_task_count {
136            if self.active_task_count() >= max {
137                // NOTE: task count is not incremented here, only when new threads are spawned.
138                // A process will always have a main thread.
139                return Err(ControlPlaneError::TaskLimitReached { max });
140            }
141        }
142
143        // Create the process first to do all the allocations before locking.
144        let mut proc = WasiProcess::new(WasiProcessId::from(0), module_hash, self.handle());
145
146        let mut mutable = self.state.mutable.write().unwrap();
147
148        let pid = mutable.next_process_id()?;
149        proc.set_pid(pid);
150        mutable.processes.insert(pid, proc.clone());
151        Ok(proc)
152    }
153
154    /// Generates a new process ID
155    pub fn generate_id(&self) -> Result<WasiProcessId, ControlPlaneError> {
156        let mut mutable = self.state.mutable.write().unwrap();
157        mutable.next_process_id()
158    }
159
160    /// Gets a reference to a running process
161    pub fn get_process(&self, pid: WasiProcessId) -> Option<WasiProcess> {
162        self.state
163            .mutable
164            .read()
165            .unwrap()
166            .processes
167            .get(&pid)
168            .cloned()
169    }
170}
171
172impl MutableState {
173    fn next_process_id(&mut self) -> Result<WasiProcessId, ControlPlaneError> {
174        // TODO: reuse terminated ids, handle wrap-around, ...
175        let id = self.process_seed.checked_add(1).ok_or({
176            ControlPlaneError::TaskLimitReached {
177                max: u32::MAX as usize,
178            }
179        })?;
180        self.process_seed = id;
181        Ok(WasiProcessId::from(id))
182    }
183}
184
185impl Default for WasiControlPlane {
186    fn default() -> Self {
187        let config = ControlPlaneConfig::default();
188        Self::new(config)
189    }
190}
191
192/// Guard that ensures the [`WasiControlPlane`] task counter is decremented when dropped.
193#[derive(Debug)]
194pub struct TaskCountGuard(Arc<AtomicUsize>);
195
196impl Drop for TaskCountGuard {
197    fn drop(&mut self) {
198        self.0.fetch_sub(1, Ordering::SeqCst);
199    }
200}
201
202#[derive(thiserror::Error, PartialEq, Eq, Clone, Debug)]
203pub enum ControlPlaneError {
204    /// The maximum number of execution tasks has been reached.
205    #[error("The maximum number of execution tasks has been reached ({max})")]
206    TaskLimitReached {
207        /// The maximum number of tasks.
208        max: usize,
209    },
210}
211
212#[cfg(test)]
213mod tests {
214    use wasmer_wasix_types::wasix::ThreadStartType;
215
216    use crate::os::task::thread::WasiMemoryLayout;
217
218    use super::*;
219
220    /// Simple test to ensure task limits are respected.
221    #[test]
222    fn test_control_plane_task_limits() {
223        let p = WasiControlPlane::new(ControlPlaneConfig {
224            max_task_count: Some(2),
225            enable_asynchronous_threading: false,
226            enable_exponential_cpu_backoff: None,
227        });
228
229        let p1 = p.new_process(ModuleHash::random()).unwrap();
230        let _t1 = p1
231            .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
232            .unwrap();
233        let _t2 = p1
234            .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
235            .unwrap();
236
237        assert_eq!(
238            p.new_process(ModuleHash::random()).unwrap_err(),
239            ControlPlaneError::TaskLimitReached { max: 2 }
240        );
241    }
242
243    /// Simple test to ensure task limits are respected and that thread drop guards work.
244    #[test]
245    fn test_control_plane_task_limits_with_dropped_threads() {
246        let p = WasiControlPlane::new(ControlPlaneConfig {
247            max_task_count: Some(2),
248            enable_asynchronous_threading: false,
249            enable_exponential_cpu_backoff: None,
250        });
251
252        let p1 = p.new_process(ModuleHash::random()).unwrap();
253
254        for _ in 0..10 {
255            let _thread = p1
256                .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
257                .unwrap();
258        }
259
260        let _t1 = p1
261            .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
262            .unwrap();
263        let _t2 = p1
264            .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
265            .unwrap();
266
267        assert_eq!(
268            p.new_process(ModuleHash::random()).unwrap_err(),
269            ControlPlaneError::TaskLimitReached { max: 2 }
270        );
271    }
272}