wasmer_wasix/state/linker/sync/
topology_lock.rs

1//! Single-writer gate for WASIX linker topology changes.
2//!
3//! While a [`TopologyToken`] is held, no other topology-changing operation may begin.
4//! The token may be moved to another thread (it is [`Send`]).
5//!
6//! Use [`TopologyCoordinator::try_acquire`] once per cooperative retry loop alongside
7//! [`LinkerStateWriteBackoff`](super::LinkerStateWriteBackoff), analogous to the cooperative linker-state writers in this module (`write_linker_state`, `write_linker_state_with_topology`).
8
9use std::sync::{
10    Arc, Mutex,
11    atomic::{AtomicU64, Ordering},
12};
13
14struct TopologyGateState {
15    /// When `Some`, a [`TopologyToken`] owns the gate.
16    active_token: Option<u64>,
17}
18
19struct TopologyCoordinatorInner {
20    next_topology_token: AtomicU64,
21    /// Test-only: counts successful [`TopologyCoordinator::try_acquire`] grants.
22    #[cfg(test)]
23    topology_generation: AtomicU64,
24    gate: Mutex<TopologyGateState>,
25}
26
27/// Shared coordinator embedded in [`LinkerShared`](super::LinkerShared).
28///
29/// Call [`Self::try_acquire`] from an outer retry loop once pending DL ops have been
30/// cooperated with; do not spin forever without backoff (see [`super::LinkerStateWriteBackoff`](super::LinkerStateWriteBackoff)).
31#[derive(Clone)]
32pub(super) struct TopologyCoordinator {
33    inner: Arc<TopologyCoordinatorInner>,
34}
35
36impl TopologyCoordinator {
37    pub(super) fn new() -> Self {
38        Self {
39            inner: Arc::new(TopologyCoordinatorInner {
40                next_topology_token: AtomicU64::new(0),
41                #[cfg(test)]
42                topology_generation: AtomicU64::new(0),
43                gate: Mutex::new(TopologyGateState { active_token: None }),
44            }),
45        }
46    }
47
48    /// Grants the topology lease if idle; otherwise [`None`] (caller should retry after cooperation + backoff).
49    pub(super) fn try_acquire(&self) -> Option<TopologyToken> {
50        let mut guard = self.inner.gate.lock().unwrap_or_else(|e| e.into_inner());
51        if guard.active_token.is_some() {
52            return None;
53        }
54        let token = self
55            .inner
56            .next_topology_token
57            .fetch_add(1, Ordering::SeqCst);
58        guard.active_token = Some(token);
59        drop(guard);
60
61        #[cfg(test)]
62        self.inner
63            .topology_generation
64            .fetch_add(1, Ordering::SeqCst);
65
66        Some(TopologyToken {
67            inner: Arc::clone(&self.inner),
68            token,
69        })
70    }
71
72    #[cfg(test)]
73    fn active_token_debug(&self) -> Option<u64> {
74        let g = self.inner.gate.lock().unwrap_or_else(|e| e.into_inner());
75        g.active_token
76    }
77
78    #[cfg(test)]
79    fn topology_generation_debug(&self) -> u64 {
80        self.inner.topology_generation.load(Ordering::SeqCst)
81    }
82}
83
84/// RAII lease on linker topology serialization.
85///
86/// Dropping clears the coordinator so another [`TopologyCoordinator::try_acquire`] can succeed.
87pub(crate) struct TopologyToken {
88    inner: Arc<TopologyCoordinatorInner>,
89    token: u64,
90}
91
92impl TopologyToken {
93    /// Stable id for this lease (distinct per coordinator while they are handed out sequentially).
94    #[cfg(test)]
95    pub(crate) fn token_id_debug(&self) -> u64 {
96        self.token
97    }
98}
99
100impl Drop for TopologyToken {
101    fn drop(&mut self) {
102        let mut guard = self.inner.gate.lock().unwrap_or_else(|e| e.into_inner());
103        debug_assert_eq!(guard.active_token, Some(self.token));
104        guard.active_token = None;
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use super::*;
111    use std::sync::mpsc;
112    use std::time::{Duration, Instant};
113
114    fn try_acquire_blocking(coord: &TopologyCoordinator) -> TopologyToken {
115        loop {
116            if let Some(t) = coord.try_acquire() {
117                return t;
118            }
119            std::thread::yield_now();
120        }
121    }
122
123    #[test]
124    fn second_try_acquire_succeeds_only_after_drop() {
125        let coord = TopologyCoordinator::new();
126        let t1 = coord.try_acquire().expect("vacant coordinator");
127        let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
128        let coord2 = coord.clone();
129        let th = std::thread::spawn(move || {
130            let _blocked = try_acquire_blocking(&coord2);
131            done_tx.send(()).unwrap();
132        });
133        std::thread::sleep(Duration::from_millis(20));
134        assert!(done_rx.try_recv().is_err());
135        drop(t1);
136        done_rx.recv_timeout(Duration::from_secs(2)).unwrap();
137        th.join().unwrap();
138    }
139
140    #[test]
141    fn tokens_are_ordered_and_generation_advances() {
142        let coord = TopologyCoordinator::new();
143        assert_eq!(coord.topology_generation_debug(), 0);
144        assert!(coord.active_token_debug().is_none());
145        let a = coord.try_acquire().unwrap();
146        assert_eq!(a.token_id_debug(), 0);
147        assert_eq!(coord.topology_generation_debug(), 1);
148        drop(a);
149        let b = coord.try_acquire().unwrap();
150        assert_eq!(b.token_id_debug(), 1);
151        assert_eq!(coord.topology_generation_debug(), 2);
152    }
153
154    #[test]
155    fn contended_try_acquire_resolves_quickly_after_drop() {
156        let coord = Arc::new(TopologyCoordinator::new());
157        let barrier = Arc::new(std::sync::Barrier::new(2));
158
159        let c1 = Arc::clone(&coord);
160        let b1 = Arc::clone(&barrier);
161        let th = std::thread::spawn(move || {
162            let _guard = try_acquire_blocking(&c1);
163            b1.wait();
164            std::thread::sleep(Duration::from_millis(30));
165        });
166
167        barrier.wait();
168        let start = Instant::now();
169        let _second = try_acquire_blocking(&coord);
170        let elapsed = start.elapsed();
171
172        assert!(
173            elapsed < Duration::from_millis(200),
174            "waited {:?}, expected uncontended-ish acquire after holder exit",
175            elapsed
176        );
177
178        th.join().unwrap();
179    }
180}