wasmer_wasix/os/task/
backoff.rs1use std::{
2 collections::HashMap,
3 pin::Pin,
4 sync::{
5 Arc,
6 atomic::{AtomicU32, Ordering},
7 },
8 task::{Context, Poll, Waker},
9 time::Duration,
10};
11
12use futures::{Future, FutureExt};
13use wasmer_wasix_types::wasi::Snapshot0Clockid;
14
15use crate::{VirtualTaskManager, WasiProcess, syscalls::platform_clock_time_get};
16
17use super::process::LockableWasiProcessInner;
18
/// Bookkeeping for the CPU backoff applied to a process.
///
/// The state is reset whenever a CPU run token is acquired and is advanced
/// by backoff tokens as their wait periods elapse.
#[derive(Debug)]
pub struct WasiProcessCpuBackoff {
    /// Wakers registered by pending backoff tokens, keyed by a unique id.
    /// Acquiring a CPU run token wakes every entry and then clears the map.
    cpu_backoff_wakers: HashMap<u64, Waker>,
    /// Last id handed out as a key for `cpu_backoff_wakers`; it is
    /// incremented before every new registration.
    cpu_backoff_waker_seed: u64,
    /// Backoff duration currently in effect. `Duration::ZERO` means no
    /// backoff has started since the last reset.
    cpu_backoff_time: Duration,
    /// Clock reading after which backoff tokens may be handed out.
    /// `0` is used as the "not yet scheduled" marker.
    cpu_run_cool_off: u128,
    /// Upper bound that `cpu_backoff_time` is clamped to when it grows.
    max_cpu_backoff_time: Duration,
    /// Length of the cool-off period added to the current clock reading
    /// when `cpu_run_cool_off` is scheduled.
    max_cpu_cool_off_time: Duration,
}
41
42impl WasiProcessCpuBackoff {
43 pub fn new(max_cpu_backoff_time: Duration, max_cpu_cool_off_time: Duration) -> Self {
44 Self {
45 cpu_backoff_wakers: Default::default(),
46 cpu_backoff_waker_seed: 0,
47 cpu_backoff_time: Duration::ZERO,
48 cpu_run_cool_off: 0,
49 max_cpu_backoff_time,
50 max_cpu_cool_off_time,
51 }
52 }
53}
54
/// Guard representing one active use of the CPU by a process.
///
/// Each live token accounts for one unit in the shared counter; dropping the
/// token gives that unit back.
#[derive(Debug)]
pub struct CpuRunToken {
    /// Shared count of run tokens currently alive.
    tokens: Arc<AtomicU32>,
}
59
60impl Drop for CpuRunToken {
61 fn drop(&mut self) {
62 self.tokens.fetch_sub(1, Ordering::SeqCst);
63 }
64}
65
66pub struct CpuBackoffToken {
67 cpu_backoff_time: Duration,
69 wait: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
71 waker_id: Option<u64>,
73 inner: LockableWasiProcessInner,
76}
77
78impl CpuBackoffToken {
79 pub fn backoff_time(&self) -> Duration {
80 self.cpu_backoff_time
81 }
82}
83
84impl Future for CpuBackoffToken {
85 type Output = ();
86
87 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88 let inner = self.inner.clone();
89 let mut inner = inner.0.lock().unwrap();
90
91 if let Some(waker_id) = self.waker_id.take() {
94 if inner.backoff.cpu_backoff_wakers.remove(&waker_id).is_none() {
95 return Poll::Ready(());
98 }
99 }
100
101 let id = inner.backoff.cpu_backoff_waker_seed + 1;
103 inner.backoff.cpu_backoff_waker_seed = id;
104 inner
105 .backoff
106 .cpu_backoff_wakers
107 .insert(id, cx.waker().clone());
108
109 let ret = self.wait.poll_unpin(cx);
111
112 if ret.is_ready() && self.cpu_backoff_time == inner.backoff.cpu_backoff_time {
116 inner.backoff.cpu_backoff_time *= 2;
117 if inner.backoff.cpu_backoff_time > inner.backoff.max_cpu_backoff_time {
118 inner.backoff.cpu_backoff_time = inner.backoff.max_cpu_backoff_time;
119 }
120 }
121
122 ret
123 }
124}
125
126impl Drop for CpuBackoffToken {
127 fn drop(&mut self) {
128 if let Some(waker_id) = self.waker_id.take() {
129 let mut inner = self.inner.0.lock().unwrap();
130 inner.backoff.cpu_backoff_wakers.remove(&waker_id);
131 }
132 }
133}
134
135impl WasiProcess {
136 pub fn acquire_cpu_run_token(&self) -> CpuRunToken {
138 self.cpu_run_tokens.fetch_add(1, Ordering::SeqCst);
139
140 let mut inner = self.inner.0.lock().unwrap();
141 for (_, waker) in inner.backoff.cpu_backoff_wakers.iter() {
142 waker.wake_by_ref();
143 }
144 inner.backoff.cpu_backoff_wakers.clear();
145 inner.backoff.cpu_backoff_time = Duration::ZERO;
146 inner.backoff.cpu_run_cool_off = 0;
147
148 CpuRunToken {
149 tokens: self.cpu_run_tokens.clone(),
150 }
151 }
152
153 pub fn acquire_cpu_backoff_token(
155 &self,
156 tasks: &Arc<dyn VirtualTaskManager>,
157 ) -> Option<CpuBackoffToken> {
158 if self.cpu_run_tokens.load(Ordering::SeqCst) > 0 {
161 return None;
162 }
163
164 let cpu_backoff_time = {
165 let mut inner = self.inner.0.lock().unwrap();
166
167 if self.cpu_run_tokens.load(Ordering::SeqCst) > 0 {
169 return None;
170 }
171
172 let now =
174 platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap() as u128;
175 if inner.backoff.cpu_run_cool_off == 0 {
176 inner.backoff.cpu_run_cool_off =
177 now + (1_000_000 * inner.backoff.max_cpu_cool_off_time.as_millis());
178 }
179 if now <= inner.backoff.cpu_run_cool_off {
180 return None;
181 }
182
183 if inner.backoff.cpu_backoff_time == Duration::ZERO {
186 inner.backoff.cpu_backoff_time = Duration::from_millis(1);
187 }
188 inner.backoff.cpu_backoff_time
189 };
190 let how_long = tasks.sleep_now(cpu_backoff_time);
191
192 Some(CpuBackoffToken {
193 cpu_backoff_time,
194 wait: how_long,
195 waker_id: None,
196 inner: self.inner.clone(),
197 })
198 }
199}