wasmer_vm/interrupt_registry/
unix.rs

1use std::{
2    cell::UnsafeCell,
3    ffi::CStr,
4    sync::{
5        Arc, LazyLock,
6        atomic::{AtomicUsize, Ordering},
7    },
8};
9
10use dashmap::{DashMap, Entry};
11use wasmer_types::StoreId;
12
13use super::*;
14
/// Per-store bookkeeping needed to interrupt a store while it is running
/// WASM code on some thread.
struct StoreInterruptState {
    /// Identifier of the OS thread the store runs on; this is the target of
    /// the interrupt signal. Several stores may execute WASM code on the
    /// same OS thread, so this value is not unique per store.
    ///
    /// The identifier is kept as a plain `usize` rather than a
    /// `libc::pthread_t`: on some Unix targets `pthread_t` is a raw pointer,
    /// and a raw pointer field would stop the global `DashMap` from meeting
    /// its `Send` bounds. The value is only ever used as an opaque thread
    /// identifier.
    pthread: usize,
    /// Set once an interrupt has been requested for this store.
    interrupted: bool,
    /// Handle to the owning thread's signal-target slot; see the field
    /// documentation on [`ThreadInterruptState`].
    thread_current_signal_target_store: Arc<AtomicUsize>,
}
32
33/// Thread-related state; only **PARTS** of this struct are safe to access
34/// from within the interrupt handler.
35struct ThreadInterruptState {
36    /// We need to maintain a stack of active stores per thread, hence the vec.
37    /// This should not be touched by the interrupt handler.
38    active_stores: Vec<StoreId>,
39
40    /// Always stores the top entry from `active_stores`. Needed since a vec is not
41    /// safe to access from signal handlers.
42    current_active_store: AtomicUsize,
43
44    /// Shared state between the thread requesting the interrupt
45    /// and the thread running the store's code. The thread
46    /// requesting the interrupt writes the ID of the store it
47    /// wants to interrupt to this atomic. The interrupted
48    /// thread later checks this value (through its own clone
49    /// of the Arc in [`ThreadInterruptState`]) against the currently
50    /// running store, and traps only if they match, recording the
51    /// interrupt otherwise.
52    /// Note that mutexes are not safe for use within signal
53    /// handlers; only atomics can be safely used.
54    current_signal_target_store: Arc<AtomicUsize>,
55}
56
57/// HashMap of all store states, accessible from all threads
58static STORE_INTERRUPT_STATE: LazyLock<DashMap<StoreId, StoreInterruptState>> =
59    LazyLock::new(Default::default);
60
61thread_local! {
62    /// Thread-local thread state. The book-keeping in a RefCell isn't
63    /// guaranteed to be signal-handler-safe, so we use an UnsafeCell
64    /// instead. The cell is only accessed in leaf functions, so it
65    /// should be safe.
66    /// The *only* actually unsafe access happens if a signal comes in
67    /// while another function is modifying the cell; In this case,
68    /// [`should_interrupt_now`] will return junk results. This is
69    /// still safe because:
70    ///   * `should_interrupt_now` only atomically accesses data from this cell
71    ///   * junk results shouldn't matter if we're not running WASM code
72    static THREAD_INTERRUPT_STATE: UnsafeCell<ThreadInterruptState> =
73        UnsafeCell::new(ThreadInterruptState {
74            active_stores: vec![],
75            current_active_store: AtomicUsize::new(0),
76            current_signal_target_store: Arc::new(AtomicUsize::new(0)),
77        });
78}
79
80/// Install interrupt state for the given store. Note that this function
81/// may be called more than once, and correctly maintains a stack of
82/// stores for which the state is installed.
83pub fn install(store_id: StoreId) -> Result<InterruptInstallGuard, InstallError> {
84    let store_state = STORE_INTERRUPT_STATE.entry(store_id).or_insert_with(|| {
85        let thread_current_signal_target_store = THREAD_INTERRUPT_STATE.with(|t| {
86            // Safety: See comments on THREAD_INTERRUPT_STATE.
87            unsafe { t.get().as_mut().unwrap() }
88                .current_signal_target_store
89                .clone()
90        });
91
92        // TODO: isn't there a way to get this without reaching for libc APIs?
93        // Since stores can't be sent across threads once they start executing code,
94        // we don't need to update this value for recursive calls.
95        #[allow(trivial_numeric_casts)]
96        let pthread = unsafe { libc::pthread_self() as usize };
97
98        StoreInterruptState {
99            pthread,
100            interrupted: false,
101            thread_current_signal_target_store,
102        }
103    });
104
105    if store_state.interrupted {
106        return Err(InstallError::AlreadyInterrupted);
107    }
108
109    THREAD_INTERRUPT_STATE.with(|t| {
110        // Safety: See comments on THREAD_INTERRUPT_STATE.
111        let borrow = unsafe { t.get().as_mut().unwrap() };
112        borrow.active_stores.push(store_id);
113        borrow
114            .current_active_store
115            .store(store_id.as_raw().get(), Ordering::Release);
116    });
117
118    Ok(InterruptInstallGuard { store_id })
119}
120
121pub(super) fn uninstall(store_id: StoreId) {
122    let Entry::Occupied(store_state_entry) = STORE_INTERRUPT_STATE.entry(store_id) else {
123        panic!("Internal error: interrupt state not installed for store");
124    };
125
126    let has_more_installations = THREAD_INTERRUPT_STATE.with(|t| {
127        // Safety: See comments on THREAD_INTERRUPT_STATE.
128        let borrow = unsafe { t.get().as_mut().unwrap() };
129        match borrow.active_stores.pop_if(|x| *x == store_id) {
130            Some(_) => {
131                borrow.current_active_store.store(
132                    borrow.active_stores.last().map_or(0, |x| x.as_raw().get()),
133                    Ordering::Release,
134                );
135                borrow.active_stores.contains(&store_id)
136            }
137            None => panic!("InterruptInstallGuard dropped out of order"),
138        }
139    });
140
141    // If this store is still active at some other point within the
142    // thread, we should keep its state around. Otherwise, it should
143    // be deleted from the global interrupt state. Note that this will
144    // also reset the `interrupted` flag, allowing the store to be used
145    // for further function calls.
146    if !has_more_installations {
147        store_state_entry.remove();
148    }
149}
150
151/// Interrupt the store with the given ID. Best effort is made to ensure
152/// interrupts are handled. However, there is no guarantee; under rare
153/// circumstances, it is possible for the interrupt to be missed. One such
154/// case is when the target thread is about to call WASM code but has not
155/// yet made the call.
156///
157/// To make sure the code is interrupted, the target thread should notify
158/// the signalling thread that it has finished running in some way, and
159/// the signalling thread must wait for that notification and retry the
160/// interrupt if the notification is not received after some time.
161pub fn interrupt(store_id: StoreId) -> Result<(), InterruptError> {
162    let Entry::Occupied(mut store_state) = STORE_INTERRUPT_STATE.entry(store_id) else {
163        return Err(InterruptError::StoreNotRunning);
164    };
165    let store_state = store_state.get_mut();
166
167    if store_state
168        .thread_current_signal_target_store
169        .compare_exchange(
170            0,
171            store_id.as_raw().get(),
172            Ordering::SeqCst,
173            Ordering::SeqCst,
174        )
175        .is_err()
176    {
177        return Err(InterruptError::OtherInterruptInProgress);
178    }
179
180    store_state.interrupted = true;
181
182    unsafe {
183        #[allow(trivial_numeric_casts)]
184        let errno = libc::pthread_kill(store_state.pthread as libc::pthread_t, libc::SIGUSR1);
185        if errno != 0 {
186            let error_str = CStr::from_ptr(libc::strerror(errno)).to_str().unwrap();
187            return Err(InterruptError::FailedToSendSignal(error_str));
188        }
189    }
190
191    Ok(())
192}
193
194/// Called from within the signal handler to decide whether we should interrupt
195/// the currently running WASM code. This function *MAY* return junk results in
196/// case a signal comes in during an install or uninstall operation. However,
197/// in such cases, there is no WASM code running, and the result will be ignored
198/// by the signal handler anyway.
199pub(crate) fn on_interrupted() -> bool {
200    THREAD_INTERRUPT_STATE.with(|t| {
201        // Safety: See comments on THREAD_INTERRUPT_STATE.
202        let state = unsafe { t.get().as_ref().unwrap() };
203
204        let current_active_store = state.current_active_store.load(Ordering::Acquire);
205
206        let current_signal_target_store = state.current_signal_target_store.load(Ordering::Acquire);
207        assert_ne!(
208            current_signal_target_store, 0,
209            "current_signal_target_store should be set before signalling the WASM thread"
210        );
211        if state
212            .current_signal_target_store
213            .compare_exchange(
214                current_signal_target_store,
215                0,
216                Ordering::SeqCst,
217                Ordering::SeqCst,
218            )
219            .is_err()
220        {
221            unreachable!("current_signal_target_store isn't changed unless it's zero");
222        }
223
224        current_active_store == current_signal_target_store
225    })
226}
227
228/// Returns true if the store with the given ID has already been interrupted.
229pub fn is_interrupted(store_id: StoreId) -> bool {
230    let Entry::Occupied(store_state_entry) = STORE_INTERRUPT_STATE.entry(store_id) else {
231        return false;
232    };
233    store_state_entry.get().interrupted
234}