use std::{
collections::HashMap,
pin::Pin,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
task::{Context, Poll, Waker},
time::Duration,
};
use futures::{Future, FutureExt};
use wasmer_wasix_types::wasi::Snapshot0Clockid;
use crate::{syscalls::platform_clock_time_get, VirtualTaskManager, WasiProcess};
use super::process::LockableWasiProcessInner;
/// Per-process bookkeeping for CPU back-off.
///
/// The state is mutated by `WasiProcess::acquire_cpu_run_token`,
/// `WasiProcess::acquire_cpu_backoff_token` and by polling a
/// `CpuBackoffToken`.
#[derive(Debug)]
pub struct WasiProcessCpuBackoff {
    /// Wakers registered by pending back-off tokens, keyed by a generated id.
    /// Acquiring a run token wakes every entry and empties the map.
    cpu_backoff_wakers: HashMap<u64, Waker>,
    /// Last id handed out as a key for `cpu_backoff_wakers`.
    cpu_backoff_waker_seed: u64,
    /// Back-off duration the next back-off token will wait for.
    /// `Duration::ZERO` means no back-off is currently in effect.
    cpu_backoff_time: Duration,
    /// Clock reading that must be passed before back-off tokens are issued;
    /// `0` means no deadline has been armed yet.
    cpu_run_cool_off: u128,
    /// Upper bound for `cpu_backoff_time`.
    max_cpu_backoff_time: Duration,
    /// Length of the cool-off window used to arm `cpu_run_cool_off`.
    max_cpu_cool_off_time: Duration,
}

impl WasiProcessCpuBackoff {
    /// Creates the back-off state with no registered wakers, no back-off in
    /// effect and no cool-off deadline armed.
    ///
    /// * `max_cpu_backoff_time` - cap applied to the growing back-off time.
    /// * `max_cpu_cool_off_time` - cool-off window applied before back-off
    ///   tokens are handed out.
    pub fn new(max_cpu_backoff_time: Duration, max_cpu_cool_off_time: Duration) -> Self {
        Self {
            max_cpu_backoff_time,
            max_cpu_cool_off_time,
            cpu_backoff_time: Duration::ZERO,
            cpu_run_cool_off: 0,
            cpu_backoff_waker_seed: 0,
            cpu_backoff_wakers: HashMap::new(),
        }
    }
}
/// Guard that represents one active CPU run on a process.
///
/// The token holds a handle to a shared counter and gives its unit back when
/// dropped.
#[derive(Debug)]
pub struct CpuRunToken {
    /// Shared counter of outstanding run tokens.
    tokens: Arc<AtomicU32>,
}

impl Drop for CpuRunToken {
    /// Decrements the shared run-token counter by one.
    fn drop(&mut self) {
        let counter: &AtomicU32 = &self.tokens;
        counter.fetch_sub(1, Ordering::SeqCst);
    }
}
/// Future handed out by `WasiProcess::acquire_cpu_backoff_token`.
///
/// Polling it drives the `wait` future; it also completes early when `poll`
/// finds that the waker entry it was tracking has already been removed from
/// the shared `cpu_backoff_wakers` map.
pub struct CpuBackoffToken {
/// Back-off duration that was in effect when this token was created.
cpu_backoff_time: Duration,
/// Future that completes once the back-off period has elapsed
/// (built from `VirtualTaskManager::sleep_now` when the token is acquired).
wait: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
/// Key of the waker entry in `cpu_backoff_wakers` that `poll` and `Drop`
/// remove; `None` when no entry is being tracked.
waker_id: Option<u64>,
/// Shared, lockable process state that holds the `backoff` bookkeeping.
inner: LockableWasiProcessInner,
}
impl CpuBackoffToken {
    /// Returns the back-off duration this token was created with.
    pub fn backoff_time(&self) -> Duration {
        let Self {
            cpu_backoff_time, ..
        } = self;
        *cpu_backoff_time
    }
}
impl Future for CpuBackoffToken {
    type Output = ();

    /// Drives the back-off wait.
    ///
    /// Returns `Poll::Ready(())` when either
    /// * the waker registered by the previous poll is no longer in
    ///   `cpu_backoff_wakers` (it was woken and cleared by
    ///   `WasiProcess::acquire_cpu_run_token`), or
    /// * the underlying `wait` future has completed.
    ///
    /// When the wait completes and the shared back-off time is still the one
    /// this token was created with, the shared back-off time is doubled and
    /// capped at `max_cpu_backoff_time`.
    ///
    /// # Panics
    ///
    /// Panics if the process state mutex is poisoned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Clone the handle so the guard does not borrow `self`, which is
        // mutated further down.
        let inner = self.inner.clone();
        let mut inner = inner.0.lock().unwrap();

        // Unregister the waker left by the previous poll so stale wakers do
        // not pile up. If the entry is already gone, a run token was acquired
        // in the meantime and the back-off is over.
        if let Some(waker_id) = self.waker_id.take() {
            if inner.backoff.cpu_backoff_wakers.remove(&waker_id).is_none() {
                return Poll::Ready(());
            }
        }

        let id = inner.backoff.cpu_backoff_waker_seed + 1;
        inner.backoff.cpu_backoff_waker_seed = id;
        inner
            .backoff
            .cpu_backoff_wakers
            .insert(id, cx.waker().clone());
        // Remember which entry is ours; without this neither the early-wake
        // check above nor `Drop` could ever find the registration again.
        self.waker_id = Some(id);

        let ret = self.wait.poll_unpin(cx);

        // Only grow the back-off if nobody changed it since this token was
        // issued (e.g. a run token resetting it to zero).
        if ret.is_ready() && self.cpu_backoff_time == inner.backoff.cpu_backoff_time {
            let backoff = &mut inner.backoff;
            backoff.cpu_backoff_time = backoff
                .cpu_backoff_time
                .saturating_mul(2)
                .min(backoff.max_cpu_backoff_time);
        }

        ret
    }
}
impl Drop for CpuBackoffToken {
    /// Removes the waker entry tracked by this token, if any, from the shared
    /// `cpu_backoff_wakers` map.
    ///
    /// The process lock is only taken when there is an entry to remove.
    fn drop(&mut self) {
        let waker_id = match self.waker_id.take() {
            Some(id) => id,
            None => return,
        };
        let mut guard = self.inner.0.lock().unwrap();
        guard.backoff.cpu_backoff_wakers.remove(&waker_id);
    }
}
impl WasiProcess {
pub fn acquire_cpu_run_token(&self) -> CpuRunToken {
self.cpu_run_tokens.fetch_add(1, Ordering::SeqCst);
let mut inner = self.inner.0.lock().unwrap();
for (_, waker) in inner.backoff.cpu_backoff_wakers.iter() {
waker.wake_by_ref();
}
inner.backoff.cpu_backoff_wakers.clear();
inner.backoff.cpu_backoff_time = Duration::ZERO;
inner.backoff.cpu_run_cool_off = 0;
CpuRunToken {
tokens: self.cpu_run_tokens.clone(),
}
}
pub fn acquire_cpu_backoff_token(
&self,
tasks: &Arc<dyn VirtualTaskManager>,
) -> Option<CpuBackoffToken> {
if self.cpu_run_tokens.load(Ordering::SeqCst) > 0 {
return None;
}
let cpu_backoff_time = {
let mut inner = self.inner.0.lock().unwrap();
if self.cpu_run_tokens.load(Ordering::SeqCst) > 0 {
return None;
}
let now =
platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap() as u128;
if inner.backoff.cpu_run_cool_off == 0 {
inner.backoff.cpu_run_cool_off =
now + (1_000_000 * inner.backoff.max_cpu_cool_off_time.as_millis());
}
if now <= inner.backoff.cpu_run_cool_off {
return None;
}
if inner.backoff.cpu_backoff_time == Duration::ZERO {
inner.backoff.cpu_backoff_time = Duration::from_millis(1);
}
inner.backoff.cpu_backoff_time
};
let how_long = tasks.sleep_now(cpu_backoff_time);
Some(CpuBackoffToken {
cpu_backoff_time,
wait: how_long,
waker_id: None,
inner: self.inner.clone(),
})
}
}