wasmer_wasix/syscalls/wasix/
thread_spawn.rs1use std::f32::consts::E;
2
3use super::*;
4#[cfg(feature = "journal")]
5use crate::journal::JournalEffector;
6use crate::{
7 WasiThreadHandle,
8 os::task::thread::WasiMemoryLayout,
9 runtime::{
10 TaintReason,
11 task_manager::{TaskWasm, TaskWasmRunProperties},
12 },
13 state::context_switching::ContextSwitchingEnvironment,
14 syscalls::*,
15};
16
17use wasmer::Memory;
18use wasmer_wasix_types::wasi::ThreadStart;
19
20#[instrument(level = "trace", skip_all, ret)]
34pub fn thread_spawn_v2<M: MemorySize>(
35 mut ctx: FunctionEnvMut<'_, WasiEnv>,
36 start_ptr: WasmPtr<ThreadStart<M>, M>,
37 ret_tid: WasmPtr<Tid, M>,
38) -> Result<Errno, WasiError> {
39 WasiEnv::do_pending_operations(&mut ctx)?;
40
41 let tid = wasi_try_ok!(thread_spawn_internal_from_wasi(&mut ctx, start_ptr));
43
44 let memory = unsafe { ctx.data().memory_view(&ctx) };
46 wasi_try_mem_ok!(ret_tid.write(&memory, tid));
47
48 tracing::debug!(
49 tid,
50 from_tid = ctx.data().thread.id().raw(),
51 "spawned new thread"
52 );
53
54 Ok(Errno::Success)
55}
56
57pub fn thread_spawn_internal_from_wasi<M: MemorySize>(
58 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
59 start_ptr: WasmPtr<ThreadStart<M>, M>,
60) -> Result<Tid, Errno> {
61 let env = ctx.data();
63 let memory = unsafe { env.memory_view(&ctx) };
64 let runtime = env.runtime.clone();
65 let tasks = env.tasks().clone();
66 let start_ptr_offset = start_ptr.offset();
67
68 let layout = {
70 let start: ThreadStart<M> = start_ptr.read(&memory).map_err(mem_error_to_wasi)?;
71 let stack_upper: u64 = start.stack_upper.into();
72 let stack_size: u64 = start.stack_size.into();
73 let guard_size: u64 = start.guard_size.into();
74 let tls_base: u64 = start.tls_base.into();
75 let stack_lower = stack_upper - stack_size;
76
77 WasiMemoryLayout {
78 stack_upper,
79 stack_lower,
80 guard_size,
81 stack_size,
82 tls_base: Some(tls_base),
83 }
84 };
85 tracing::trace!(
86 from_tid = env.thread.id().raw(),
87 "thread_spawn with layout {:?}",
88 layout
89 );
90
91 let thread_start = ThreadStartType::ThreadSpawn {
93 start_ptr: start_ptr_offset.into(),
94 };
95 let mut thread_handle = match env.process.new_thread(layout.clone(), thread_start) {
96 Ok(h) => Arc::new(h),
97 Err(err) => {
98 error!(
99 stack_base = layout.stack_lower,
100 "failed to create thread handle",
101 );
102 return Err(Errno::Access);
104 }
105 };
106 let thread_id: Tid = thread_handle.id().into();
107 Span::current().record("tid", thread_id);
108
109 thread_spawn_internal_using_layout::<M>(ctx, thread_handle, layout, start_ptr_offset, None)?;
111
112 Ok(thread_id)
114}
115
116pub fn thread_spawn_internal_using_layout<M: MemorySize>(
117 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
118 thread_handle: Arc<WasiThreadHandle>,
119 layout: WasiMemoryLayout,
120 start_ptr_offset: M::Offset,
121 rewind_state: Option<(RewindState, RewindResultType)>,
122) -> Result<(), Errno> {
123 let func_env = ctx.as_ref();
125 let mut store = ctx.as_store_mut();
126 let env = func_env.as_ref(&store);
127 let tasks = env.tasks().clone();
128
129 let env_inner = env.inner();
130 let module_handles = env_inner.main_module_instance_handles();
131
132 let thread_memory = module_handles.memory_clone();
133 let linker = env_inner.linker().cloned();
134
135 let state = env.state.clone();
137 let mut thread_env = env.clone();
138 thread_env.thread = thread_handle.as_thread();
139 thread_env.layout = layout;
140
141 thread_env.enable_deep_sleep = if cfg!(feature = "js") {
145 false
146 } else {
147 unsafe { env.capable_of_deep_sleep() }
148 };
149
150 let mut execute_module = {
153 let thread_handle = thread_handle;
154 move |ctx: WasiFunctionEnv, mut store: Store| {
155 call_module::<M>(ctx, store, start_ptr_offset, thread_handle, rewind_state)
157 }
158 };
159
160 if module_handles.thread_spawn.is_none() {
163 warn!("thread failed - the program does not export a `wasi_thread_start` function");
164 return Err(Errno::Notcapable);
165 }
166 let thread_module = module_handles.module_clone();
167 let spawn_type = match linker {
168 Some(linker) => crate::runtime::SpawnType::NewLinkerInstanceGroup(linker, func_env, store),
169 None => crate::runtime::SpawnType::ShareMemory(thread_memory, store.as_store_ref()),
170 };
171
172 trace!("threading: spawning background thread");
174 let run = move |props: TaskWasmRunProperties| {
175 execute_module(props.ctx, props.store);
176 };
177
178 let mut task_wasm = TaskWasm::new(Box::new(run), thread_env, thread_module, false, false)
179 .with_memory(spawn_type);
180
181 tasks.task_wasm(task_wasm).map_err(Into::<Errno>::into)?;
182
183 Ok(())
185}
186
187fn call_module_internal<M: MemorySize>(
189 ctx: &WasiFunctionEnv,
190 mut store: Store,
191 start_ptr_offset: M::Offset,
192) -> (Store, Result<Option<ExitCode>, DeepSleepWork>) {
193 let spawn = ctx
195 .data(&store)
196 .inner()
197 .main_module_instance_handles()
198 .thread_spawn
199 .clone()
200 .unwrap();
201 let tid = ctx.data(&store).tid();
202
203 let spawn: Function = spawn.into();
204 let tid_i32 = tid.raw().try_into().map_err(|_| Errno::Overflow).unwrap();
205 let start_pointer_i32 = start_ptr_offset
206 .try_into()
207 .map_err(|_| Errno::Overflow)
208 .unwrap();
209 let (mut store, thread_result) = ContextSwitchingEnvironment::run_main_context(
210 ctx,
211 store,
212 spawn,
213 vec![Value::I32(tid_i32), Value::I32(start_pointer_i32)],
214 );
215 let thread_result = thread_result.map(|_| ());
216
217 trace!("callback finished (ret={:?})", thread_result);
218
219 let exit_code = match handle_thread_result(ctx, &mut store, thread_result) {
220 Ok(code) => code,
221 Err(deep_sleep) => return (store, Err(deep_sleep)),
222 };
223
224 (store, Ok(exit_code))
225}
226
227fn handle_thread_result(
228 env: &WasiFunctionEnv,
229 store: &mut Store,
230 err: Result<(), RuntimeError>,
231) -> Result<Option<ExitCode>, DeepSleepWork> {
232 let tid = env.data(&store).tid();
233 let pid = env.data(&store).pid();
234 let Err(err) = err else {
235 trace!("thread exited cleanly without calling thread_exit");
236 return Ok(None);
237 };
238 match err.downcast::<WasiError>() {
239 Ok(WasiError::ThreadExit) => {
240 trace!("thread exited cleanly");
241 Ok(None)
242 }
243 Ok(WasiError::Exit(code)) => {
244 trace!(exit_code = ?code, "thread requested exit");
245 if !code.is_success() {
246 env.data(&store)
248 .runtime
249 .on_taint(TaintReason::NonZeroExitCode(code));
250 };
251 Ok(Some(code))
252 }
253 Ok(WasiError::DeepSleep(deep)) => {
254 trace!("entered a deep sleep");
255 Err(deep)
256 }
257 Ok(WasiError::UnknownWasiVersion) => {
258 eprintln!(
259 "Thread {tid} of process {pid} failed because it has an unknown wasix version"
260 );
261 env.data(&store)
262 .runtime
263 .on_taint(TaintReason::UnknownWasiVersion);
264 Ok(Some(ExitCode::from(129)))
265 }
266 Ok(WasiError::DlSymbolResolutionFailed(symbol)) => {
267 eprintln!("Thread {tid} of process {pid} failed to find required symbol: {symbol}");
268 env.data(&store)
269 .runtime
270 .on_taint(TaintReason::DlSymbolResolutionFailed(symbol.clone()));
271 Ok(Some(ExitCode::from(129)))
272 }
273 Err(err) => {
274 eprintln!("Thread {tid} of process {pid} failed with runtime error: {err}");
275 env.data(&store)
276 .runtime
277 .on_taint(TaintReason::RuntimeError(err));
278 Ok(Some(ExitCode::from(129)))
279 }
280 }
281}
282
283fn call_module<M: MemorySize>(
285 mut ctx: WasiFunctionEnv,
286 mut store: Store,
287 start_ptr_offset: M::Offset,
288 thread_handle: Arc<WasiThreadHandle>,
289 rewind_state: Option<(RewindState, RewindResultType)>,
290) {
291 let env = ctx.data(&store);
292 let tasks = env.tasks().clone();
293
294 if let Some((rewind_state, rewind_result)) = rewind_state {
296 let mut ctx = ctx.env.clone().into_mut(&mut store);
297 let res = rewind_ext::<M>(
298 &mut ctx,
299 Some(rewind_state.memory_stack),
300 rewind_state.rewind_stack,
301 rewind_state.store_data,
302 rewind_result,
303 );
304 if res != Errno::Success {
305 return;
306 }
307 }
308
309 let (store, ret) = call_module_internal::<M>(&ctx, store, start_ptr_offset);
311
312 if let Err(deep) = ret {
314 let rewind = deep.rewind;
316 let respawn = {
317 let tasks = tasks.clone();
318 move |ctx, store, trigger_res| {
319 call_module::<M>(
321 ctx,
322 store,
323 start_ptr_offset,
324 thread_handle,
325 Some((rewind, RewindResultType::RewindWithResult(trigger_res))),
326 );
327 }
328 };
329
330 unsafe {
332 tasks.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
333 };
334 return;
335 };
336
337 let exit_code = ret.unwrap_or_else(|_| unreachable!());
338 if let Some(exit_code) = exit_code {
339 ctx.data(&store).blocking_on_exit(Some(exit_code));
340 thread_handle.set_status_finished(Ok(exit_code));
341 } else {
342 thread_handle.set_status_finished(Ok(Errno::Success.into()));
343 }
344
345 drop(thread_handle);
346}