wasmer_wasix/state/linker/sync/
topology_lock.rs1use std::sync::{
10 Arc, Mutex,
11 atomic::{AtomicU64, Ordering},
12};
13
14struct TopologyGateState {
15 active_token: Option<u64>,
17}
18
19struct TopologyCoordinatorInner {
20 next_topology_token: AtomicU64,
21 #[cfg(test)]
23 topology_generation: AtomicU64,
24 gate: Mutex<TopologyGateState>,
25}
26
27#[derive(Clone)]
32pub(super) struct TopologyCoordinator {
33 inner: Arc<TopologyCoordinatorInner>,
34}
35
36impl TopologyCoordinator {
37 pub(super) fn new() -> Self {
38 Self {
39 inner: Arc::new(TopologyCoordinatorInner {
40 next_topology_token: AtomicU64::new(0),
41 #[cfg(test)]
42 topology_generation: AtomicU64::new(0),
43 gate: Mutex::new(TopologyGateState { active_token: None }),
44 }),
45 }
46 }
47
48 pub(super) fn try_acquire(&self) -> Option<TopologyToken> {
50 let mut guard = self.inner.gate.lock().unwrap_or_else(|e| e.into_inner());
51 if guard.active_token.is_some() {
52 return None;
53 }
54 let token = self
55 .inner
56 .next_topology_token
57 .fetch_add(1, Ordering::SeqCst);
58 guard.active_token = Some(token);
59 drop(guard);
60
61 #[cfg(test)]
62 self.inner
63 .topology_generation
64 .fetch_add(1, Ordering::SeqCst);
65
66 Some(TopologyToken {
67 inner: Arc::clone(&self.inner),
68 token,
69 })
70 }
71
72 #[cfg(test)]
73 fn active_token_debug(&self) -> Option<u64> {
74 let g = self.inner.gate.lock().unwrap_or_else(|e| e.into_inner());
75 g.active_token
76 }
77
78 #[cfg(test)]
79 fn topology_generation_debug(&self) -> u64 {
80 self.inner.topology_generation.load(Ordering::SeqCst)
81 }
82}
83
84pub(crate) struct TopologyToken {
88 inner: Arc<TopologyCoordinatorInner>,
89 token: u64,
90}
91
92impl TopologyToken {
93 #[cfg(test)]
95 pub(crate) fn token_id_debug(&self) -> u64 {
96 self.token
97 }
98}
99
100impl Drop for TopologyToken {
101 fn drop(&mut self) {
102 let mut guard = self.inner.gate.lock().unwrap_or_else(|e| e.into_inner());
103 debug_assert_eq!(guard.active_token, Some(self.token));
104 guard.active_token = None;
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use super::*;
111 use std::sync::mpsc;
112 use std::time::{Duration, Instant};
113
114 fn try_acquire_blocking(coord: &TopologyCoordinator) -> TopologyToken {
115 loop {
116 if let Some(t) = coord.try_acquire() {
117 return t;
118 }
119 std::thread::yield_now();
120 }
121 }
122
123 #[test]
124 fn second_try_acquire_succeeds_only_after_drop() {
125 let coord = TopologyCoordinator::new();
126 let t1 = coord.try_acquire().expect("vacant coordinator");
127 let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
128 let coord2 = coord.clone();
129 let th = std::thread::spawn(move || {
130 let _blocked = try_acquire_blocking(&coord2);
131 done_tx.send(()).unwrap();
132 });
133 std::thread::sleep(Duration::from_millis(20));
134 assert!(done_rx.try_recv().is_err());
135 drop(t1);
136 done_rx.recv_timeout(Duration::from_secs(2)).unwrap();
137 th.join().unwrap();
138 }
139
140 #[test]
141 fn tokens_are_ordered_and_generation_advances() {
142 let coord = TopologyCoordinator::new();
143 assert_eq!(coord.topology_generation_debug(), 0);
144 assert!(coord.active_token_debug().is_none());
145 let a = coord.try_acquire().unwrap();
146 assert_eq!(a.token_id_debug(), 0);
147 assert_eq!(coord.topology_generation_debug(), 1);
148 drop(a);
149 let b = coord.try_acquire().unwrap();
150 assert_eq!(b.token_id_debug(), 1);
151 assert_eq!(coord.topology_generation_debug(), 2);
152 }
153
154 #[test]
155 fn contended_try_acquire_resolves_quickly_after_drop() {
156 let coord = Arc::new(TopologyCoordinator::new());
157 let barrier = Arc::new(std::sync::Barrier::new(2));
158
159 let c1 = Arc::clone(&coord);
160 let b1 = Arc::clone(&barrier);
161 let th = std::thread::spawn(move || {
162 let _guard = try_acquire_blocking(&c1);
163 b1.wait();
164 std::thread::sleep(Duration::from_millis(30));
165 });
166
167 barrier.wait();
168 let start = Instant::now();
169 let _second = try_acquire_blocking(&coord);
170 let elapsed = start.elapsed();
171
172 assert!(
173 elapsed < Duration::from_millis(200),
174 "waited {:?}, expected uncontended-ish acquire after holder exit",
175 elapsed
176 );
177
178 th.join().unwrap();
179 }
180}