wasmer_wasix/syscalls/wasix/
thread_spawn.rs1use std::f32::consts::E;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::journal::JournalEffector;
6use crate::{
7 WasiThreadHandle,
8 os::task::thread::WasiMemoryLayout,
9 runtime::{
10 TaintReason,
11 task_manager::{TaskWasm, TaskWasmRunProperties},
12 },
13 state::context_switching::ContextSwitchingEnvironment,
14 syscalls::*,
15};
16
17use wasmer::Memory;
18use wasmer_wasix_types::wasi::ThreadStart;
19
20#[instrument(level = "trace", skip_all, ret)]
34pub fn thread_spawn_v2<M: MemorySize>(
35 mut ctx: FunctionEnvMut<'_, WasiEnv>,
36 start_ptr: WasmPtr<ThreadStart<M>, M>,
37 ret_tid: WasmPtr<Tid, M>,
38) -> Result<Errno, WasiError> {
39 WasiEnv::do_pending_operations(&mut ctx)?;
40
41 let tid = wasi_try_ok!(thread_spawn_internal_from_wasi(&mut ctx, start_ptr));
43
44 let memory = unsafe { ctx.data().memory_view(&ctx) };
46 wasi_try_mem_ok!(ret_tid.write(&memory, tid));
47
48 tracing::debug!(
49 tid,
50 from_tid = ctx.data().thread.id().raw(),
51 "spawned new thread"
52 );
53
54 Ok(Errno::Success)
55}
56
57pub fn thread_spawn_internal_from_wasi<M: MemorySize>(
58 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
59 start_ptr: WasmPtr<ThreadStart<M>, M>,
60) -> Result<Tid, Errno> {
61 let env = ctx.data();
63 let memory = unsafe { env.memory_view(&ctx) };
64 let runtime = env.runtime.clone();
65 let tasks = env.tasks().clone();
66 let start_ptr_offset = start_ptr.offset();
67
68 let layout = {
70 let start: ThreadStart<M> = start_ptr.read(&memory).map_err(mem_error_to_wasi)?;
71 let stack_upper: u64 = start.stack_upper.into();
72 let stack_size: u64 = start.stack_size.into();
73 let guard_size: u64 = start.guard_size.into();
74 let tls_base: u64 = start.tls_base.into();
75 let stack_lower = stack_upper - stack_size;
76
77 WasiMemoryLayout {
78 stack_upper,
79 stack_lower,
80 guard_size,
81 stack_size,
82 tls_base: Some(tls_base),
83 }
84 };
85 tracing::trace!(
86 from_tid = env.thread.id().raw(),
87 "thread_spawn with layout {:?}",
88 layout
89 );
90
91 let thread_start = ThreadStartType::ThreadSpawn {
93 start_ptr: start_ptr_offset.into(),
94 };
95 let mut thread_handle = match env.process.new_thread(layout.clone(), thread_start) {
96 Ok(h) => Arc::new(h),
97 Err(err) => {
98 error!(
99 stack_base = layout.stack_lower,
100 "failed to create thread handle",
101 );
102 return Err(Errno::Access);
104 }
105 };
106 let thread_id: Tid = thread_handle.id().into();
107 Span::current().record("tid", thread_id);
108
109 thread_spawn_internal_using_layout::<M>(ctx, thread_handle, layout, start_ptr_offset, None)?;
111
112 Ok(thread_id)
114}
115
116pub fn thread_spawn_internal_using_layout<M: MemorySize>(
117 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
118 thread_handle: Arc<WasiThreadHandle>,
119 layout: WasiMemoryLayout,
120 start_ptr_offset: M::Offset,
121 rewind_state: Option<(RewindState, RewindResultType)>,
122) -> Result<(), Errno> {
123 let func_env = ctx.as_ref();
125 let mut store = ctx.as_store_mut();
126 let env = func_env.as_ref(&store);
127 let tasks = env.tasks().clone();
128
129 let env_inner = env.inner();
130 let module_handles = env_inner.main_module_instance_handles();
131
132 let thread_memory = module_handles.memory_clone();
133 let linker = env_inner.linker().cloned();
134
135 let state = env.state.clone();
137 let mut thread_env = env.clone();
138 thread_env.thread = thread_handle.as_thread();
139 thread_env.layout = layout;
140
141 thread_env.enable_deep_sleep = if cfg!(feature = "js") {
145 false
146 } else {
147 unsafe { env.capable_of_deep_sleep() }
148 };
149
150 let mut execute_module = {
153 let thread_handle = thread_handle;
154 move |ctx: WasiFunctionEnv, mut store: Store| {
155 call_module::<M>(ctx, store, start_ptr_offset, thread_handle, rewind_state)
157 }
158 };
159
160 if module_handles.thread_spawn.is_none() {
163 warn!("thread failed - the program does not export a `wasi_thread_start` function");
164 return Err(Errno::Notcapable);
165 }
166 let thread_module = module_handles.module_clone();
167 let spawn_type = match linker {
168 Some(linker) => crate::runtime::SpawnType::NewLinkerInstanceGroup(linker, func_env, store),
169 None => crate::runtime::SpawnType::ShareMemory(thread_memory, store.as_store_ref()),
170 };
171
172 trace!("threading: spawning background thread");
174 let run = move |props: TaskWasmRunProperties| {
175 execute_module(props.ctx, props.store);
176 };
177
178 let mut task_wasm = TaskWasm::new(Box::new(run), thread_env, thread_module, false, false)
179 .with_memory(spawn_type);
180
181 tasks.task_wasm(task_wasm).map_err(Into::<Errno>::into)?;
182
183 Ok(())
185}
186
187fn call_module_internal<M: MemorySize>(
189 ctx: &WasiFunctionEnv,
190 mut store: Store,
191 start_ptr_offset: M::Offset,
192) -> (Store, Result<(), DeepSleepWork>) {
193 let spawn = ctx
195 .data(&store)
196 .inner()
197 .main_module_instance_handles()
198 .thread_spawn
199 .clone()
200 .unwrap();
201 let tid = ctx.data(&store).tid();
202
203 let spawn: Function = spawn.into();
204 let tid_i32 = tid.raw().try_into().map_err(|_| Errno::Overflow).unwrap();
205 let start_pointer_i32 = start_ptr_offset
206 .try_into()
207 .map_err(|_| Errno::Overflow)
208 .unwrap();
209 let (mut store, thread_result) = ContextSwitchingEnvironment::run_main_context(
210 ctx,
211 store,
212 spawn,
213 vec![Value::I32(tid_i32), Value::I32(start_pointer_i32)],
214 );
215 let thread_result = thread_result.map(|_| ());
216
217 trace!("callback finished (ret={:?})", thread_result);
218
219 let exit_code = match handle_thread_result(ctx, &mut store, thread_result) {
220 Ok(code) => code,
221 Err(deep_sleep) => return (store, Err(deep_sleep)),
222 };
223
224 ctx.on_exit(&mut store, exit_code);
226 (store, Ok(()))
227}
228
229fn handle_thread_result(
230 env: &WasiFunctionEnv,
231 store: &mut Store,
232 err: Result<(), RuntimeError>,
233) -> Result<Option<ExitCode>, DeepSleepWork> {
234 let tid = env.data(&store).tid();
235 let pid = env.data(&store).pid();
236 let Err(err) = err else {
237 trace!("thread exited cleanly without calling thread_exit");
238 return Ok(None);
239 };
240 match err.downcast::<WasiError>() {
241 Ok(WasiError::ThreadExit) => {
242 trace!("thread exited cleanly");
243 Ok(None)
244 }
245 Ok(WasiError::Exit(code)) => {
246 trace!(exit_code = ?code, "thread requested exit");
247 if !code.is_success() {
248 env.data(&store)
250 .runtime
251 .on_taint(TaintReason::NonZeroExitCode(code));
252 };
253 Ok(Some(code))
254 }
255 Ok(WasiError::DeepSleep(deep)) => {
256 trace!("entered a deep sleep");
257 Err(deep)
258 }
259 Ok(WasiError::UnknownWasiVersion) => {
260 eprintln!(
261 "Thread {tid} of process {pid} failed because it has an unknown wasix version"
262 );
263 env.data(&store)
264 .runtime
265 .on_taint(TaintReason::UnknownWasiVersion);
266 Ok(Some(ExitCode::from(129)))
267 }
268 Ok(WasiError::DlSymbolResolutionFailed(symbol)) => {
269 eprintln!("Thread {tid} of process {pid} failed to find required symbol: {symbol}");
270 env.data(&store)
271 .runtime
272 .on_taint(TaintReason::DlSymbolResolutionFailed(symbol.clone()));
273 Ok(Some(ExitCode::from(129)))
274 }
275 Err(err) => {
276 eprintln!("Thread {tid} of process {pid} failed with runtime error: {err}");
277 env.data(&store)
278 .runtime
279 .on_taint(TaintReason::RuntimeError(err));
280 Ok(Some(ExitCode::from(129)))
281 }
282 }
283}
284
285fn call_module<M: MemorySize>(
287 mut ctx: WasiFunctionEnv,
288 mut store: Store,
289 start_ptr_offset: M::Offset,
290 thread_handle: Arc<WasiThreadHandle>,
291 rewind_state: Option<(RewindState, RewindResultType)>,
292) {
293 let env = ctx.data(&store);
294 let tasks = env.tasks().clone();
295
296 if let Some((rewind_state, rewind_result)) = rewind_state {
298 let mut ctx = ctx.env.clone().into_mut(&mut store);
299 let res = rewind_ext::<M>(
300 &mut ctx,
301 Some(rewind_state.memory_stack),
302 rewind_state.rewind_stack,
303 rewind_state.store_data,
304 rewind_result,
305 );
306 if res != Errno::Success {
307 return;
308 }
309 }
310
311 let (store, ret) = call_module_internal::<M>(&ctx, store, start_ptr_offset);
313
314 if let Err(deep) = ret {
316 let rewind = deep.rewind;
318 let respawn = {
319 let tasks = tasks.clone();
320 move |ctx, store, trigger_res| {
321 call_module::<M>(
323 ctx,
324 store,
325 start_ptr_offset,
326 thread_handle,
327 Some((rewind, RewindResultType::RewindWithResult(trigger_res))),
328 );
329 }
330 };
331
332 unsafe {
334 tasks.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
335 };
336 return;
337 };
338 drop(thread_handle);
340}