wasmer_wasix/state/linker/sync/linker_shared.rs
1//! Shared dynamic-link state for every [`super::super::Linker`] clone.
2//!
3//! Owns [`LinkerState`] behind [`RwLock`], the topology [`TopologyCoordinator`], and the
4//! `dl_operation_pending` [`AtomicBool`] used to coordinate [`LinkerShared::synchronize_link_operation`]
5//! with followers. Prefer these helpers over raw lock calls — lock ordering and the “never block
6//! on `write()` without cooperating” invariant are spelled out on [`super`] (the linker `sync`
7//! module).
8
9use std::{
10 ops::Deref,
11 sync::{
12 Arc, Barrier, RwLock, RwLockReadGuard, RwLockWriteGuard,
13 atomic::{AtomicBool, Ordering},
14 },
15};
16
17use tracing::trace;
18use wasmer::{AsStoreMut, FunctionEnv, FunctionEnvMut};
19
20use crate::{WasiEnv, WasiProcess, WasiThreadId};
21
22use super::super::{InstanceGroupState, LinkError, LinkerState};
23use super::{
24 DlOperation, LinkerStateWriteBackoff,
25 topology_lock::{TopologyCoordinator, TopologyToken},
26};
27
28/// Shared linkage and synchronization primitives for every [`super::super::Linker`] handle.
29///
30/// Cloning is cheap (`Arc`-backed locks and coordinators); clone when an instance-group handle
31/// outlives a particular stack frame but must keep talking to the same dynamic-link universe.
32#[derive(Clone)]
33pub(in crate::state::linker) struct LinkerShared {
34 /// Global module tables, buses, … — see [`LinkerState`].
35 linker_state: Arc<RwLock<LinkerState>>,
36 /// [`TopologyCoordinator`] embedded with this linker — guards topology-changing phases.
37 ///
38 /// At most **one** active [`TopologyToken`](TopologyToken) may exist cluster-wide while any
39 /// topology mutation sequence is underway.
40 topology_coordinator: TopologyCoordinator,
41 /// Set during [`LinkerShared::synchronize_link_operation`] so syscall paths / cooperative writers
42 /// can enter [`LinkerShared::do_pending_link_operations_internal`].
43 dl_operation_pending: Arc<AtomicBool>,
44}
45
46impl LinkerShared {
47 /// Wraps freshly constructed [`LinkerState`] for the owning process/module tree (initially only
48 /// the main [`super::super::Linker::new`] path).
49 pub(in crate::state::linker) fn new(linker_state: LinkerState) -> Self {
50 Self {
51 linker_state: Arc::new(RwLock::new(linker_state)),
52 topology_coordinator: TopologyCoordinator::new(),
53 dl_operation_pending: Arc::new(AtomicBool::new(false)),
54 }
55 }
56
57 /// Panics unless both DL buses have exactly one receiver — validates main-group bootstrap before
58 /// exclusive writes (see [`Self::bootstrap_exclusive_write_then`]).
59 fn assert_exactly_one_dl_bus_subscriber(ls: &LinkerState) {
60 let op_rx = ls.send_pending_operation.rx_count();
61 let barrier_rx = ls.send_pending_operation_barrier.rx_count();
62 if op_rx != 1 || barrier_rx != 1 {
63 panic!(
64 "wasix linker bootstrap invariant violated: expected exactly one DL bus subscriber \
65 on each sender (pending_operation rx={op_rx}, barrier rx={barrier_rx}); \
66 `LinkerShared::bootstrap_exclusive_write_then` must only run during main \
67 `Linker::new` finalization before additional instance groups attach receivers"
68 );
69 }
70 }
71
72 /// Exclusive [`LinkerState`] write for main linker bootstrap only.
73 ///
74 /// # Safety
75 ///
76 /// Must run only while exactly one instance group has subscribed to both DL buses (verified
77 /// after the lock is taken — mismatch panics in release builds). Caller must respect instance-group /
78 /// linker lock ordering used in [`super::super::Linker::new`].
79 pub(in crate::state::linker) unsafe fn bootstrap_exclusive_write_then<R>(
80 &self,
81 f: impl FnOnce(&mut LinkerState) -> R,
82 ) -> R {
83 let mut guard = self.linker_state.write().unwrap();
84 Self::assert_exactly_one_dl_bus_subscriber(&guard);
85 f(&mut guard)
86 }
87
88 /// Non-blocking `try_write` on [`LinkerState`].
89 ///
90 /// Used sparingly where blocking would recurse into the linker (stub paths, best-effort
91 /// resolution). Prefer [`Self::write_linker_state`] for normal cooperative writes.
92 pub(in crate::state::linker) fn try_write_linker_state(
93 &self,
94 ) -> Result<
95 RwLockWriteGuard<'_, LinkerState>,
96 std::sync::TryLockError<RwLockWriteGuard<'_, LinkerState>>,
97 > {
98 self.linker_state.try_write()
99 }
100
101 /// Non-blocking `try_read` on [`LinkerState`].
102 pub(in crate::state::linker) fn try_read_linker_state(
103 &self,
104 ) -> Result<
105 RwLockReadGuard<'_, LinkerState>,
106 std::sync::TryLockError<RwLockReadGuard<'_, LinkerState>>,
107 > {
108 self.linker_state.try_read()
109 }
110
111 /// Locks [`LinkerState`] for write using repeated `try_write` plus cooperative draining of
112 /// pending dynamic-link replay and [`LinkerStateWriteBackoff`].
113 ///
114 /// Prefer this over naked [`RwLock::write`] / blocking `write()` from instance-group linker
115 /// paths: another OS thread might hold the write lock while follower groups rendezvous at a DL
116 /// barrier waiting for **this** thread to run [`Self::do_pending_link_operations_internal`].
117 pub(in crate::state::linker) fn write_linker_state(
118 &self,
119 group_state: &mut InstanceGroupState,
120 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
121 ) -> Result<RwLockWriteGuard<'_, LinkerState>, LinkError> {
122 let mut linker_write_backoff = LinkerStateWriteBackoff::new();
123 loop {
124 match self.linker_state.try_write() {
125 Ok(guard) => return Ok(guard),
126 Err(std::sync::TryLockError::WouldBlock) => {
127 linker_write_backoff.backoff();
128 let env = ctx.as_ref();
129 let mut store = ctx.as_store_mut();
130 self.do_pending_link_operations_internal(group_state, &mut store, &env)?;
131 }
132 Err(std::sync::TryLockError::Poisoned(_)) => {
133 panic!("The linker state's lock is poisoned");
134 }
135 }
136 }
137 }
138
139 /// [`TopologyCoordinator::try_acquire`] loop with [`LinkerStateWriteBackoff`] plus cooperative drains
140 /// of [`Self::do_pending_link_operations_internal`].
141 ///
142 /// **Lock ordering**: topology must be leased **before** taking [`LinkerState`] for write paths that
143 /// change replicated topology (spawn prepare, guarded loads, [`super::super::Linker::resolve_export`],
144 /// etc.).
145 ///
146 /// `prepare_for_instance_group` is the motivating case — the parent attaches no new subscribers until
147 /// the child finalizes while still holding this token handed across threads.
148 pub(in crate::state::linker) fn acquire_topology_token(
149 &self,
150 group_state: &mut InstanceGroupState,
151 store: &mut impl AsStoreMut,
152 env: &FunctionEnv<WasiEnv>,
153 ) -> Result<TopologyToken, LinkError> {
154 let mut backoff = LinkerStateWriteBackoff::new();
155 loop {
156 if let Some(t) = self.topology_coordinator.try_acquire() {
157 return Ok(t);
158 }
159 backoff.backoff();
160 self.do_pending_link_operations_internal(group_state, store, env)?;
161 }
162 }
163
164 /// Blocking [`RwLock`] write once a [`TopologyToken`] is already held (spawn finalization —
165 /// e.g. [`super::super::Linker::create_instance_group`]).
166 ///
167 /// Returns `(token, guard)` — drop the **`guard`** before **`token`** to avoid extending the write
168 /// critical section beyond topology decisions.
169 pub(in crate::state::linker) fn write_linker_state_blocking_holding_topology(
170 &self,
171 topology: TopologyToken,
172 ) -> (TopologyToken, RwLockWriteGuard<'_, LinkerState>) {
173 let linker_state_write_guard = self.linker_state.write().unwrap();
174 (topology, linker_state_write_guard)
175 }
176
177 /// Acquires topology (see [`Self::acquire_topology_token`]), then takes a blocking write lock via
178 /// [`Self::write_linker_state_blocking_holding_topology`].
179 ///
180 /// Use this for paths that mutate [`LinkerState`] under the topology coordinator’s single-writer
181 /// umbrella when the lease was **not** already taken elsewhere.
182 pub(in crate::state::linker) fn write_linker_state_with_topology(
183 &self,
184 group_state: &mut InstanceGroupState,
185 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
186 ) -> Result<(TopologyToken, RwLockWriteGuard<'_, LinkerState>), LinkError> {
187 let env = ctx.as_ref();
188 let mut store = ctx.as_store_mut();
189 let token = self.acquire_topology_token(group_state, &mut store, &env)?;
190 Ok(self.write_linker_state_blocking_holding_topology(token))
191 }
192
193 /// Broadcasts [`DlOperation`] `op` to every instance-group receiver then waits for replay.
194 ///
195 /// Contracts:
196 ///
197 /// * `topology` must already belong to **this** instigating flow and was leased **before**
198 /// exclusive access to buses / tables was acquired.
199 /// * `linker_state_write_lock` guards bus broadcast invariants (`try_broadcast` must succeed).
200 /// * Recoverable semantic failures are surfaced by callers; panics here are always fatal —
201 /// bus capacity misuse or missed rendezvous implies we cannot reconcile groups.
202 /// * Drops `topology` when done (`num_groups <= 1`) or after the follower completion barrier.
203 pub(in crate::state::linker) fn synchronize_link_operation(
204 &self,
205 topology: TopologyToken,
206 op: DlOperation,
207 mut linker_state_write_lock: RwLockWriteGuard<'_, LinkerState>,
208 group_state: &mut InstanceGroupState,
209 wasi_process: &WasiProcess,
210 self_thread_id: WasiThreadId,
211 ) {
212 trace!(?op, "Synchronizing link operation");
213
214 let num_groups = linker_state_write_lock.send_pending_operation.rx_count();
215
216 if num_groups <= 1 {
217 trace!("No other living instance groups, nothing to do");
218 drop(topology);
219 return;
220 }
221
222 let barrier = Arc::new(Barrier::new(num_groups));
223 // Single-flight barrier envelope (bus depth is one intentionally).
224 if linker_state_write_lock
225 .send_pending_operation_barrier
226 .try_broadcast(barrier.clone())
227 .is_err()
228 {
229 panic!("Internal error: more than one synchronized link operation active")
230 }
231
232 // Wake followers so syscall paths re-enter cooperative DL helpers promptly.
233 self.dl_operation_pending.store(true, Ordering::SeqCst);
234
235 trace!("Signalling wasix threads to wake up");
236 for thread in wasi_process
237 .all_threads()
238 .into_iter()
239 .filter(|tid| *tid != self_thread_id)
240 {
241 wasi_process.signal_thread(&thread, wasmer_wasix_types::wasi::Signal::Sigwakeup);
242 }
243
244 trace!(%num_groups, "Waiting at barrier");
245 barrier.wait();
246
247 trace!("All threads now processing dl op");
248
249 // Everyone saw [`Self::dl_operation_pending_load`] and will drive `recv` paths.
250 self.dl_operation_pending.store(false, Ordering::SeqCst);
251
252 // Still under write lock: publish the replicated command before releasing exclusivity.
253 if linker_state_write_lock
254 .send_pending_operation
255 .try_broadcast(op.clone())
256 .is_err()
257 {
258 panic!("Internal error: more than one synchronized link operation active")
259 }
260
261 // Downgrade to shared read while followers apply (`apply_dl_operation`); no topology writer
262 // should race between the barrier epochs.
263 trace!("Unlocking linker state");
264 drop(linker_state_write_lock);
265 let linker_state_read_lock = self.linker_state.read().unwrap();
266
267 // Drain local bus copies — frees mailbox capacity before the follower epoch completes.
268 _ = group_state.recv_pending_operation_barrier.recv().unwrap();
269 _ = group_state.recv_pending_operation.recv().unwrap();
270
271 // Second rendezvous guarantees everyone finished before another writer can preempt read-only
272 // application (see linker `sync` module discussion).
273 trace!("Waiting for other threads to finish processing the dl op");
274 barrier.wait();
275
276 drop(linker_state_read_lock);
277 drop(topology);
278
279 trace!("Synchronization complete");
280 }
281
282 /// Peek at the cooperative-DL handshake flag `dl_operation_pending` with arbitrary memory
283 /// ordering.
284 ///
285 /// Prefer [`Ordering::SeqCst`] (`fast = false` in callers) whenever another thread waking from
286 /// `Sigwakeup` must reliably observe transitions; relaxed loads are intentionally lossy —
287 /// safe only when callers will retry promptly on their own syscall boundaries.
288 pub(in crate::state::linker) fn dl_operation_pending_load(&self, ordering: Ordering) -> bool {
289 self.dl_operation_pending.load(ordering)
290 }
291
292 /// Follow half of [`Self::synchronize_link_operation`] — participates in barriers, consumes the broadcast
293 /// [`DlOperation`], and applies `op` to `group_state` under [`LinkerState`] read access.
294 ///
295 /// Intended for callers that already skipped the idle fast path (cheap load of
296 /// `dl_operation_pending`) yet still need deterministic rendezvous semantics.
297 ///
298 /// # Panics
299 ///
300 /// Missing receivers / malformed bus state panic — those are irrecoverable and indicate we lost
301 /// synchronization with subscribers.
302 pub(in crate::state::linker) fn do_pending_link_operations_internal(
303 &self,
304 group_state: &mut InstanceGroupState,
305 store: &mut impl AsStoreMut,
306 env: &FunctionEnv<WasiEnv>,
307 ) -> Result<(), LinkError> {
308 if !self.dl_operation_pending.load(Ordering::SeqCst) {
309 return Ok(());
310 }
311
312 trace!("Pending link operation discovered, will process");
313
314 let barrier = group_state.recv_pending_operation_barrier.recv().expect(
315 "Failed to receive barrier while a DL operation was \
316 in progress; this condition can't be recovered from",
317 );
318 barrier.wait();
319
320 trace!("Past the barrier, now processing operation");
321
322 // Barrier epoch complete — instigator downgraded writer→reader earlier, so follower reads OK.
323 let op = group_state.recv_pending_operation.recv().unwrap();
324 let linker_state = self.linker_state.read().unwrap();
325
326 let result = group_state.apply_dl_operation(linker_state.deref(), op, store, env);
327
328 trace!("Operation applied, now waiting at second barrier");
329
330 // Rendezvous again so nobody leaves while others still mutate stores / tables concurrently.
331 barrier.wait();
332 drop(linker_state);
333
334 trace!("Pending link operation applied successfully");
335
336 result
337 }
338}