wasmer_wasix/state/linker/sync/
linker_shared.rs

1//! Shared dynamic-link state for every [`super::super::Linker`] clone.
2//!
3//! Owns [`LinkerState`] behind [`RwLock`], the topology [`TopologyCoordinator`], and the
4//! `dl_operation_pending` [`AtomicBool`] used to coordinate [`LinkerShared::synchronize_link_operation`]
5//! with followers. Prefer these helpers over raw lock calls — lock ordering and the “never block
6//! on `write()` without cooperating” invariant are spelled out on [`super`] (the linker `sync`
7//! module).
8
9use std::{
10    ops::Deref,
11    sync::{
12        Arc, Barrier, RwLock, RwLockReadGuard, RwLockWriteGuard,
13        atomic::{AtomicBool, Ordering},
14    },
15};
16
17use tracing::trace;
18use wasmer::{AsStoreMut, FunctionEnv, FunctionEnvMut};
19
20use crate::{WasiEnv, WasiProcess, WasiThreadId};
21
22use super::super::{InstanceGroupState, LinkError, LinkerState};
23use super::{
24    DlOperation, LinkerStateWriteBackoff,
25    topology_lock::{TopologyCoordinator, TopologyToken},
26};
27
28/// Shared linkage and synchronization primitives for every [`super::super::Linker`] handle.
29///
30/// Cloning is cheap (`Arc`-backed locks and coordinators); clone when an instance-group handle
31/// outlives a particular stack frame but must keep talking to the same dynamic-link universe.
32#[derive(Clone)]
33pub(in crate::state::linker) struct LinkerShared {
34    /// Global module tables, buses, … — see [`LinkerState`].
35    linker_state: Arc<RwLock<LinkerState>>,
36    /// [`TopologyCoordinator`] embedded with this linker — guards topology-changing phases.
37    ///
38    /// At most **one** active [`TopologyToken`](TopologyToken) may exist cluster-wide while any
39    /// topology mutation sequence is underway.
40    topology_coordinator: TopologyCoordinator,
41    /// Set during [`LinkerShared::synchronize_link_operation`] so syscall paths / cooperative writers
42    /// can enter [`LinkerShared::do_pending_link_operations_internal`].
43    dl_operation_pending: Arc<AtomicBool>,
44}
45
46impl LinkerShared {
47    /// Wraps freshly constructed [`LinkerState`] for the owning process/module tree (initially only
48    /// the main [`super::super::Linker::new`] path).
49    pub(in crate::state::linker) fn new(linker_state: LinkerState) -> Self {
50        Self {
51            linker_state: Arc::new(RwLock::new(linker_state)),
52            topology_coordinator: TopologyCoordinator::new(),
53            dl_operation_pending: Arc::new(AtomicBool::new(false)),
54        }
55    }
56
57    /// Panics unless both DL buses have exactly one receiver — validates main-group bootstrap before
58    /// exclusive writes (see [`Self::bootstrap_exclusive_write_then`]).
59    fn assert_exactly_one_dl_bus_subscriber(ls: &LinkerState) {
60        let op_rx = ls.send_pending_operation.rx_count();
61        let barrier_rx = ls.send_pending_operation_barrier.rx_count();
62        if op_rx != 1 || barrier_rx != 1 {
63            panic!(
64                "wasix linker bootstrap invariant violated: expected exactly one DL bus subscriber \
65                 on each sender (pending_operation rx={op_rx}, barrier rx={barrier_rx}); \
66                 `LinkerShared::bootstrap_exclusive_write_then` must only run during main \
67                 `Linker::new` finalization before additional instance groups attach receivers"
68            );
69        }
70    }
71
72    /// Exclusive [`LinkerState`] write for main linker bootstrap only.
73    ///
74    /// # Safety
75    ///
76    /// Must run only while exactly one instance group has subscribed to both DL buses (verified
77    /// after the lock is taken — mismatch panics in release builds). Caller must respect instance-group /
78    /// linker lock ordering used in [`super::super::Linker::new`].
79    pub(in crate::state::linker) unsafe fn bootstrap_exclusive_write_then<R>(
80        &self,
81        f: impl FnOnce(&mut LinkerState) -> R,
82    ) -> R {
83        let mut guard = self.linker_state.write().unwrap();
84        Self::assert_exactly_one_dl_bus_subscriber(&guard);
85        f(&mut guard)
86    }
87
88    /// Non-blocking `try_write` on [`LinkerState`].
89    ///
90    /// Used sparingly where blocking would recurse into the linker (stub paths, best-effort
91    /// resolution). Prefer [`Self::write_linker_state`] for normal cooperative writes.
92    pub(in crate::state::linker) fn try_write_linker_state(
93        &self,
94    ) -> Result<
95        RwLockWriteGuard<'_, LinkerState>,
96        std::sync::TryLockError<RwLockWriteGuard<'_, LinkerState>>,
97    > {
98        self.linker_state.try_write()
99    }
100
101    /// Non-blocking `try_read` on [`LinkerState`].
102    pub(in crate::state::linker) fn try_read_linker_state(
103        &self,
104    ) -> Result<
105        RwLockReadGuard<'_, LinkerState>,
106        std::sync::TryLockError<RwLockReadGuard<'_, LinkerState>>,
107    > {
108        self.linker_state.try_read()
109    }
110
111    /// Locks [`LinkerState`] for write using repeated `try_write` plus cooperative draining of
112    /// pending dynamic-link replay and [`LinkerStateWriteBackoff`].
113    ///
114    /// Prefer this over naked [`RwLock::write`] / blocking `write()` from instance-group linker
115    /// paths: another OS thread might hold the write lock while follower groups rendezvous at a DL
116    /// barrier waiting for **this** thread to run [`Self::do_pending_link_operations_internal`].
117    pub(in crate::state::linker) fn write_linker_state(
118        &self,
119        group_state: &mut InstanceGroupState,
120        ctx: &mut FunctionEnvMut<'_, WasiEnv>,
121    ) -> Result<RwLockWriteGuard<'_, LinkerState>, LinkError> {
122        let mut linker_write_backoff = LinkerStateWriteBackoff::new();
123        loop {
124            match self.linker_state.try_write() {
125                Ok(guard) => return Ok(guard),
126                Err(std::sync::TryLockError::WouldBlock) => {
127                    linker_write_backoff.backoff();
128                    let env = ctx.as_ref();
129                    let mut store = ctx.as_store_mut();
130                    self.do_pending_link_operations_internal(group_state, &mut store, &env)?;
131                }
132                Err(std::sync::TryLockError::Poisoned(_)) => {
133                    panic!("The linker state's lock is poisoned");
134                }
135            }
136        }
137    }
138
139    /// [`TopologyCoordinator::try_acquire`] loop with [`LinkerStateWriteBackoff`] plus cooperative drains
140    /// of [`Self::do_pending_link_operations_internal`].
141    ///
142    /// **Lock ordering**: topology must be leased **before** taking [`LinkerState`] for write paths that
143    /// change replicated topology (spawn prepare, guarded loads, [`super::super::Linker::resolve_export`],
144    /// etc.).
145    ///
146    /// `prepare_for_instance_group` is the motivating case — the parent attaches no new subscribers until
147    /// the child finalizes while still holding this token handed across threads.
148    pub(in crate::state::linker) fn acquire_topology_token(
149        &self,
150        group_state: &mut InstanceGroupState,
151        store: &mut impl AsStoreMut,
152        env: &FunctionEnv<WasiEnv>,
153    ) -> Result<TopologyToken, LinkError> {
154        let mut backoff = LinkerStateWriteBackoff::new();
155        loop {
156            if let Some(t) = self.topology_coordinator.try_acquire() {
157                return Ok(t);
158            }
159            backoff.backoff();
160            self.do_pending_link_operations_internal(group_state, store, env)?;
161        }
162    }
163
164    /// Blocking [`RwLock`] write once a [`TopologyToken`] is already held (spawn finalization —
165    /// e.g. [`super::super::Linker::create_instance_group`]).
166    ///
167    /// Returns `(token, guard)` — drop the **`guard`** before **`token`** to avoid extending the write
168    /// critical section beyond topology decisions.
169    pub(in crate::state::linker) fn write_linker_state_blocking_holding_topology(
170        &self,
171        topology: TopologyToken,
172    ) -> (TopologyToken, RwLockWriteGuard<'_, LinkerState>) {
173        let linker_state_write_guard = self.linker_state.write().unwrap();
174        (topology, linker_state_write_guard)
175    }
176
177    /// Acquires topology (see [`Self::acquire_topology_token`]), then takes a blocking write lock via
178    /// [`Self::write_linker_state_blocking_holding_topology`].
179    ///
180    /// Use this for paths that mutate [`LinkerState`] under the topology coordinator’s single-writer
181    /// umbrella when the lease was **not** already taken elsewhere.
182    pub(in crate::state::linker) fn write_linker_state_with_topology(
183        &self,
184        group_state: &mut InstanceGroupState,
185        ctx: &mut FunctionEnvMut<'_, WasiEnv>,
186    ) -> Result<(TopologyToken, RwLockWriteGuard<'_, LinkerState>), LinkError> {
187        let env = ctx.as_ref();
188        let mut store = ctx.as_store_mut();
189        let token = self.acquire_topology_token(group_state, &mut store, &env)?;
190        Ok(self.write_linker_state_blocking_holding_topology(token))
191    }
192
193    /// Broadcasts [`DlOperation`] `op` to every instance-group receiver then waits for replay.
194    ///
195    /// Contracts:
196    ///
197    /// * `topology` must already belong to **this** instigating flow and was leased **before**
198    ///   exclusive access to buses / tables was acquired.
199    /// * `linker_state_write_lock` guards bus broadcast invariants (`try_broadcast` must succeed).
200    /// * Recoverable semantic failures are surfaced by callers; panics here are always fatal —
201    ///   bus capacity misuse or missed rendezvous implies we cannot reconcile groups.
202    /// * Drops `topology` when done (`num_groups <= 1`) or after the follower completion barrier.
203    pub(in crate::state::linker) fn synchronize_link_operation(
204        &self,
205        topology: TopologyToken,
206        op: DlOperation,
207        mut linker_state_write_lock: RwLockWriteGuard<'_, LinkerState>,
208        group_state: &mut InstanceGroupState,
209        wasi_process: &WasiProcess,
210        self_thread_id: WasiThreadId,
211    ) {
212        trace!(?op, "Synchronizing link operation");
213
214        let num_groups = linker_state_write_lock.send_pending_operation.rx_count();
215
216        if num_groups <= 1 {
217            trace!("No other living instance groups, nothing to do");
218            drop(topology);
219            return;
220        }
221
222        let barrier = Arc::new(Barrier::new(num_groups));
223        // Single-flight barrier envelope (bus depth is one intentionally).
224        if linker_state_write_lock
225            .send_pending_operation_barrier
226            .try_broadcast(barrier.clone())
227            .is_err()
228        {
229            panic!("Internal error: more than one synchronized link operation active")
230        }
231
232        // Wake followers so syscall paths re-enter cooperative DL helpers promptly.
233        self.dl_operation_pending.store(true, Ordering::SeqCst);
234
235        trace!("Signalling wasix threads to wake up");
236        for thread in wasi_process
237            .all_threads()
238            .into_iter()
239            .filter(|tid| *tid != self_thread_id)
240        {
241            wasi_process.signal_thread(&thread, wasmer_wasix_types::wasi::Signal::Sigwakeup);
242        }
243
244        trace!(%num_groups, "Waiting at barrier");
245        barrier.wait();
246
247        trace!("All threads now processing dl op");
248
249        // Everyone saw [`Self::dl_operation_pending_load`] and will drive `recv` paths.
250        self.dl_operation_pending.store(false, Ordering::SeqCst);
251
252        // Still under write lock: publish the replicated command before releasing exclusivity.
253        if linker_state_write_lock
254            .send_pending_operation
255            .try_broadcast(op.clone())
256            .is_err()
257        {
258            panic!("Internal error: more than one synchronized link operation active")
259        }
260
261        // Downgrade to shared read while followers apply (`apply_dl_operation`); no topology writer
262        // should race between the barrier epochs.
263        trace!("Unlocking linker state");
264        drop(linker_state_write_lock);
265        let linker_state_read_lock = self.linker_state.read().unwrap();
266
267        // Drain local bus copies — frees mailbox capacity before the follower epoch completes.
268        _ = group_state.recv_pending_operation_barrier.recv().unwrap();
269        _ = group_state.recv_pending_operation.recv().unwrap();
270
271        // Second rendezvous guarantees everyone finished before another writer can preempt read-only
272        // application (see linker `sync` module discussion).
273        trace!("Waiting for other threads to finish processing the dl op");
274        barrier.wait();
275
276        drop(linker_state_read_lock);
277        drop(topology);
278
279        trace!("Synchronization complete");
280    }
281
282    /// Peek at the cooperative-DL handshake flag `dl_operation_pending` with arbitrary memory
283    /// ordering.
284    ///
285    /// Prefer [`Ordering::SeqCst`] (`fast = false` in callers) whenever another thread waking from
286    /// `Sigwakeup` must reliably observe transitions; relaxed loads are intentionally lossy —
287    /// safe only when callers will retry promptly on their own syscall boundaries.
288    pub(in crate::state::linker) fn dl_operation_pending_load(&self, ordering: Ordering) -> bool {
289        self.dl_operation_pending.load(ordering)
290    }
291
292    /// Follow half of [`Self::synchronize_link_operation`] — participates in barriers, consumes the broadcast
293    /// [`DlOperation`], and applies `op` to `group_state` under [`LinkerState`] read access.
294    ///
295    /// Intended for callers that already skipped the idle fast path (cheap load of
296    /// `dl_operation_pending`) yet still need deterministic rendezvous semantics.
297    ///
298    /// # Panics
299    ///
300    /// Missing receivers / malformed bus state panic — those are irrecoverable and indicate we lost
301    /// synchronization with subscribers.
302    pub(in crate::state::linker) fn do_pending_link_operations_internal(
303        &self,
304        group_state: &mut InstanceGroupState,
305        store: &mut impl AsStoreMut,
306        env: &FunctionEnv<WasiEnv>,
307    ) -> Result<(), LinkError> {
308        if !self.dl_operation_pending.load(Ordering::SeqCst) {
309            return Ok(());
310        }
311
312        trace!("Pending link operation discovered, will process");
313
314        let barrier = group_state.recv_pending_operation_barrier.recv().expect(
315            "Failed to receive barrier while a DL operation was \
316            in progress; this condition can't be recovered from",
317        );
318        barrier.wait();
319
320        trace!("Past the barrier, now processing operation");
321
322        // Barrier epoch complete — instigator downgraded writer→reader earlier, so follower reads OK.
323        let op = group_state.recv_pending_operation.recv().unwrap();
324        let linker_state = self.linker_state.read().unwrap();
325
326        let result = group_state.apply_dl_operation(linker_state.deref(), op, store, env);
327
328        trace!("Operation applied, now waiting at second barrier");
329
330        // Rendezvous again so nobody leaves while others still mutate stores / tables concurrently.
331        barrier.wait();
332        drop(linker_state);
333
334        trace!("Pending link operation applied successfully");
335
336        result
337    }
338}