wasmer_wasix/syscalls/wasix/
proc_join.rs

1use std::task::Waker;
2
3use serde::{Deserialize, Serialize};
4use wasmer::FromToNativeWasmType;
5use wasmer_wasix_types::wasi::{JoinFlags, JoinStatus, JoinStatusType, JoinStatusUnion, OptionPid};
6
7use super::*;
8use crate::{WasiProcess, syscalls::*};
9
10#[derive(Serialize, Deserialize)]
11enum JoinStatusResult {
12    Nothing,
13    ExitNormal(WasiProcessId, ExitCode),
14    Err(Errno),
15}
16
17/// ### `proc_join()`
18/// Joins the child process, blocking this one until the other finishes
19///
20/// ## Parameters
21///
22/// * `pid` - Handle of the child process to wait on
23//#[instrument(level = "trace", skip_all, fields(pid = ctx.data().process.pid().raw()), ret)]
24pub fn proc_join<M: MemorySize + 'static>(
25    mut ctx: FunctionEnvMut<'_, WasiEnv>,
26    pid_ptr: WasmPtr<OptionPid, M>,
27    flags: JoinFlags,
28    status_ptr: WasmPtr<JoinStatus, M>,
29) -> Result<Errno, WasiError> {
30    WasiEnv::do_pending_operations(&mut ctx)?;
31
32    proc_join_internal(ctx, pid_ptr, flags, status_ptr)
33}
34
35pub(super) fn proc_join_internal<M: MemorySize + 'static>(
36    mut ctx: FunctionEnvMut<'_, WasiEnv>,
37    pid_ptr: WasmPtr<OptionPid, M>,
38    flags: JoinFlags,
39    status_ptr: WasmPtr<JoinStatus, M>,
40) -> Result<Errno, WasiError> {
41    ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
42
43    // This lambda will look at what we wrote in the status variable
44    // and use this to determine the return code sent back to the caller
45    let ret_result = {
46        move |ctx: FunctionEnvMut<'_, WasiEnv>, status: JoinStatusResult| {
47            let mut ret = Errno::Success;
48
49            let view = unsafe { ctx.data().memory_view(&ctx) };
50            let status = match status {
51                JoinStatusResult::Nothing => JoinStatus {
52                    tag: JoinStatusType::Nothing,
53                    u: JoinStatusUnion { nothing: 0 },
54                },
55                JoinStatusResult::ExitNormal(pid, exit_code) => {
56                    let option_pid = OptionPid {
57                        tag: OptionTag::Some,
58                        pid: pid.raw() as Pid,
59                    };
60                    pid_ptr.write(&view, option_pid).ok();
61
62                    JoinStatus {
63                        tag: JoinStatusType::ExitNormal,
64                        u: JoinStatusUnion {
65                            exit_normal: exit_code.into(),
66                        },
67                    }
68                }
69                JoinStatusResult::Err(err) => {
70                    ret = err;
71                    JoinStatus {
72                        tag: JoinStatusType::Nothing,
73                        u: JoinStatusUnion { nothing: 0 },
74                    }
75                }
76            };
77            wasi_try_mem_ok!(status_ptr.write(&view, status));
78            Ok(ret)
79        }
80    };
81
82    // If we were just restored the stack then we were woken after a deep sleep
83    // and the return calues are already set
84    if let Some(status) = unsafe { handle_rewind::<M, _>(&mut ctx) } {
85        let ret = ret_result(ctx, status);
86        tracing::trace!("rewound join ret={:?}", ret);
87        return ret;
88    }
89
90    let env = ctx.data();
91    let memory = unsafe { env.memory_view(&ctx) };
92    let option_pid = wasi_try_mem_ok!(pid_ptr.read(&memory));
93    let option_pid = match option_pid.tag {
94        OptionTag::None => None,
95        OptionTag::Some => Some(option_pid.pid),
96    };
97    tracing::trace!("filter_pid = {:?}", option_pid);
98
99    // Clear the existing values (in case something goes wrong)
100    wasi_try_mem_ok!(pid_ptr.write(
101        &memory,
102        OptionPid {
103            tag: OptionTag::None,
104            pid: 0,
105        }
106    ));
107    wasi_try_mem_ok!(status_ptr.write(
108        &memory,
109        JoinStatus {
110            tag: JoinStatusType::Nothing,
111            u: JoinStatusUnion { nothing: 0 },
112        }
113    ));
114
115    // If the ID is maximum then it means wait for any of the children
116    let pid = match option_pid {
117        None => {
118            let mut process = ctx.data_mut().process.clone();
119
120            // We wait for any process to exit (if it takes too long
121            // then we go into a deep sleep)
122            let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
123                let child_exit = process.join_any_child().await;
124                match child_exit {
125                    Ok(Some((pid, exit_code))) => {
126                        tracing::trace!(%pid, %exit_code, "triggered child join");
127                        trace!(ret_id = pid.raw(), exit_code = exit_code.raw());
128                        JoinStatusResult::ExitNormal(pid, exit_code)
129                    }
130                    Ok(None) => {
131                        tracing::trace!("triggered child join (no child)");
132                        JoinStatusResult::Err(Errno::Child)
133                    }
134                    Err(err) => {
135                        tracing::trace!(%err, "error triggered on child join");
136                        JoinStatusResult::Err(err)
137                    }
138                }
139            })?;
140            return match res {
141                AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
142                AsyncifyAction::Unwind => Ok(Errno::Success),
143            };
144        }
145        Some(pid) => pid,
146    };
147
148    // Otherwise we wait for the specific PID
149    let pid: WasiProcessId = pid.into();
150
151    // Waiting for a process that is an explicit child will join it
152    // meaning it will no longer be a sub-process of the main process
153    let mut process = {
154        let mut inner = ctx.data().process.lock();
155        let process = inner
156            .children
157            .iter()
158            .filter(|c| c.pid == pid)
159            .map(Clone::clone)
160            .next();
161        inner.children.retain(|c| c.pid != pid);
162        process
163    };
164
165    // Otherwise it could be the case that we are waiting for a process
166    // that is not a child of this process but may still be running
167    if process.is_none() {
168        process = ctx.data().control_plane.get_process(pid);
169    }
170
171    if let Some(process) = process {
172        // We can already set the process ID
173        wasi_try_mem_ok!(pid_ptr.write(
174            &memory,
175            OptionPid {
176                tag: OptionTag::Some,
177                pid: pid.raw(),
178            }
179        ));
180
181        if flags.contains(JoinFlags::NON_BLOCKING) {
182            if let Some(status) = process.try_join() {
183                let exit_code = status.unwrap_or_else(|_| Errno::Child.into());
184                ret_result(ctx, JoinStatusResult::ExitNormal(pid, exit_code))
185            } else {
186                ret_result(ctx, JoinStatusResult::Nothing)
187            }
188        } else {
189            // Wait for the process to finish
190            let process2 = process.clone();
191            let res = __asyncify_with_deep_sleep::<M, _, _>(ctx, async move {
192                let exit_code = process.join().await.unwrap_or_else(|_| Errno::Child.into());
193                tracing::trace!(%exit_code, "triggered child join");
194                JoinStatusResult::ExitNormal(pid, exit_code)
195            })?;
196            match res {
197                AsyncifyAction::Finish(ctx, result) => ret_result(ctx, result),
198                AsyncifyAction::Unwind => Ok(Errno::Success),
199            }
200        }
201    } else {
202        trace!(ret_id = pid.raw(), "status=nothing");
203        ret_result(ctx, JoinStatusResult::Nothing)
204    }
205}