wasmer_wasix/os/task/
backoff.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    sync::{
5        Arc,
6        atomic::{AtomicU32, Ordering},
7    },
8    task::{Context, Poll, Waker},
9    time::Duration,
10};
11
12use futures::{Future, FutureExt};
13use wasmer_wasix_types::wasi::Snapshot0Clockid;
14
15use crate::{VirtualTaskManager, WasiProcess, syscalls::platform_clock_time_get};
16
17use super::process::LockableWasiProcessInner;
18
/// Per-process CPU throttling state.
///
/// Tracks how long an idle process should currently back off for, the
/// wakers of every task that is presently backing off, and the cool-off
/// deadline that must pass before any backoff is handed out at all.
#[derive(Debug)]
pub struct WasiProcessCpuBackoff {
    /// Wakers registered by in-flight backoff futures, keyed by the id
    /// they were registered under. Acquiring a CPU run token wakes every
    /// entry and then empties the map.
    cpu_backoff_wakers: HashMap<u64, Waker>,
    /// Last id handed out for `cpu_backoff_wakers`; bumped by one for
    /// every new registration.
    cpu_backoff_waker_seed: u64,
    /// Backoff period handed to the next backoff token. `Duration::ZERO`
    /// means no backoff is currently in progress.
    cpu_backoff_time: Duration,
    /// Monotonic-clock timestamp before which no backoff token is issued.
    /// `0` means the deadline has not been armed yet; it is armed on the
    /// next backoff request and cleared whenever a run token is acquired.
    cpu_run_cool_off: u128,
    /// Upper bound that `cpu_backoff_time` is clamped to while it grows.
    max_cpu_backoff_time: Duration,
    /// Length of the cool-off window that has to elapse after the last
    /// run token was acquired before backing off may begin.
    max_cpu_cool_off_time: Duration,
}

impl WasiProcessCpuBackoff {
    /// Creates an idle backoff state: no registered wakers, no backoff in
    /// progress and no cool-off deadline armed.
    ///
    /// * `max_cpu_backoff_time` - cap applied to the growing backoff period.
    /// * `max_cpu_cool_off_time` - cool-off window preceding any backoff.
    pub fn new(max_cpu_backoff_time: Duration, max_cpu_cool_off_time: Duration) -> Self {
        let cpu_backoff_wakers = HashMap::new();
        Self {
            max_cpu_backoff_time,
            max_cpu_cool_off_time,
            cpu_backoff_wakers,
            cpu_backoff_waker_seed: 0,
            cpu_backoff_time: Duration::ZERO,
            cpu_run_cool_off: 0,
        }
    }
}
54
/// Guard signalling that its holder wants the process to run at full
/// speed.
///
/// Backoff tokens are refused for as long as the shared counter is above
/// zero; dropping the guard gives its slot back.
#[derive(Debug)]
pub struct CpuRunToken {
    /// Counter of outstanding run tokens, shared with the owning process.
    tokens: Arc<AtomicU32>,
}

impl Drop for CpuRunToken {
    /// Releases this token's slot in the shared counter.
    fn drop(&mut self) {
        let _held_before = self.tokens.fetch_sub(1, Ordering::SeqCst);
    }
}
65
66pub struct CpuBackoffToken {
67    /// The amount of CPU backoff time we are currently waiting
68    cpu_backoff_time: Duration,
69    /// How long should the CPU backoff for
70    wait: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
71    /// ID used to unregister the wakers
72    waker_id: Option<u64>,
73    /// The inner protected region of the process with a conditional
74    /// variable that is used for coordination such as checksums.
75    inner: LockableWasiProcessInner,
76}
77
78impl CpuBackoffToken {
79    pub fn backoff_time(&self) -> Duration {
80        self.cpu_backoff_time
81    }
82}
83
84impl Future for CpuBackoffToken {
85    type Output = ();
86
87    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88        let inner = self.inner.clone();
89        let mut inner = inner.0.lock().unwrap();
90
91        // Registering the waker will unregister any previous wakers
92        // so that we don't go into an endless memory growth situation
93        if let Some(waker_id) = self.waker_id.take() {
94            if inner.backoff.cpu_backoff_wakers.remove(&waker_id).is_none() {
95                // if we did not remove the waker, then someone else did
96                // which means we were woken and should exit the backoff phase
97                return Poll::Ready(());
98            }
99        }
100
101        // Register ourselves as a waker to be woken
102        let id = inner.backoff.cpu_backoff_waker_seed + 1;
103        inner.backoff.cpu_backoff_waker_seed = id;
104        inner
105            .backoff
106            .cpu_backoff_wakers
107            .insert(id, cx.waker().clone());
108
109        // Now poll the waiting period
110        let ret = self.wait.poll_unpin(cx);
111
112        // If we have reached the end of the wait period
113        // then we need to exponentially grow it any future
114        // backoff's so that it gets slower
115        if ret.is_ready() && self.cpu_backoff_time == inner.backoff.cpu_backoff_time {
116            inner.backoff.cpu_backoff_time *= 2;
117            if inner.backoff.cpu_backoff_time > inner.backoff.max_cpu_backoff_time {
118                inner.backoff.cpu_backoff_time = inner.backoff.max_cpu_backoff_time;
119            }
120        }
121
122        ret
123    }
124}
125
126impl Drop for CpuBackoffToken {
127    fn drop(&mut self) {
128        if let Some(waker_id) = self.waker_id.take() {
129            let mut inner = self.inner.0.lock().unwrap();
130            inner.backoff.cpu_backoff_wakers.remove(&waker_id);
131        }
132    }
133}
134
135impl WasiProcess {
136    // Releases the CPU backoff (if one is active)
137    pub fn acquire_cpu_run_token(&self) -> CpuRunToken {
138        self.cpu_run_tokens.fetch_add(1, Ordering::SeqCst);
139
140        let mut inner = self.inner.0.lock().unwrap();
141        for (_, waker) in inner.backoff.cpu_backoff_wakers.iter() {
142            waker.wake_by_ref();
143        }
144        inner.backoff.cpu_backoff_wakers.clear();
145        inner.backoff.cpu_backoff_time = Duration::ZERO;
146        inner.backoff.cpu_run_cool_off = 0;
147
148        CpuRunToken {
149            tokens: self.cpu_run_tokens.clone(),
150        }
151    }
152
153    // Determine if we should do a CPU backoff
154    pub fn acquire_cpu_backoff_token(
155        &self,
156        tasks: &Arc<dyn VirtualTaskManager>,
157    ) -> Option<CpuBackoffToken> {
158        // If run tokens are held then we should allow executing to
159        // continue at its top pace
160        if self.cpu_run_tokens.load(Ordering::SeqCst) > 0 {
161            return None;
162        }
163
164        let cpu_backoff_time = {
165            let mut inner = self.inner.0.lock().unwrap();
166
167            // check again as it might have changed (race condition)
168            if self.cpu_run_tokens.load(Ordering::SeqCst) > 0 {
169                return None;
170            }
171
172            // Check if a cool-off-period has passed
173            let now =
174                platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap() as u128;
175            if inner.backoff.cpu_run_cool_off == 0 {
176                inner.backoff.cpu_run_cool_off =
177                    now + (1_000_000 * inner.backoff.max_cpu_cool_off_time.as_millis());
178            }
179            if now <= inner.backoff.cpu_run_cool_off {
180                return None;
181            }
182
183            // The amount of time we wait will be increased when a full
184            // time slice is executed
185            if inner.backoff.cpu_backoff_time == Duration::ZERO {
186                inner.backoff.cpu_backoff_time = Duration::from_millis(1);
187            }
188            inner.backoff.cpu_backoff_time
189        };
190        let how_long = tasks.sleep_now(cpu_backoff_time);
191
192        Some(CpuBackoffToken {
193            cpu_backoff_time,
194            wait: how_long,
195            waker_id: None,
196            inner: self.inner.clone(),
197        })
198    }
199}