wasmer_wasix/os/task/
control_plane.rs1use std::{
2 collections::HashMap,
3 sync::{
4 Arc, RwLock,
5 atomic::{AtomicUsize, Ordering},
6 },
7 time::Duration,
8};
9
10use crate::{WasiProcess, WasiProcessId};
11use wasmer_types::ModuleHash;
12
13#[derive(Debug, Clone)]
14pub struct WasiControlPlane {
15 state: Arc<State>,
16}
17
18#[derive(Debug, Clone)]
19pub struct WasiControlPlaneHandle {
20 inner: std::sync::Weak<State>,
21}
22
23impl WasiControlPlaneHandle {
24 fn new(inner: &Arc<State>) -> Self {
25 Self {
26 inner: Arc::downgrade(inner),
27 }
28 }
29
30 pub fn upgrade(&self) -> Option<WasiControlPlane> {
31 self.inner.upgrade().map(|state| WasiControlPlane { state })
32 }
33
34 pub fn must_upgrade(&self) -> WasiControlPlane {
35 let state = self.inner.upgrade().expect("control plane unavailable");
36 WasiControlPlane { state }
37 }
38}
39
40#[derive(Debug, Clone)]
41pub struct ControlPlaneConfig {
42 pub max_task_count: Option<usize>,
44 pub enable_asynchronous_threading: bool,
46 pub enable_exponential_cpu_backoff: Option<Duration>,
51}
52
53impl ControlPlaneConfig {
54 pub fn new() -> Self {
55 Self {
56 max_task_count: None,
57 enable_asynchronous_threading: false,
58 enable_exponential_cpu_backoff: None,
59 }
60 }
61}
62
63impl Default for ControlPlaneConfig {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69#[derive(Debug)]
70struct State {
71 config: ControlPlaneConfig,
72
73 task_count: Arc<AtomicUsize>,
75
76 mutable: RwLock<MutableState>,
78}
79
80#[derive(Debug)]
81struct MutableState {
82 process_seed: u32,
84 processes: HashMap<WasiProcessId, WasiProcess>,
86 }
88
89impl WasiControlPlane {
90 pub fn new(config: ControlPlaneConfig) -> Self {
91 Self {
92 state: Arc::new(State {
93 config,
94 task_count: Arc::new(AtomicUsize::new(0)),
95 mutable: RwLock::new(MutableState {
96 process_seed: 0,
97 processes: Default::default(),
98 }),
99 }),
100 }
101 }
102
103 pub fn handle(&self) -> WasiControlPlaneHandle {
104 WasiControlPlaneHandle::new(&self.state)
105 }
106
107 fn active_task_count(&self) -> usize {
109 self.state.task_count.load(Ordering::SeqCst)
110 }
111
112 pub(crate) fn config(&self) -> &ControlPlaneConfig {
114 &self.state.config
115 }
116
117 pub(crate) fn register_task(&self) -> Result<TaskCountGuard, ControlPlaneError> {
121 let count = self.state.task_count.fetch_add(1, Ordering::SeqCst);
122 if let Some(max) = self.state.config.max_task_count {
123 if count > max {
124 self.state.task_count.fetch_sub(1, Ordering::SeqCst);
125 return Err(ControlPlaneError::TaskLimitReached { max: count });
126 }
127 }
128 Ok(TaskCountGuard(self.state.task_count.clone()))
129 }
130
131 pub fn new_process(&self, module_hash: ModuleHash) -> Result<WasiProcess, ControlPlaneError> {
135 if let Some(max) = self.state.config.max_task_count {
136 if self.active_task_count() >= max {
137 return Err(ControlPlaneError::TaskLimitReached { max });
140 }
141 }
142
143 let mut proc = WasiProcess::new(WasiProcessId::from(0), module_hash, self.handle());
145
146 let mut mutable = self.state.mutable.write().unwrap();
147
148 let pid = mutable.next_process_id()?;
149 proc.set_pid(pid);
150 mutable.processes.insert(pid, proc.clone());
151 Ok(proc)
152 }
153
154 pub fn generate_id(&self) -> Result<WasiProcessId, ControlPlaneError> {
156 let mut mutable = self.state.mutable.write().unwrap();
157 mutable.next_process_id()
158 }
159
160 pub fn get_process(&self, pid: WasiProcessId) -> Option<WasiProcess> {
162 self.state
163 .mutable
164 .read()
165 .unwrap()
166 .processes
167 .get(&pid)
168 .cloned()
169 }
170}
171
172impl MutableState {
173 fn next_process_id(&mut self) -> Result<WasiProcessId, ControlPlaneError> {
174 let id = self.process_seed.checked_add(1).ok_or({
176 ControlPlaneError::TaskLimitReached {
177 max: u32::MAX as usize,
178 }
179 })?;
180 self.process_seed = id;
181 Ok(WasiProcessId::from(id))
182 }
183}
184
185impl Default for WasiControlPlane {
186 fn default() -> Self {
187 let config = ControlPlaneConfig::default();
188 Self::new(config)
189 }
190}
191
192#[derive(Debug)]
194pub struct TaskCountGuard(Arc<AtomicUsize>);
195
196impl Drop for TaskCountGuard {
197 fn drop(&mut self) {
198 self.0.fetch_sub(1, Ordering::SeqCst);
199 }
200}
201
202#[derive(thiserror::Error, PartialEq, Eq, Clone, Debug)]
203pub enum ControlPlaneError {
204 #[error("The maximum number of execution tasks has been reached ({max})")]
206 TaskLimitReached {
207 max: usize,
209 },
210}
211
212#[cfg(test)]
213mod tests {
214 use wasmer_wasix_types::wasix::ThreadStartType;
215
216 use crate::os::task::thread::WasiMemoryLayout;
217
218 use super::*;
219
220 #[test]
222 fn test_control_plane_task_limits() {
223 let p = WasiControlPlane::new(ControlPlaneConfig {
224 max_task_count: Some(2),
225 enable_asynchronous_threading: false,
226 enable_exponential_cpu_backoff: None,
227 });
228
229 let p1 = p.new_process(ModuleHash::random()).unwrap();
230 let _t1 = p1
231 .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
232 .unwrap();
233 let _t2 = p1
234 .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
235 .unwrap();
236
237 assert_eq!(
238 p.new_process(ModuleHash::random()).unwrap_err(),
239 ControlPlaneError::TaskLimitReached { max: 2 }
240 );
241 }
242
243 #[test]
245 fn test_control_plane_task_limits_with_dropped_threads() {
246 let p = WasiControlPlane::new(ControlPlaneConfig {
247 max_task_count: Some(2),
248 enable_asynchronous_threading: false,
249 enable_exponential_cpu_backoff: None,
250 });
251
252 let p1 = p.new_process(ModuleHash::random()).unwrap();
253
254 for _ in 0..10 {
255 let _thread = p1
256 .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
257 .unwrap();
258 }
259
260 let _t1 = p1
261 .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
262 .unwrap();
263 let _t2 = p1
264 .new_thread(WasiMemoryLayout::default(), ThreadStartType::MainThread)
265 .unwrap();
266
267 assert_eq!(
268 p.new_process(ModuleHash::random()).unwrap_err(),
269 ControlPlaneError::TaskLimitReached { max: 2 }
270 );
271 }
272}