wasmer_wasix/syscalls/wasix/
thread_spawn.rs1use super::*;
2#[cfg(feature = "journal")]
3use crate::journal::JournalEffector;
4use crate::{
5 WasiThreadHandle,
6 os::task::thread::WasiMemoryLayout,
7 runtime::{
8 TaintReason,
9 task_manager::{TaskWasm, TaskWasmRunProperties},
10 },
11 state::context_switching::ContextSwitchingEnvironment,
12 syscalls::*,
13};
14
15use wasmer::Memory;
16use wasmer_wasix_types::wasi::ThreadStart;
17
18#[instrument(level = "trace", skip_all, ret)]
32pub fn thread_spawn_v2<M: MemorySize>(
33 mut ctx: FunctionEnvMut<'_, WasiEnv>,
34 start_ptr: WasmPtr<ThreadStart<M>, M>,
35 ret_tid: WasmPtr<Tid, M>,
36) -> Result<Errno, WasiError> {
37 WasiEnv::do_pending_operations(&mut ctx)?;
38
39 let tid = wasi_try_ok!(thread_spawn_internal_from_wasi(&mut ctx, start_ptr));
41
42 let memory = unsafe { ctx.data().memory_view(&ctx) };
44 wasi_try_mem_ok!(ret_tid.write(&memory, tid));
45
46 tracing::debug!(
47 tid,
48 from_tid = ctx.data().thread.id().raw(),
49 "spawned new thread"
50 );
51
52 Ok(Errno::Success)
53}
54
55pub fn thread_spawn_internal_from_wasi<M: MemorySize>(
56 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
57 start_ptr: WasmPtr<ThreadStart<M>, M>,
58) -> Result<Tid, Errno> {
59 let env = ctx.data();
61 let memory = unsafe { env.memory_view(&ctx) };
62 let runtime = env.runtime.clone();
63 let tasks = env.tasks().clone();
64 let start_ptr_offset = start_ptr.offset();
65
66 let layout = {
68 let start: ThreadStart<M> = start_ptr.read(&memory).map_err(mem_error_to_wasi)?;
69 let stack_upper: u64 = start.stack_upper.into();
70 let stack_size: u64 = start.stack_size.into();
71 let guard_size: u64 = start.guard_size.into();
72 let tls_base: u64 = start.tls_base.into();
73 let stack_lower = stack_upper - stack_size;
74
75 WasiMemoryLayout {
76 stack_upper,
77 stack_lower,
78 guard_size,
79 stack_size,
80 tls_base: Some(tls_base),
81 }
82 };
83 tracing::trace!(
84 from_tid = env.thread.id().raw(),
85 "thread_spawn with layout {:?}",
86 layout
87 );
88
89 let thread_start = ThreadStartType::ThreadSpawn {
91 start_ptr: start_ptr_offset.into(),
92 };
93 let mut thread_handle = match env.process.new_thread(layout.clone(), thread_start) {
94 Ok(h) => Arc::new(h),
95 Err(err) => {
96 error!(
97 stack_base = layout.stack_lower,
98 "failed to create thread handle",
99 );
100 return Err(Errno::Access);
102 }
103 };
104 let thread_id: Tid = thread_handle.id().into();
105 Span::current().record("tid", thread_id);
106
107 thread_spawn_internal_using_layout::<M>(ctx, thread_handle, layout, start_ptr_offset, None)?;
109
110 Ok(thread_id)
112}
113
114pub fn thread_spawn_internal_using_layout<M: MemorySize>(
115 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
116 thread_handle: Arc<WasiThreadHandle>,
117 layout: WasiMemoryLayout,
118 start_ptr_offset: M::Offset,
119 rewind_state: Option<(RewindState, RewindResultType)>,
120) -> Result<(), Errno> {
121 let func_env = ctx.as_ref();
123 let mut store = ctx.as_store_mut();
124 let env = func_env.as_ref(&store);
125 let tasks = env.tasks().clone();
126
127 let env_inner = env.inner();
128 let module_handles = env_inner.main_module_instance_handles();
129
130 let thread_memory = module_handles.memory_clone();
131 let linker = env_inner.linker().cloned();
132
133 let state = env.state.clone();
135 let mut thread_env = env.clone();
136 thread_env.thread = thread_handle.as_thread();
137 thread_env.layout = layout;
138
139 thread_env.enable_deep_sleep = if cfg!(feature = "js") {
143 false
144 } else {
145 unsafe { env.capable_of_deep_sleep() }
146 };
147
148 let mut execute_module = {
151 let thread_handle = thread_handle;
152 move |ctx: WasiFunctionEnv, mut store: Store| {
153 call_module::<M>(ctx, store, start_ptr_offset, thread_handle, rewind_state)
155 }
156 };
157
158 if module_handles.thread_spawn.is_none() {
161 warn!("thread failed - the program does not export a `wasi_thread_start` function");
162 return Err(Errno::Notcapable);
163 }
164 let thread_module = module_handles.module_clone();
165 let spawn_type = match linker {
166 Some(linker) => {
167 let instance_group_data = linker.prepare_for_instance_group(ctx).map_err(|e| {
168 tracing::warn!("failed to prepare linker for thread spawn: {e}");
169 Errno::Notcapable
170 })?;
171 crate::runtime::SpawnType::NewLinkerInstanceGroup(instance_group_data)
172 }
173 None => crate::runtime::SpawnType::AttachMemory(
174 thread_memory.as_shared(&store).ok_or_else(|| {
175 tracing::warn!("Memory must be shared for thread spawning to work");
176 Errno::Memviolation
177 })?,
178 ),
179 };
180
181 trace!("threading: spawning background thread");
183 let run = move |props: TaskWasmRunProperties| {
184 execute_module(props.ctx, props.store);
185 };
186
187 let mut task_wasm = TaskWasm::new(Box::new(run), thread_env, thread_module, false, false)
188 .with_memory(spawn_type);
189
190 tasks.task_wasm(task_wasm).map_err(Into::<Errno>::into)?;
191
192 Ok(())
194}
195
196fn call_module_internal<M: MemorySize>(
198 ctx: &WasiFunctionEnv,
199 mut store: Store,
200 start_ptr_offset: M::Offset,
201) -> (Store, Result<Option<ExitCode>, DeepSleepWork>) {
202 let spawn = ctx
204 .data(&store)
205 .inner()
206 .main_module_instance_handles()
207 .thread_spawn
208 .clone()
209 .unwrap();
210 let tid = ctx.data(&store).tid();
211
212 let spawn: Function = spawn.into();
213 let tid_i32 = tid.raw().try_into().map_err(|_| Errno::Overflow).unwrap();
214 let start_pointer_i32 = start_ptr_offset
215 .try_into()
216 .map_err(|_| Errno::Overflow)
217 .unwrap();
218 let (mut store, thread_result) = ContextSwitchingEnvironment::run_main_context(
219 ctx,
220 store,
221 spawn,
222 vec![Value::I32(tid_i32), Value::I32(start_pointer_i32)],
223 );
224 let thread_result = thread_result.map(|_| ());
225
226 trace!("callback finished (ret={:?})", thread_result);
227
228 let exit_code = match handle_thread_result(ctx, &mut store, thread_result) {
229 Ok(code) => code,
230 Err(deep_sleep) => return (store, Err(deep_sleep)),
231 };
232
233 (store, Ok(exit_code))
234}
235
236fn handle_thread_result(
237 env: &WasiFunctionEnv,
238 store: &mut Store,
239 err: Result<(), RuntimeError>,
240) -> Result<Option<ExitCode>, DeepSleepWork> {
241 let tid = env.data(&store).tid();
242 let pid = env.data(&store).pid();
243 let Err(err) = err else {
244 trace!("thread exited cleanly without calling thread_exit");
245 return Ok(None);
246 };
247 match err.downcast::<WasiError>() {
248 Ok(WasiError::ThreadExit) => {
249 trace!("thread exited cleanly");
250 Ok(None)
251 }
252 Ok(WasiError::Exit(code)) => {
253 trace!(exit_code = ?code, "thread requested exit");
254 if !code.is_success() {
255 env.data(&store)
257 .runtime
258 .on_taint(TaintReason::NonZeroExitCode(code));
259 };
260 Ok(Some(code))
261 }
262 Ok(WasiError::DeepSleep(deep)) => {
263 trace!("entered a deep sleep");
264 Err(deep)
265 }
266 Ok(WasiError::UnknownWasiVersion) => {
267 eprintln!(
268 "Thread {tid} of process {pid} failed because it has an unknown wasix version"
269 );
270 env.data(&store)
271 .runtime
272 .on_taint(TaintReason::UnknownWasiVersion);
273 Ok(Some(ExitCode::from(129)))
274 }
275 Ok(WasiError::DlSymbolResolutionFailed(symbol)) => {
276 eprintln!("Thread {tid} of process {pid} failed to find required symbol: {symbol}");
277 env.data(&store)
278 .runtime
279 .on_taint(TaintReason::DlSymbolResolutionFailed(symbol.clone()));
280 Ok(Some(ExitCode::from(129)))
281 }
282 Err(err) => {
283 if err.clone().to_trap() == Some(wasmer_types::TrapCode::HostInterrupt) {
284 debug!(%tid, %pid, error = %err, "thread interrupted by host");
285 } else {
286 eprintln!("Thread {tid} of process {pid} failed with runtime error: {err}");
287 }
288 env.data(&store)
289 .runtime
290 .on_taint(TaintReason::RuntimeError(err));
291 Ok(Some(ExitCode::from(129)))
292 }
293 }
294}
295
296fn call_module<M: MemorySize>(
298 mut ctx: WasiFunctionEnv,
299 mut store: Store,
300 start_ptr_offset: M::Offset,
301 thread_handle: Arc<WasiThreadHandle>,
302 rewind_state: Option<(RewindState, RewindResultType)>,
303) {
304 let env = ctx.data(&store);
305 let tasks = env.tasks().clone();
306
307 if let Some((rewind_state, rewind_result)) = rewind_state {
309 let mut ctx = ctx.env.clone().into_mut(&mut store);
310 let res = rewind_ext::<M>(
311 &mut ctx,
312 Some(rewind_state.memory_stack),
313 rewind_state.rewind_stack,
314 rewind_state.store_data,
315 rewind_result,
316 );
317 if res != Errno::Success {
318 return;
319 }
320 }
321
322 let (mut store, ret) = call_module_internal::<M>(&ctx, store, start_ptr_offset);
324
325 if let Err(deep) = ret {
327 let rewind = deep.rewind;
329 let respawn = {
330 let tasks = tasks.clone();
331 move |ctx, store, trigger_res| {
332 call_module::<M>(
334 ctx,
335 store,
336 start_ptr_offset,
337 thread_handle,
338 Some((rewind, RewindResultType::RewindWithResult(trigger_res))),
339 );
340 }
341 };
342
343 unsafe {
345 tasks.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
346 };
347 return;
348 };
349
350 let exit_code = ret.unwrap_or_else(|_| unreachable!());
351 if let Some(exit_code) = exit_code {
352 ctx.on_exit(&mut store, Some(exit_code));
353 thread_handle.set_status_finished(Ok(exit_code));
354 } else {
355 ctx.on_exit(&mut store, None);
356 thread_handle.set_status_finished(Ok(Errno::Success.into()));
357 }
358
359 drop(thread_handle);
360}