wasmer_wasix/syscalls/wasix/
thread_spawn.rs

1use super::*;
2#[cfg(feature = "journal")]
3use crate::journal::JournalEffector;
4use crate::{
5    WasiThreadHandle,
6    os::task::thread::WasiMemoryLayout,
7    runtime::{
8        TaintReason,
9        task_manager::{TaskWasm, TaskWasmRunProperties},
10    },
11    state::context_switching::ContextSwitchingEnvironment,
12    syscalls::*,
13};
14
15use wasmer::Memory;
16use wasmer_wasix_types::wasi::ThreadStart;
17
18/// ### `thread_spawn()`
19/// Creates a new thread by spawning that shares the same
20/// memory address space, file handles and main event loops.
21///
22/// ## Parameters
23///
24/// * `start_ptr` - Pointer to the structure that describes the thread to be launched
25/// * `ret_tid` - ID of the thread that was launched
26///
27/// ## Return
28///
29/// Returns the thread index of the newly created thread
30/// (indices always start from the same value as `pid` and increments in steps)
31#[instrument(level = "trace", skip_all, ret)]
32pub fn thread_spawn_v2<M: MemorySize>(
33    mut ctx: FunctionEnvMut<'_, WasiEnv>,
34    start_ptr: WasmPtr<ThreadStart<M>, M>,
35    ret_tid: WasmPtr<Tid, M>,
36) -> Result<Errno, WasiError> {
37    WasiEnv::do_pending_operations(&mut ctx)?;
38
39    // Create the thread
40    let tid = wasi_try_ok!(thread_spawn_internal_from_wasi(&mut ctx, start_ptr));
41
42    // Success
43    let memory = unsafe { ctx.data().memory_view(&ctx) };
44    wasi_try_mem_ok!(ret_tid.write(&memory, tid));
45
46    tracing::debug!(
47        tid,
48        from_tid = ctx.data().thread.id().raw(),
49        "spawned new thread"
50    );
51
52    Ok(Errno::Success)
53}
54
55pub fn thread_spawn_internal_from_wasi<M: MemorySize>(
56    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
57    start_ptr: WasmPtr<ThreadStart<M>, M>,
58) -> Result<Tid, Errno> {
59    // Now we use the environment and memory references
60    let env = ctx.data();
61    let memory = unsafe { env.memory_view(&ctx) };
62    let runtime = env.runtime.clone();
63    let tasks = env.tasks().clone();
64    let start_ptr_offset = start_ptr.offset();
65
66    // Read the properties about the stack which we will use for asyncify
67    let layout = {
68        let start: ThreadStart<M> = start_ptr.read(&memory).map_err(mem_error_to_wasi)?;
69        let stack_upper: u64 = start.stack_upper.into();
70        let stack_size: u64 = start.stack_size.into();
71        let guard_size: u64 = start.guard_size.into();
72        let tls_base: u64 = start.tls_base.into();
73        let stack_lower = stack_upper - stack_size;
74
75        WasiMemoryLayout {
76            stack_upper,
77            stack_lower,
78            guard_size,
79            stack_size,
80            tls_base: Some(tls_base),
81        }
82    };
83    tracing::trace!(
84        from_tid = env.thread.id().raw(),
85        "thread_spawn with layout {:?}",
86        layout
87    );
88
89    // Create the handle that represents this thread
90    let thread_start = ThreadStartType::ThreadSpawn {
91        start_ptr: start_ptr_offset.into(),
92    };
93    let mut thread_handle = match env.process.new_thread(layout.clone(), thread_start) {
94        Ok(h) => Arc::new(h),
95        Err(err) => {
96            error!(
97                stack_base = layout.stack_lower,
98                "failed to create thread handle",
99            );
100            // TODO: evaluate the appropriate error code, document it in the spec.
101            return Err(Errno::Access);
102        }
103    };
104    let thread_id: Tid = thread_handle.id().into();
105    Span::current().record("tid", thread_id);
106
107    // Spawn the thread
108    thread_spawn_internal_using_layout::<M>(ctx, thread_handle, layout, start_ptr_offset, None)?;
109
110    // Success
111    Ok(thread_id)
112}
113
/// Schedules a WASM task that runs the module's thread entry point for
/// `thread_handle`.
///
/// A clone of the current environment is prepared for the new thread (with
/// its `thread` and `layout` replaced), wrapped in a `TaskWasm` together with
/// a clone of the main module, and handed to the task manager.
///
/// ## Parameters
///
/// * `ctx` - Function environment of the calling thread
/// * `thread_handle` - Handle of the thread that is being started; it is moved
///   into the task closure and passed on to `call_module`
/// * `layout` - Memory layout stored in the new thread's environment
/// * `start_ptr_offset` - Offset of the thread start structure, forwarded to `call_module`
/// * `rewind_state` - Optional rewind state, forwarded to `call_module`
///
/// ## Errors
///
/// * `Errno::Notcapable` if the main module instance has no `thread_spawn`
///   handle, or if preparing the linker for a new instance group fails
/// * `Errno::Memviolation` if no linker is present and the memory is not shared
/// * The task manager's error converted into an `Errno` if scheduling fails
pub fn thread_spawn_internal_using_layout<M: MemorySize>(
    ctx: &mut FunctionEnvMut<'_, WasiEnv>,
    thread_handle: Arc<WasiThreadHandle>,
    layout: WasiMemoryLayout,
    start_ptr_offset: M::Offset,
    rewind_state: Option<(RewindState, RewindResultType)>,
) -> Result<(), Errno> {
    // We extract the memory which will be passed to the thread
    let func_env = ctx.as_ref();
    let mut store = ctx.as_store_mut();
    let env = func_env.as_ref(&store);
    let tasks = env.tasks().clone();

    let env_inner = env.inner();
    let module_handles = env_inner.main_module_instance_handles();

    let thread_memory = module_handles.memory_clone();
    let linker = env_inner.linker().cloned();

    // We capture some local variables
    // NOTE(review): `state` is not read again in this function.
    let state = env.state.clone();
    let mut thread_env = env.clone();
    thread_env.thread = thread_handle.as_thread();
    thread_env.layout = layout;

    // TODO: Currently asynchronous threading does not work with multi
    //       threading in JS but it does work for the main thread. This will
    //       require more work to find out why.
    thread_env.enable_deep_sleep = if cfg!(feature = "js") {
        false
    } else {
        // NOTE(review): the safety contract of `capable_of_deep_sleep` is not
        // visible here - confirm against its documentation.
        unsafe { env.capable_of_deep_sleep() }
    };

    // This next function gets a context for the local thread and then
    // calls into the process
    let mut execute_module = {
        // Move the handle into the closure so it travels with the task.
        let thread_handle = thread_handle;
        move |ctx: WasiFunctionEnv, mut store: Store| {
            // Call the thread
            call_module::<M>(ctx, store, start_ptr_offset, thread_handle, rewind_state)
        }
    };

    // If the process does not export a thread spawn function then obviously
    // we can't spawn a background thread
    if module_handles.thread_spawn.is_none() {
        warn!("thread failed - the program does not export a `wasi_thread_start` function");
        return Err(Errno::Notcapable);
    }
    let thread_module = module_handles.module_clone();
    // With a linker the thread gets a new linker instance group; without one
    // the existing memory is attached, which requires it to be shared.
    let spawn_type = match linker {
        Some(linker) => {
            let instance_group_data = linker.prepare_for_instance_group(ctx).map_err(|e| {
                tracing::warn!("failed to prepare linker for thread spawn: {e}");
                Errno::Notcapable
            })?;
            crate::runtime::SpawnType::NewLinkerInstanceGroup(instance_group_data)
        }
        None => crate::runtime::SpawnType::AttachMemory(
            thread_memory.as_shared(&store).ok_or_else(|| {
                tracing::warn!("Memory must be shared for thread spawning to work");
                Errno::Memviolation
            })?,
        ),
    };

    // Now spawn a thread
    trace!("threading: spawning background thread");
    let run = move |props: TaskWasmRunProperties| {
        execute_module(props.ctx, props.store);
    };

    // NOTE(review): the meaning of the two `false` arguments is defined by
    // `TaskWasm::new` and is not visible here - confirm against its signature.
    let mut task_wasm = TaskWasm::new(Box::new(run), thread_env, thread_module, false, false)
        .with_memory(spawn_type);

    tasks.task_wasm(task_wasm).map_err(Into::<Errno>::into)?;

    // Success
    Ok(())
}
195
196// This function calls into the module
197fn call_module_internal<M: MemorySize>(
198    ctx: &WasiFunctionEnv,
199    mut store: Store,
200    start_ptr_offset: M::Offset,
201) -> (Store, Result<Option<ExitCode>, DeepSleepWork>) {
202    // Note: we ensure both unwraps can happen before getting to this point
203    let spawn = ctx
204        .data(&store)
205        .inner()
206        .main_module_instance_handles()
207        .thread_spawn
208        .clone()
209        .unwrap();
210    let tid = ctx.data(&store).tid();
211
212    let spawn: Function = spawn.into();
213    let tid_i32 = tid.raw().try_into().map_err(|_| Errno::Overflow).unwrap();
214    let start_pointer_i32 = start_ptr_offset
215        .try_into()
216        .map_err(|_| Errno::Overflow)
217        .unwrap();
218    let (mut store, thread_result) = ContextSwitchingEnvironment::run_main_context(
219        ctx,
220        store,
221        spawn,
222        vec![Value::I32(tid_i32), Value::I32(start_pointer_i32)],
223    );
224    let thread_result = thread_result.map(|_| ());
225
226    trace!("callback finished (ret={:?})", thread_result);
227
228    let exit_code = match handle_thread_result(ctx, &mut store, thread_result) {
229        Ok(code) => code,
230        Err(deep_sleep) => return (store, Err(deep_sleep)),
231    };
232
233    (store, Ok(exit_code))
234}
235
236fn handle_thread_result(
237    env: &WasiFunctionEnv,
238    store: &mut Store,
239    err: Result<(), RuntimeError>,
240) -> Result<Option<ExitCode>, DeepSleepWork> {
241    let tid = env.data(&store).tid();
242    let pid = env.data(&store).pid();
243    let Err(err) = err else {
244        trace!("thread exited cleanly without calling thread_exit");
245        return Ok(None);
246    };
247    match err.downcast::<WasiError>() {
248        Ok(WasiError::ThreadExit) => {
249            trace!("thread exited cleanly");
250            Ok(None)
251        }
252        Ok(WasiError::Exit(code)) => {
253            trace!(exit_code = ?code, "thread requested exit");
254            if !code.is_success() {
255                // TODO: Why do we need to taint the runtime on a non-zero exit code? Why not also for zero?
256                env.data(&store)
257                    .runtime
258                    .on_taint(TaintReason::NonZeroExitCode(code));
259            };
260            Ok(Some(code))
261        }
262        Ok(WasiError::DeepSleep(deep)) => {
263            trace!("entered a deep sleep");
264            Err(deep)
265        }
266        Ok(WasiError::UnknownWasiVersion) => {
267            eprintln!(
268                "Thread {tid} of process {pid} failed because it has an unknown wasix version"
269            );
270            env.data(&store)
271                .runtime
272                .on_taint(TaintReason::UnknownWasiVersion);
273            Ok(Some(ExitCode::from(129)))
274        }
275        Ok(WasiError::DlSymbolResolutionFailed(symbol)) => {
276            eprintln!("Thread {tid} of process {pid} failed to find required symbol: {symbol}");
277            env.data(&store)
278                .runtime
279                .on_taint(TaintReason::DlSymbolResolutionFailed(symbol.clone()));
280            Ok(Some(ExitCode::from(129)))
281        }
282        Err(err) => {
283            if err.clone().to_trap() == Some(wasmer_types::TrapCode::HostInterrupt) {
284                debug!(%tid, %pid, error = %err, "thread interrupted by host");
285            } else {
286                eprintln!("Thread {tid} of process {pid} failed with runtime error: {err}");
287            }
288            env.data(&store)
289                .runtime
290                .on_taint(TaintReason::RuntimeError(err));
291            Ok(Some(ExitCode::from(129)))
292        }
293    }
294}
295
296/// Calls the module
297fn call_module<M: MemorySize>(
298    mut ctx: WasiFunctionEnv,
299    mut store: Store,
300    start_ptr_offset: M::Offset,
301    thread_handle: Arc<WasiThreadHandle>,
302    rewind_state: Option<(RewindState, RewindResultType)>,
303) {
304    let env = ctx.data(&store);
305    let tasks = env.tasks().clone();
306
307    // If we need to rewind then do so
308    if let Some((rewind_state, rewind_result)) = rewind_state {
309        let mut ctx = ctx.env.clone().into_mut(&mut store);
310        let res = rewind_ext::<M>(
311            &mut ctx,
312            Some(rewind_state.memory_stack),
313            rewind_state.rewind_stack,
314            rewind_state.store_data,
315            rewind_result,
316        );
317        if res != Errno::Success {
318            return;
319        }
320    }
321
322    // Now invoke the module
323    let (mut store, ret) = call_module_internal::<M>(&ctx, store, start_ptr_offset);
324
325    // If it went to deep sleep then we need to handle that
326    if let Err(deep) = ret {
327        // Create the callback that will be invoked when the thread respawns after a deep sleep
328        let rewind = deep.rewind;
329        let respawn = {
330            let tasks = tasks.clone();
331            move |ctx, store, trigger_res| {
332                // Call the thread
333                call_module::<M>(
334                    ctx,
335                    store,
336                    start_ptr_offset,
337                    thread_handle,
338                    Some((rewind, RewindResultType::RewindWithResult(trigger_res))),
339                );
340            }
341        };
342
343        /// Spawns the WASM process after a trigger
344        unsafe {
345            tasks.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
346        };
347        return;
348    };
349
350    let exit_code = ret.unwrap_or_else(|_| unreachable!());
351    if let Some(exit_code) = exit_code {
352        ctx.on_exit(&mut store, Some(exit_code));
353        thread_handle.set_status_finished(Ok(exit_code));
354    } else {
355        ctx.on_exit(&mut store, None);
356        thread_handle.set_status_finished(Ok(Errno::Success.into()));
357    }
358
359    drop(thread_handle);
360}