wasmer_wasix/syscalls/wasix/
proc_join.rs1use std::task::Waker;
2
3use serde::{Deserialize, Serialize};
4use wasmer::FromToNativeWasmType;
5use wasmer_wasix_types::wasi::{JoinFlags, JoinStatus, JoinStatusType, JoinStatusUnion, OptionPid};
6
7use super::*;
8use crate::{WasiProcess, syscalls::*};
9
/// Outcome of a join attempt, in a form that is independent of guest memory.
///
/// This is the value produced by the futures handed to
/// `__asyncify_with_deep_sleep` and the value recovered by `handle_rewind`
/// in `proc_join_internal`; it is translated into a guest-visible
/// `JoinStatus` (plus an `Errno` return code) only at the very end.
///
/// NOTE(review): the serde derives are presumably what allows the value to be
/// stored while the caller is unwound and read back on rewind. If so, the
/// variant order and payloads are part of that stored format - confirm before
/// reordering or reshaping variants.
#[derive(Serialize, Deserialize)]
enum JoinStatusResult {
    /// Nothing to report; written to the guest as `JoinStatusType::Nothing`
    /// with a `Success` return code.
    Nothing,
    /// The process with the given id finished with the given exit code.
    ExitNormal(WasiProcessId, ExitCode),
    /// The join failed; the carried `Errno` becomes the syscall return code
    /// and the guest status is written as `JoinStatusType::Nothing`.
    Err(Errno),
}
16
17pub fn proc_join<M: MemorySize + 'static>(
25 mut ctx: FunctionEnvMut<'_, WasiEnv>,
26 pid_ptr: WasmPtr<OptionPid, M>,
27 flags: JoinFlags,
28 status_ptr: WasmPtr<JoinStatus, M>,
29) -> Result<Errno, WasiError> {
30 WasiEnv::do_pending_operations(&mut ctx)?;
31
32 proc_join_internal(ctx, pid_ptr, flags, status_ptr)
33}
34
35pub(super) fn proc_join_internal<M: MemorySize + 'static>(
36 mut ctx: FunctionEnvMut<'_, WasiEnv>,
37 pid_ptr: WasmPtr<OptionPid, M>,
38 flags: JoinFlags,
39 status_ptr: WasmPtr<JoinStatus, M>,
40) -> Result<Errno, WasiError> {
41 ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
42
43 let ret_result = {
46 move |ctx: FunctionEnvMut<'_, WasiEnv>, status: JoinStatusResult| {
47 let mut ret = Errno::Success;
48
49 let view = unsafe { ctx.data().memory_view(&ctx) };
50 let status = match status {
51 JoinStatusResult::Nothing => JoinStatus {
52 tag: JoinStatusType::Nothing,
53 u: JoinStatusUnion { nothing: 0 },
54 },
55 JoinStatusResult::ExitNormal(pid, exit_code) => {
56 let option_pid = OptionPid {
57 tag: OptionTag::Some,
58 pid: pid.raw() as Pid,
59 };
60 pid_ptr.write(&view, option_pid).ok();
61
62 JoinStatus {
63 tag: JoinStatusType::ExitNormal,
64 u: JoinStatusUnion {
65 exit_normal: exit_code.into(),
66 },
67 }
68 }
69 JoinStatusResult::Err(err) => {
70 ret = err;
71 JoinStatus {
72 tag: JoinStatusType::Nothing,
73 u: JoinStatusUnion { nothing: 0 },
74 }
75 }
76 };
77 wasi_try_mem_ok!(status_ptr.write(&view, status));
78 Ok(ret)
79 }
80 };
81
82 if let Some(status) = unsafe { handle_rewind::<M, _>(&mut ctx) } {
85 let ret = ret_result(ctx, status);
86 tracing::trace!("rewound join ret={:?}", ret);
87 return ret;
88 }
89
90 let env = ctx.data();
91 let memory = unsafe { env.memory_view(&ctx) };
92 let option_pid = wasi_try_mem_ok!(pid_ptr.read(&memory));
93 let option_pid = match option_pid.tag {
94 OptionTag::None => None,
95 OptionTag::Some => Some(option_pid.pid),
96 };
97 tracing::trace!("filter_pid = {:?}", option_pid);
98
99 wasi_try_mem_ok!(pid_ptr.write(
101 &memory,
102 OptionPid {
103 tag: OptionTag::None,
104 pid: 0,
105 }
106 ));
107 wasi_try_mem_ok!(status_ptr.write(
108 &memory,
109 JoinStatus {
110 tag: JoinStatusType::Nothing,
111 u: JoinStatusUnion { nothing: 0 },
112 }
113 ));
114
115 let pid = match option_pid {
117 None => {
118 let mut process = ctx.data_mut().process.clone();
119
120 let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
123 let child_exit = process.join_any_child().await;
124 match child_exit {
125 Ok(Some((pid, exit_code))) => {
126 tracing::trace!(%pid, %exit_code, "triggered child join");
127 trace!(ret_id = pid.raw(), exit_code = exit_code.raw());
128 JoinStatusResult::ExitNormal(pid, exit_code)
129 }
130 Ok(None) => {
131 tracing::trace!("triggered child join (no child)");
132 JoinStatusResult::Err(Errno::Child)
133 }
134 Err(err) => {
135 tracing::trace!(%err, "error triggered on child join");
136 JoinStatusResult::Err(err)
137 }
138 }
139 })?;
140 return match res {
141 AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
142 AsyncifyAction::Unwind => Ok(Errno::Success),
143 };
144 }
145 Some(pid) => pid,
146 };
147
148 let pid: WasiProcessId = pid.into();
150
151 let mut process = {
154 let mut inner = ctx.data().process.lock();
155 let process = inner
156 .children
157 .iter()
158 .filter(|c| c.pid == pid)
159 .map(Clone::clone)
160 .next();
161 inner.children.retain(|c| c.pid != pid);
162 process
163 };
164
165 if process.is_none() {
168 process = ctx.data().control_plane.get_process(pid);
169 }
170
171 if let Some(process) = process {
172 wasi_try_mem_ok!(pid_ptr.write(
174 &memory,
175 OptionPid {
176 tag: OptionTag::Some,
177 pid: pid.raw(),
178 }
179 ));
180
181 if flags.contains(JoinFlags::NON_BLOCKING) {
182 if let Some(status) = process.try_join() {
183 let exit_code = status.unwrap_or_else(|_| Errno::Child.into());
184 ret_result(ctx, JoinStatusResult::ExitNormal(pid, exit_code))
185 } else {
186 ret_result(ctx, JoinStatusResult::Nothing)
187 }
188 } else {
189 let process2 = process.clone();
191 let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
192 let exit_code = process.join().await.unwrap_or_else(|_| Errno::Child.into());
193 tracing::trace!(%exit_code, "triggered child join");
194 JoinStatusResult::ExitNormal(pid, exit_code)
195 })?;
196 match res {
197 AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
198 AsyncifyAction::Unwind => Ok(Errno::Success),
199 }
200 }
201 } else {
202 trace!(ret_id = pid.raw(), "status=nothing");
203 ret_result(ctx, JoinStatusResult::Nothing)
204 }
205}