wasmer_wasix/syscalls/wasix/
thread_spawn.rsuse std::f32::consts::E;
use super::*;
#[cfg(feature = "journal")]
use crate::journal::JournalEffector;
use crate::{
os::task::thread::WasiMemoryLayout,
runtime::{
task_manager::{TaskWasm, TaskWasmRunProperties},
TaintReason,
},
syscalls::*,
WasiThreadHandle,
};
use wasmer::Memory;
use wasmer_wasix_types::wasi::ThreadStart;
#[instrument(level = "trace", skip_all, ret)]
pub fn thread_spawn_v2<M: MemorySize>(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
start_ptr: WasmPtr<ThreadStart<M>, M>,
ret_tid: WasmPtr<Tid, M>,
) -> Result<Errno, WasiError> {
WasiEnv::do_pending_operations(&mut ctx)?;
let tid = wasi_try_ok!(thread_spawn_internal_from_wasi(&mut ctx, start_ptr));
let memory = unsafe { ctx.data().memory_view(&ctx) };
wasi_try_mem_ok!(ret_tid.write(&memory, tid));
tracing::debug!(
tid,
from_tid = ctx.data().thread.id().raw(),
"spawned new thread"
);
Ok(Errno::Success)
}
pub fn thread_spawn_internal_from_wasi<M: MemorySize>(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
start_ptr: WasmPtr<ThreadStart<M>, M>,
) -> Result<Tid, Errno> {
let env = ctx.data();
let memory = unsafe { env.memory_view(&ctx) };
let runtime = env.runtime.clone();
let tasks = env.tasks().clone();
let start_ptr_offset = start_ptr.offset();
let layout = {
let start: ThreadStart<M> = start_ptr.read(&memory).map_err(mem_error_to_wasi)?;
let stack_upper: u64 = start.stack_upper.into();
let stack_size: u64 = start.stack_size.into();
let guard_size: u64 = start.guard_size.into();
let tls_base: u64 = start.tls_base.into();
let stack_lower = stack_upper - stack_size;
WasiMemoryLayout {
stack_upper,
stack_lower,
guard_size,
stack_size,
tls_base: Some(tls_base),
}
};
tracing::trace!(
from_tid = env.thread.id().raw(),
"thread_spawn with layout {:?}",
layout
);
let thread_start = ThreadStartType::ThreadSpawn {
start_ptr: start_ptr_offset.into(),
};
let mut thread_handle = match env.process.new_thread(layout.clone(), thread_start) {
Ok(h) => Arc::new(h),
Err(err) => {
error!(
stack_base = layout.stack_lower,
"failed to create thread handle",
);
return Err(Errno::Access);
}
};
let thread_id: Tid = thread_handle.id().into();
Span::current().record("tid", thread_id);
thread_spawn_internal_using_layout::<M>(ctx, thread_handle, layout, start_ptr_offset, None)?;
Ok(thread_id)
}
pub fn thread_spawn_internal_using_layout<M: MemorySize>(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
thread_handle: Arc<WasiThreadHandle>,
layout: WasiMemoryLayout,
start_ptr_offset: M::Offset,
rewind_state: Option<(RewindState, RewindResultType)>,
) -> Result<(), Errno> {
let func_env = ctx.as_ref();
let mut store = ctx.as_store_mut();
let env = func_env.as_ref(&store);
let tasks = env.tasks().clone();
let env_inner = env.inner();
let module_handles = env_inner.main_module_instance_handles();
let thread_memory = module_handles.memory_clone();
let linker = env_inner.linker().cloned();
let state = env.state.clone();
let mut thread_env = env.clone();
thread_env.thread = thread_handle.as_thread();
thread_env.layout = layout;
thread_env.enable_deep_sleep = if cfg!(feature = "js") {
false
} else {
unsafe { env.capable_of_deep_sleep() }
};
let mut execute_module = {
let thread_handle = thread_handle;
move |ctx: WasiFunctionEnv, mut store: Store| {
call_module::<M>(ctx, store, start_ptr_offset, thread_handle, rewind_state)
}
};
if module_handles.thread_spawn.is_none() {
warn!("thread failed - the program does not export a `wasi_thread_start` function");
return Err(Errno::Notcapable);
}
let thread_module = module_handles.module_clone();
let spawn_type = match linker {
Some(linker) => crate::runtime::SpawnType::NewLinkerInstanceGroup(linker, func_env, store),
None => crate::runtime::SpawnType::ShareMemory(thread_memory, store.as_store_ref()),
};
trace!("threading: spawning background thread");
let run = move |props: TaskWasmRunProperties| {
execute_module(props.ctx, props.store);
};
let mut task_wasm = TaskWasm::new(Box::new(run), thread_env, thread_module, false, false)
.with_memory(spawn_type);
tasks.task_wasm(task_wasm).map_err(Into::<Errno>::into)?;
Ok(())
}
fn call_module_internal<M: MemorySize>(
env: &WasiFunctionEnv,
store: &mut Store,
start_ptr_offset: M::Offset,
) -> Result<(), DeepSleepWork> {
let spawn = env
.data(&store)
.inner()
.main_module_instance_handles()
.thread_spawn
.clone()
.unwrap();
let tid = env.data(&store).tid();
let thread_result = spawn.call(
store,
tid.raw().try_into().map_err(|_| Errno::Overflow).unwrap(),
start_ptr_offset
.try_into()
.map_err(|_| Errno::Overflow)
.unwrap(),
);
trace!("callback finished (ret={:?})", thread_result);
let exit_code = handle_thread_result(env, store, thread_result)?;
env.on_exit(store, exit_code);
Ok(())
}
fn handle_thread_result(
env: &WasiFunctionEnv,
store: &mut Store,
err: Result<(), RuntimeError>,
) -> Result<Option<ExitCode>, DeepSleepWork> {
let tid = env.data(&store).tid();
let pid = env.data(&store).pid();
let Err(err) = err else {
trace!("thread exited cleanly without calling thread_exit");
return Ok(None);
};
match err.downcast::<WasiError>() {
Ok(WasiError::ThreadExit) => {
trace!("thread exited cleanly");
Ok(None)
}
Ok(WasiError::Exit(code)) => {
trace!(exit_code = ?code, "thread requested exit");
if !code.is_success() {
env.data(&store)
.runtime
.on_taint(TaintReason::NonZeroExitCode(code));
};
Ok(Some(code))
}
Ok(WasiError::DeepSleep(deep)) => {
trace!("entered a deep sleep");
Err(deep)
}
Ok(WasiError::UnknownWasiVersion) => {
eprintln!(
"Thread {tid} of process {pid} failed because it has an unknown wasix version"
);
env.data(&store)
.runtime
.on_taint(TaintReason::UnknownWasiVersion);
Ok(Some(ExitCode::from(129)))
}
Ok(WasiError::DlSymbolResolutionFailed(symbol)) => {
eprintln!("Thread {tid} of process {pid} failed to find required symbol: {symbol}");
env.data(&store)
.runtime
.on_taint(TaintReason::DlSymbolResolutionFailed(symbol.clone()));
Ok(Some(ExitCode::from(129)))
}
Err(err) => {
eprintln!("Thread {tid} of process {pid} failed with runtime error: {err}");
env.data(&store)
.runtime
.on_taint(TaintReason::RuntimeError(err));
Ok(Some(ExitCode::from(129)))
}
}
}
fn call_module<M: MemorySize>(
mut ctx: WasiFunctionEnv,
mut store: Store,
start_ptr_offset: M::Offset,
thread_handle: Arc<WasiThreadHandle>,
rewind_state: Option<(RewindState, RewindResultType)>,
) {
let env = ctx.data(&store);
let tasks = env.tasks().clone();
if let Some((rewind_state, rewind_result)) = rewind_state {
let mut ctx = ctx.env.clone().into_mut(&mut store);
let res = rewind_ext::<M>(
&mut ctx,
Some(rewind_state.memory_stack),
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
);
if res != Errno::Success {
return;
}
}
let ret = call_module_internal::<M>(&ctx, &mut store, start_ptr_offset);
if let Err(deep) = ret {
let rewind = deep.rewind;
let respawn = {
let tasks = tasks.clone();
move |ctx, store, trigger_res| {
call_module::<M>(
ctx,
store,
start_ptr_offset,
thread_handle,
Some((rewind, RewindResultType::RewindWithResult(trigger_res))),
);
}
};
unsafe {
tasks.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
};
return;
};
drop(thread_handle);
}