wasmer_vm/interrupt_registry/
unix.rs

1#![cfg(unix)]
2
3use std::{
4    cell::UnsafeCell,
5    ffi::CStr,
6    sync::{
7        Arc, LazyLock,
8        atomic::{AtomicUsize, Ordering},
9    },
10};
11
12use dashmap::{DashMap, Entry};
13use wasmer_types::StoreId;
14
15use super::*;
16
/// Per-store bookkeeping needed to interrupt a store while it is
/// running WASM code on some thread.
struct StoreInterruptState {
    /// Identifier of the OS thread the store runs on; this is where the
    /// interrupt signal gets delivered. Several stores may execute WASM
    /// code on the same OS thread.
    ///
    /// Kept as a plain integer rather than `libc::pthread_t`: on some Unix
    /// targets that type is a raw pointer, which would break the `Send`
    /// bounds of the global `DashMap`. The value is only ever used as an
    /// opaque thread identifier.
    pthread: usize,
    /// Set once an interrupt has been requested for this store.
    interrupted: bool,
    /// Clone of the owning thread's signal-target slot; see the field
    /// documentation in [`ThreadInterruptState`].
    thread_current_signal_target_store: Arc<AtomicUsize>,
}
34
35/// Thread-related state; only **PARTS** of this struct are safe to access
36/// from within the interrupt handler.
37struct ThreadInterruptState {
38    /// We need to maintain a stack of active stores per thread, hence the vec.
39    /// This should not be touched by the interrupt handler.
40    active_stores: Vec<StoreId>,
41
42    /// Always stores the top entry from `active_stores`. Needed since a vec is not
43    /// safe to access from signal handlers.
44    current_active_store: AtomicUsize,
45
46    /// Shared state between the thread requesting the interrupt
47    /// and the thread running the store's code. The thread
48    /// requesting the interrupt writes the ID of the store it
49    /// wants to interrupt to this atomic. The interrupted
50    /// thread later checks this value (through its own clone
51    /// of the Arc in [`ThreadInterruptState`]) against the currently
52    /// running store, and traps only if they match, recording the
53    /// interrupt otherwise.
54    /// Note that mutexes are not safe for use within signal
55    /// handlers; only atomics can be safely used.
56    current_signal_target_store: Arc<AtomicUsize>,
57}
58
59/// HashMap of all store states, accessible from all threads
60static STORE_INTERRUPT_STATE: LazyLock<DashMap<StoreId, StoreInterruptState>> =
61    LazyLock::new(Default::default);
62
63thread_local! {
64    /// Thread-local thread state. The book-keeping in a RefCell isn't
65    /// guaranteed to be signal-handler-safe, so we use an UnsafeCell
66    /// instead. The cell is only accessed in leaf functions, so it
67    /// should be safe.
68    /// The *only* actually unsafe access happens if a signal comes in
69    /// while another function is modifying the cell; In this case,
70    /// [`should_interrupt_now`] will return junk results. This is
71    /// still safe because:
72    ///   * `should_interrupt_now` only atomically accesses data from this cell
73    ///   * junk results shouldn't matter if we're not running WASM code
74    static THREAD_INTERRUPT_STATE: UnsafeCell<ThreadInterruptState> =
75        UnsafeCell::new(ThreadInterruptState {
76            active_stores: vec![],
77            current_active_store: AtomicUsize::new(0),
78            current_signal_target_store: Arc::new(AtomicUsize::new(0)),
79        });
80}
81
82/// Install interrupt state for the given store. Note that this function
83/// may be called more than once, and correctly maintains a stack of
84/// stores for which the state is installed.
85pub fn install(store_id: StoreId) -> Result<InterruptInstallGuard, InstallError> {
86    let store_state = STORE_INTERRUPT_STATE.entry(store_id).or_insert_with(|| {
87        let thread_current_signal_target_store = THREAD_INTERRUPT_STATE.with(|t| {
88            // Safety: See comments on THREAD_INTERRUPT_STATE.
89            unsafe { t.get().as_mut().unwrap() }
90                .current_signal_target_store
91                .clone()
92        });
93
94        // TODO: isn't there a way to get this without reaching for libc APIs?
95        // Since stores can't be sent across threads once they start executing code,
96        // we don't need to update this value for recursive calls.
97        #[allow(trivial_numeric_casts)]
98        let pthread = unsafe { libc::pthread_self() as usize };
99
100        StoreInterruptState {
101            pthread,
102            interrupted: false,
103            thread_current_signal_target_store,
104        }
105    });
106
107    if store_state.interrupted {
108        return Err(InstallError::AlreadyInterrupted);
109    }
110
111    THREAD_INTERRUPT_STATE.with(|t| {
112        // Safety: See comments on THREAD_INTERRUPT_STATE.
113        let borrow = unsafe { t.get().as_mut().unwrap() };
114        borrow.active_stores.push(store_id);
115        borrow
116            .current_active_store
117            .store(store_id.as_raw().get(), Ordering::Release);
118    });
119
120    Ok(InterruptInstallGuard { store_id })
121}
122
123pub(super) fn uninstall(store_id: StoreId) {
124    let Entry::Occupied(store_state_entry) = STORE_INTERRUPT_STATE.entry(store_id) else {
125        panic!("Internal error: interrupt state not installed for store");
126    };
127
128    let has_more_installations = THREAD_INTERRUPT_STATE.with(|t| {
129        // Safety: See comments on THREAD_INTERRUPT_STATE.
130        let borrow = unsafe { t.get().as_mut().unwrap() };
131        match borrow.active_stores.pop_if(|x| *x == store_id) {
132            Some(_) => {
133                borrow.current_active_store.store(
134                    borrow
135                        .active_stores
136                        .last()
137                        .map(|x| x.as_raw().get())
138                        .unwrap_or(0),
139                    Ordering::Release,
140                );
141                borrow.active_stores.contains(&store_id)
142            }
143            None => panic!("InterruptInstallGuard dropped out of order"),
144        }
145    });
146
147    // If this store is still active at some other point within the
148    // thread, we should keep its state around. Otherwise, it should
149    // be deleted from the global interrupt state. Note that this will
150    // also reset the `interrupted` flag, allowing the store to be used
151    // for further function calls.
152    if !has_more_installations {
153        store_state_entry.remove();
154    }
155}
156
157/// Interrupt the store with the given ID. Best effort is made to ensure
158/// interrupts are handled. However, there is no guarantee; under rare
159/// circumstances, it is possible for the interrupt to be missed. One such
160/// case is when the target thread is about to call WASM code but has not
161/// yet made the call.
162///
163/// To make sure the code is interrupted, the target thread should notify
164/// the signalling thread that it has finished running in some way, and
165/// the signalling thread must wait for that notification and retry the
166/// interrupt if the notification is not received after some time.
167pub fn interrupt(store_id: StoreId) -> Result<(), InterruptError> {
168    let Entry::Occupied(mut store_state) = STORE_INTERRUPT_STATE.entry(store_id) else {
169        return Err(InterruptError::StoreNotRunning);
170    };
171    let store_state = store_state.get_mut();
172
173    if let Err(_) = store_state
174        .thread_current_signal_target_store
175        .compare_exchange(
176            0,
177            store_id.as_raw().get(),
178            Ordering::SeqCst,
179            Ordering::SeqCst,
180        )
181    {
182        return Err(InterruptError::OtherInterruptInProgress);
183    }
184
185    store_state.interrupted = true;
186
187    unsafe {
188        #[allow(trivial_numeric_casts)]
189        let errno = libc::pthread_kill(store_state.pthread as libc::pthread_t, libc::SIGUSR1);
190        if errno != 0 {
191            let error_str = CStr::from_ptr(libc::strerror(errno)).to_str().unwrap();
192            return Err(InterruptError::FailedToSendSignal(error_str));
193        }
194    }
195
196    Ok(())
197}
198
199/// Called from within the signal handler to decide whether we should interrupt
200/// the currently running WASM code. This function *MAY* return junk results in
201/// case a signal comes in during an install or uninstall operation. However,
202/// in such cases, there is no WASM code running, and the result will be ignored
203/// by the signal handler anyway.
204pub(crate) fn on_interrupted() -> bool {
205    THREAD_INTERRUPT_STATE.with(|t| {
206        // Safety: See comments on THREAD_INTERRUPT_STATE.
207        let state = unsafe { t.get().as_ref().unwrap() };
208
209        let current_active_store = state.current_active_store.load(Ordering::Acquire);
210
211        let current_signal_target_store = state.current_signal_target_store.load(Ordering::Acquire);
212        assert_ne!(
213            current_signal_target_store, 0,
214            "current_signal_target_store should be set before signalling the WASM thread"
215        );
216        if let Err(_) = state.current_signal_target_store.compare_exchange(
217            current_signal_target_store,
218            0,
219            Ordering::SeqCst,
220            Ordering::SeqCst,
221        ) {
222            unreachable!("current_signal_target_store isn't changed unless it's zero");
223        }
224
225        current_active_store == current_signal_target_store
226    })
227}
228
229/// Returns true if the store with the given ID has already been interrupted.
230pub fn is_interrupted(store_id: StoreId) -> bool {
231    let Entry::Occupied(store_state_entry) = STORE_INTERRUPT_STATE.entry(store_id) else {
232        return false;
233    };
234    store_state_entry.get().interrupted
235}