wasmer_wasix/syscalls/journal/
restore_snapshot.rs

1use super::*;
2
3/// Safety: This function manipulates the memory of the process and thus must
4/// be executed by the WASM process thread itself.
5///
6#[cfg(feature = "journal")]
7#[allow(clippy::result_large_err)]
8#[tracing::instrument(skip_all)]
9pub unsafe fn restore_snapshot(
10    mut ctx: FunctionEnvMut<'_, WasiEnv>,
11    journal: &DynReadableJournal,
12    bootstrapping: bool,
13) -> Result<Option<RewindState>, WasiRuntimeError> {
14    use std::{collections::BTreeMap, ops::Range};
15
16    use crate::{journal::Journal, os::task::process::MemorySnapshotRegion};
17
18    // Create the journal replay runner
19    let mut runner = JournalSyscallPlayer::new(ctx, bootstrapping);
20
21    // We read all the logs from the journal into the state machine
22    let mut ethereal_events = Vec::new();
23    while let Some(next) = journal.read().map_err(anyhow_err_to_runtime_err)? {
24        tracing::trace!(event=?next, "restoring event");
25        unsafe { runner.play_event(next.into_inner(), Some(&mut ethereal_events)) }?;
26    }
27
28    // Check for events that are orphaned
29    for evt in ethereal_events {
30        tracing::trace!("Orphaned ethereal events - {:?}", evt);
31    }
32
33    // FIXME: if the stdout/stderr FDs were closed as a result of replaying the journal,
34    // this breaks. A potential fix would be to only close those two FDs afterwards; so
35    // a `JournalSyscallPlayer::should_close_stdout: bool` or similar.
36    // Now output the stdout and stderr
37    if let Some(stdout) = runner.stdout {
38        tracing::trace!("replaying stdout");
39        for JournalStdIoWrite {
40            offset,
41            data,
42            is_64bit,
43        } in stdout
44        {
45            if is_64bit {
46                JournalEffector::apply_fd_write::<Memory64>(&mut runner.ctx, 1, offset, data)
47            } else {
48                JournalEffector::apply_fd_write::<Memory32>(&mut runner.ctx, 1, offset, data)
49            }
50            .map_err(anyhow_err_to_runtime_err)?;
51        }
52    }
53
54    if let Some(stderr) = runner.stderr {
55        tracing::trace!("replaying stderr");
56        for JournalStdIoWrite {
57            offset,
58            data,
59            is_64bit,
60        } in stderr
61        {
62            if is_64bit {
63                JournalEffector::apply_fd_write::<Memory64>(&mut runner.ctx, 2, offset, data)
64            } else {
65                JournalEffector::apply_fd_write::<Memory32>(&mut runner.ctx, 2, offset, data)
66            }
67            .map_err(anyhow_err_to_runtime_err)?;
68        }
69    }
70
71    // Apply the memory changes (if this is in bootstrapping mode we differed them)
72    for (region, data) in runner.differ_memory {
73        tracing::trace!(
74            "Replay journal - UpdateMemory - region:{:?}, data.len={}",
75            region,
76            data.len()
77        );
78        unsafe { JournalEffector::apply_compressed_memory(&mut runner.ctx, region, &data) }
79            .map_err(anyhow_err_to_runtime_err)?;
80    }
81
82    // Once we get to this point we are no longer replaying the journal
83    // and need to clear this flag, the reason is that restoring the
84    // background threads may immediately process requests while this
85    // flag is still set which would be bad
86    tracing::trace!("replaying journal=false");
87    runner.ctx.data_mut().replaying_journal = false;
88
89    // Spawn all the threads
90    let thread_count = runner.spawn_threads.len();
91    tracing::trace!(thread_count, "restoring threads");
92    for (index, (thread_id, thread_state)) in runner.spawn_threads.into_iter().enumerate() {
93        tracing::trace!("restoring thread {}/{}", index + 1, thread_count);
94
95        if thread_state.is_64bit {
96            JournalEffector::apply_thread_state::<Memory64>(
97                &mut runner.ctx,
98                thread_id,
99                thread_state.memory_stack,
100                thread_state.rewind_stack,
101                thread_state.store_data,
102                thread_state.start,
103                thread_state.layout,
104            )
105            .map_err(anyhow_err_to_runtime_err)?;
106        } else {
107            JournalEffector::apply_thread_state::<Memory32>(
108                &mut runner.ctx,
109                thread_id,
110                thread_state.memory_stack,
111                thread_state.rewind_stack,
112                thread_state.store_data,
113                thread_state.start,
114                thread_state.layout,
115            )
116            .map_err(anyhow_err_to_runtime_err)?;
117        }
118    }
119    tracing::debug!(thread_count, "snapshot restore complete");
120
121    Ok(runner.rewind)
122}