wasmer_wasix/syscalls/wasix/
proc_join.rs

1use std::task::Waker;
2
3use serde::{Deserialize, Serialize};
4use wasmer::FromToNativeWasmType;
5use wasmer_wasix_types::wasi::{JoinFlags, JoinStatus, JoinStatusType, JoinStatusUnion, OptionPid};
6
7use super::*;
8use crate::{WasiProcess, syscalls::*};
9
10#[derive(Serialize, Deserialize)]
11enum JoinStatusResult {
12    Nothing,
13    ExitNormal(WasiProcessId, ExitCode),
14    Err(Errno),
15}
16
17/// ### `proc_join()`
18/// Joins the child process, blocking this one until the other finishes
19///
20/// ## Parameters
21///
22/// * `pid` - Handle of the child process to wait on
23//#[instrument(level = "trace", skip_all, fields(pid = ctx.data().process.pid().raw()), ret)]
24pub fn proc_join<M: MemorySize + 'static>(
25    mut ctx: FunctionEnvMut<'_, WasiEnv>,
26    pid_ptr: WasmPtr<OptionPid, M>,
27    flags: JoinFlags,
28    status_ptr: WasmPtr<JoinStatus, M>,
29) -> Result<Errno, WasiError> {
30    WasiEnv::do_pending_operations(&mut ctx)?;
31
32    proc_join_internal(ctx, pid_ptr, flags, status_ptr)
33}
34
35pub(super) fn proc_join_internal<M: MemorySize + 'static>(
36    mut ctx: FunctionEnvMut<'_, WasiEnv>,
37    pid_ptr: WasmPtr<OptionPid, M>,
38    flags: JoinFlags,
39    status_ptr: WasmPtr<JoinStatus, M>,
40) -> Result<Errno, WasiError> {
41    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
42
43    // This lambda will look at what we wrote in the status variable
44    // and use this to determine the return code sent back to the caller
45    let ret_result = {
46        move |ctx: FunctionEnvMut<'_, WasiEnv>, status: JoinStatusResult| {
47            let mut ret = Errno::Success;
48
49            let view = unsafe { ctx.data().memory_view(&ctx) };
50            let status = match status {
51                JoinStatusResult::Nothing => JoinStatus {
52                    tag: JoinStatusType::Nothing,
53                    u: JoinStatusUnion { nothing: 0 },
54                },
55                JoinStatusResult::ExitNormal(pid, exit_code) => {
56                    let option_pid = OptionPid {
57                        tag: OptionTag::Some,
58                        pid: pid.raw() as Pid,
59                    };
60                    pid_ptr.write(&view, option_pid).ok();
61
62                    JoinStatus {
63                        tag: JoinStatusType::ExitNormal,
64                        u: JoinStatusUnion {
65                            exit_normal: exit_code.into(),
66                        },
67                    }
68                }
69                JoinStatusResult::Err(err) => {
70                    ret = err;
71                    JoinStatus {
72                        tag: JoinStatusType::Nothing,
73                        u: JoinStatusUnion { nothing: 0 },
74                    }
75                }
76            };
77            wasi_try_mem_ok!(status_ptr.write(&view, status));
78            Ok(ret)
79        }
80    };
81
82    // If we were just restored the stack then we were woken after a deep sleep
83    // and the return calues are already set
84    if let Some(status) = unsafe { handle_rewind::<M, _>(&mut ctx) } {
85        let ret = ret_result(ctx, status);
86        tracing::trace!("rewound join ret={:?}", ret);
87        return ret;
88    }
89
90    let env = ctx.data();
91    let memory = unsafe { env.memory_view(&ctx) };
92    let option_pid = wasi_try_mem_ok!(pid_ptr.read(&memory));
93    let option_pid = match option_pid.tag {
94        OptionTag::None => None,
95        OptionTag::Some => Some(option_pid.pid),
96        _ => return Ok(Errno::Inval),
97    };
98    tracing::trace!("filter_pid = {:?}", option_pid);
99
100    // Clear the existing values (in case something goes wrong)
101    wasi_try_mem_ok!(pid_ptr.write(
102        &memory,
103        OptionPid {
104            tag: OptionTag::None,
105            pid: 0,
106        }
107    ));
108    wasi_try_mem_ok!(status_ptr.write(
109        &memory,
110        JoinStatus {
111            tag: JoinStatusType::Nothing,
112            u: JoinStatusUnion { nothing: 0 },
113        }
114    ));
115
116    // If the ID is maximum then it means wait for any of the children
117    let pid = match option_pid {
118        None => {
119            let mut process = ctx.data_mut().process.clone();
120
121            // We wait for any process to exit (if it takes too long
122            // then we go into a deep sleep)
123            let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
124                let child_exit = process.join_any_child().await;
125                match child_exit {
126                    Ok(Some((pid, exit_code))) => {
127                        tracing::trace!(%pid, %exit_code, "triggered child join");
128                        trace!(ret_id = pid.raw(), exit_code = exit_code.raw());
129                        JoinStatusResult::ExitNormal(pid, exit_code)
130                    }
131                    Ok(None) => {
132                        tracing::trace!("triggered child join (no child)");
133                        JoinStatusResult::Err(Errno::Child)
134                    }
135                    Err(err) => {
136                        tracing::trace!(%err, "error triggered on child join");
137                        JoinStatusResult::Err(err)
138                    }
139                }
140            })?;
141            return match res {
142                AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
143                AsyncifyAction::Unwind => Ok(Errno::Success),
144            };
145        }
146        Some(pid) => pid,
147    };
148
149    // Otherwise we wait for the specific PID
150    let pid: WasiProcessId = pid.into();
151
152    // Waiting for a process that is an explicit child will join it
153    // meaning it will no longer be a sub-process of the main process
154    let mut process = {
155        let mut inner = ctx.data().process.lock();
156        let process = inner
157            .children
158            .iter()
159            .filter(|c| c.pid == pid)
160            .map(Clone::clone)
161            .next();
162        inner.children.retain(|c| c.pid != pid);
163        process
164    };
165
166    // Otherwise it could be the case that we are waiting for a process
167    // that is not a child of this process but may still be running
168    if process.is_none() {
169        process = ctx.data().control_plane.get_process(pid);
170    }
171
172    if let Some(process) = process {
173        // We can already set the process ID
174        wasi_try_mem_ok!(pid_ptr.write(
175            &memory,
176            OptionPid {
177                tag: OptionTag::Some,
178                pid: pid.raw(),
179            }
180        ));
181
182        if flags.contains(JoinFlags::NON_BLOCKING) {
183            if let Some(status) = process.try_join() {
184                let exit_code = status.unwrap_or_else(|_| Errno::Child.into());
185                ret_result(ctx, JoinStatusResult::ExitNormal(pid, exit_code))
186            } else {
187                ret_result(ctx, JoinStatusResult::Nothing)
188            }
189        } else {
190            // Wait for the process to finish
191            let process2 = process.clone();
192            let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
193                let exit_code = process.join().await.unwrap_or_else(|_| Errno::Child.into());
194                tracing::trace!(%exit_code, "triggered child join");
195                JoinStatusResult::ExitNormal(pid, exit_code)
196            })?;
197            match res {
198                AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
199                AsyncifyAction::Unwind => Ok(Errno::Success),
200            }
201        }
202    } else {
203        trace!(ret_id = pid.raw(), "status=nothing");
204        ret_result(ctx, JoinStatusResult::Nothing)
205    }
206}