wasmer_wasix/syscalls/wasix/
proc_join.rs1use std::task::Waker;
2
3use serde::{Deserialize, Serialize};
4use wasmer::FromToNativeWasmType;
5use wasmer_wasix_types::wasi::{JoinFlags, JoinStatus, JoinStatusType, JoinStatusUnion, OptionPid};
6
7use super::*;
8use crate::{WasiProcess, syscalls::*};
9
10#[derive(Serialize, Deserialize)]
11enum JoinStatusResult {
12 Nothing,
13 ExitNormal(WasiProcessId, ExitCode),
14 Err(Errno),
15}
16
17pub fn proc_join<M: MemorySize + 'static>(
25 mut ctx: FunctionEnvMut<'_, WasiEnv>,
26 pid_ptr: WasmPtr<OptionPid, M>,
27 flags: JoinFlags,
28 status_ptr: WasmPtr<JoinStatus, M>,
29) -> Result<Errno, WasiError> {
30 WasiEnv::do_pending_operations(&mut ctx)?;
31
32 proc_join_internal(ctx, pid_ptr, flags, status_ptr)
33}
34
35pub(super) fn proc_join_internal<M: MemorySize + 'static>(
36 mut ctx: FunctionEnvMut<'_, WasiEnv>,
37 pid_ptr: WasmPtr<OptionPid, M>,
38 flags: JoinFlags,
39 status_ptr: WasmPtr<JoinStatus, M>,
40) -> Result<Errno, WasiError> {
41 ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
42
43 let ret_result = {
46 move |ctx: FunctionEnvMut<'_, WasiEnv>, status: JoinStatusResult| {
47 let mut ret = Errno::Success;
48
49 let view = unsafe { ctx.data().memory_view(&ctx) };
50 let status = match status {
51 JoinStatusResult::Nothing => JoinStatus {
52 tag: JoinStatusType::Nothing,
53 u: JoinStatusUnion { nothing: 0 },
54 },
55 JoinStatusResult::ExitNormal(pid, exit_code) => {
56 let option_pid = OptionPid {
57 tag: OptionTag::Some,
58 pid: pid.raw() as Pid,
59 };
60 pid_ptr.write(&view, option_pid).ok();
61
62 JoinStatus {
63 tag: JoinStatusType::ExitNormal,
64 u: JoinStatusUnion {
65 exit_normal: exit_code.into(),
66 },
67 }
68 }
69 JoinStatusResult::Err(err) => {
70 ret = err;
71 JoinStatus {
72 tag: JoinStatusType::Nothing,
73 u: JoinStatusUnion { nothing: 0 },
74 }
75 }
76 };
77 wasi_try_mem_ok!(status_ptr.write(&view, status));
78 Ok(ret)
79 }
80 };
81
82 if let Some(status) = unsafe { handle_rewind::<M, _>(&mut ctx) } {
85 let ret = ret_result(ctx, status);
86 tracing::trace!("rewound join ret={:?}", ret);
87 return ret;
88 }
89
90 let env = ctx.data();
91 let memory = unsafe { env.memory_view(&ctx) };
92 let option_pid = wasi_try_mem_ok!(pid_ptr.read(&memory));
93 let option_pid = match option_pid.tag {
94 OptionTag::None => None,
95 OptionTag::Some => Some(option_pid.pid),
96 _ => return Ok(Errno::Inval),
97 };
98 tracing::trace!("filter_pid = {:?}", option_pid);
99
100 wasi_try_mem_ok!(pid_ptr.write(
102 &memory,
103 OptionPid {
104 tag: OptionTag::None,
105 pid: 0,
106 }
107 ));
108 wasi_try_mem_ok!(status_ptr.write(
109 &memory,
110 JoinStatus {
111 tag: JoinStatusType::Nothing,
112 u: JoinStatusUnion { nothing: 0 },
113 }
114 ));
115
116 let pid = match option_pid {
118 None => {
119 let mut process = ctx.data_mut().process.clone();
120
121 let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
124 let child_exit = process.join_any_child().await;
125 match child_exit {
126 Ok(Some((pid, exit_code))) => {
127 tracing::trace!(%pid, %exit_code, "triggered child join");
128 trace!(ret_id = pid.raw(), exit_code = exit_code.raw());
129 JoinStatusResult::ExitNormal(pid, exit_code)
130 }
131 Ok(None) => {
132 tracing::trace!("triggered child join (no child)");
133 JoinStatusResult::Err(Errno::Child)
134 }
135 Err(err) => {
136 tracing::trace!(%err, "error triggered on child join");
137 JoinStatusResult::Err(err)
138 }
139 }
140 })?;
141 return match res {
142 AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
143 AsyncifyAction::Unwind => Ok(Errno::Success),
144 };
145 }
146 Some(pid) => pid,
147 };
148
149 let pid: WasiProcessId = pid.into();
151
152 let mut process = {
155 let mut inner = ctx.data().process.lock();
156 let process = inner
157 .children
158 .iter()
159 .filter(|c| c.pid == pid)
160 .map(Clone::clone)
161 .next();
162 inner.children.retain(|c| c.pid != pid);
163 process
164 };
165
166 if process.is_none() {
169 process = ctx.data().control_plane.get_process(pid);
170 }
171
172 if let Some(process) = process {
173 wasi_try_mem_ok!(pid_ptr.write(
175 &memory,
176 OptionPid {
177 tag: OptionTag::Some,
178 pid: pid.raw(),
179 }
180 ));
181
182 if flags.contains(JoinFlags::NON_BLOCKING) {
183 if let Some(status) = process.try_join() {
184 let exit_code = status.unwrap_or_else(|_| Errno::Child.into());
185 ret_result(ctx, JoinStatusResult::ExitNormal(pid, exit_code))
186 } else {
187 ret_result(ctx, JoinStatusResult::Nothing)
188 }
189 } else {
190 let process2 = process.clone();
192 let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
193 let exit_code = process.join().await.unwrap_or_else(|_| Errno::Child.into());
194 tracing::trace!(%exit_code, "triggered child join");
195 JoinStatusResult::ExitNormal(pid, exit_code)
196 })?;
197 match res {
198 AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
199 AsyncifyAction::Unwind => Ok(Errno::Success),
200 }
201 }
202 } else {
203 trace!(ret_id = pid.raw(), "status=nothing");
204 ret_result(ctx, JoinStatusResult::Nothing)
205 }
206}