wasmer_wasix/journal/effector/
memory_and_snapshot.rs

1use std::collections::{BTreeMap, hash_map};
2
3#[allow(unused)]
4use lz4_flex::{
5    self, block, compress_prepend_size, decompress, decompress_into, decompress_size_prepended,
6};
7
8use crate::os::task::process::MemorySnapshotRegion;
9
10use super::*;
11
/// Granularity (in bytes) of the regions that guest memory is divided
/// into when hashing for changed areas during a snapshot.
///
/// This value is tweaked to minimize the amount of journal
/// entries for a nominal workload but keep the resolution
/// high enough that it reduces overhead and inefficiency.
///
/// The test case used to tune this value was an HTTP server
/// serving an HTTP web page on hyper compiled to WASM. The
/// server was first warmed up with a bunch of requests then
/// the journal entries measured on subsequent requests; these
/// are the values:
///
/// Resolution | Journal Size | Memory Overhead
/// -----------|--------------|----------------
/// 128 bytes  | 3584 bytes   | 12.5%
/// 256 bytes  | 4096 bytes   | 6.25%
/// 512 bytes  | 7680 bytes   | 3.12%
/// 1024 bytes | 12288 bytes  | 1.56%
/// 2048 bytes | 22528 bytes  | 0.78%
/// 4096 bytes | 32769 bytes  | 0.39%
///
/// Based on this data we have settled on 512 byte memory resolution
/// for region extents which keeps the journal size to a reasonable
/// value and the memory overhead of the hash table within an acceptable
/// limit.
const MEMORY_REGION_RESOLUTION: u64 = 512;
36
impl JournalEffector {
    /// Writes the current state of the guest memory to the active journal,
    /// followed by a `SnapshotV1` marker entry so the journal can later be
    /// used as a restoration point.
    ///
    /// Memory is partitioned into `MEMORY_REGION_RESOLUTION`-sized regions;
    /// each region is hashed (blake3, truncated to a `u64`) and compared
    /// against the hashes recorded in `guard.snapshot_memory_hash` from the
    /// previous snapshot, so only regions that changed since the last
    /// snapshot are written. Adjacent dirty regions are merged before being
    /// LZ4-compressed and appended to the journal, and the journal is
    /// flushed at the end.
    ///
    /// # Errors
    ///
    /// Returns an error if there is no active journal, if guest memory
    /// cannot be read, or if writing/flushing the journal fails.
    pub fn save_memory_and_snapshot(
        ctx: &mut FunctionEnvMut<'_, WasiEnv>,
        guard: &mut MutexGuard<'_, WasiProcessInner>,
        trigger: SnapshotTrigger,
    ) -> anyhow::Result<()> {
        let env = ctx.data();
        // SAFETY: we are on the WASM process thread; the view is only used
        // while the guest is not concurrently mutating its own memory.
        let memory = unsafe { env.memory_view(ctx) };

        // Phase 1: compute candidate regions covering all of guest memory.
        // `next` rounds `cur` up to the next multiple of
        // MEMORY_REGION_RESOLUTION, so every region is at most
        // MEMORY_REGION_RESOLUTION bytes (the final region is clamped to
        // the end of memory and may be shorter).
        let mut cur = 0u64;
        let mut regions = Vec::<MemorySnapshotRegion>::new();
        while cur < memory.data_size() {
            //let mut again = false;
            let next = ((cur + MEMORY_REGION_RESOLUTION) / MEMORY_REGION_RESOLUTION)
                * MEMORY_REGION_RESOLUTION;
            let end = memory.data_size().min(next);
            // Disabled: logic that excluded per-thread stack ranges from the
            // snapshot. Retained for reference; currently stacks are
            // snapshotted along with everything else.
            /*
            for (_, thread) in guard.threads.iter() {
                let layout = thread.memory_layout();
                if cur >= layout.stack_lower && cur < layout.stack_upper {
                    cur = layout.stack_upper;
                    again = true;
                    break;
                }
                if end > layout.stack_lower && end < layout.stack_upper {
                    end = end.min(layout.stack_lower);
                }
            }
            if again {
                continue;
            }
            */

            let region = cur..end;
            regions.push(region.into());
            cur = end;
        }

        // Next we examine the dirty page manager and filter out any pages
        // that have not been explicitly written to (according to the
        // PTE)
        //
        // # TODO
        // https://docs.kernel.org/admin-guide/mm/soft-dirty.html

        // Phase 2: detect which regions actually changed since the last
        // snapshot and record them in an ordered map (ordering matters for
        // the adjacency merge below).
        // SAFETY: same invariant as above — executed on the process thread.
        let memory = unsafe { env.memory_view(ctx) };
        let journal = ctx.data().active_journal()?;

        let mut regions_phase2 = BTreeMap::new();
        for region in regions.drain(..) {
            // Non-native backends: copy the region out of guest memory so it
            // can be hashed.
            #[cfg(not(feature = "sys"))]
            let data = memory
                .copy_range_to_vec(region.into())
                .map_err(mem_error_to_wasi)?;

            // Native ("sys") backend: hash directly from the linear memory
            // to avoid a copy. The explicit bounds check below guards the
            // unchecked slice that follows.
            #[cfg(feature = "sys")]
            let data = {
                // SAFETY: bounds are validated against `d.len()` before
                // slicing; the guest is not running while we read.
                let d = unsafe { memory.data_unchecked() };
                if region.end > d.len() as u64 {
                    return Err(anyhow::anyhow!(
                        "memory access out of bounds ({} vs {})",
                        region.end,
                        d.len()
                    ));
                }
                &d[region.start as usize..region.end as usize]
            };

            // Truncate the 32-byte blake3 digest to its first 8 bytes; this
            // is only used as a change detector, not a security boundary.
            let hash = {
                let h: [u8; 32] = blake3::hash(data).into();
                u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
            };
            // Skip regions whose hash matches the last snapshot; otherwise
            // record the new hash and queue the region for writing.
            match guard.snapshot_memory_hash.entry(region) {
                hash_map::Entry::Occupied(mut val) => {
                    if *val.get() == hash {
                        continue;
                    }
                    val.insert(hash);
                }
                hash_map::Entry::Vacant(vacant) => {
                    vacant.insert(hash);
                }
            }

            regions_phase2.insert(region, ());
        }

        // Phase 3: coalesce regions that are contiguous (BTreeMap iteration
        // is in ascending order, so neighbors appear back-to-back). The
        // `regions` buffer is reused for the merged result.
        regions.clear();
        let mut last_end = None;
        for (region, _) in regions_phase2.iter() {
            if Some(region.start) == last_end {
                regions.last_mut().unwrap().end = region.end;
            } else {
                regions.push(*region);
            }
            last_end = Some(region.end);
        }

        // Phase 4: compress each merged region and write it to the journal.
        for region in regions {
            // Non-native backends: copy the region out before compressing.
            #[cfg(not(feature = "sys"))]
            let compressed_data = compress_prepend_size(
                &memory
                    .copy_range_to_vec(region.into())
                    .map_err(mem_error_to_wasi)?,
            );

            // UNSAFE:
            //
            // This is only safe while the WASM process itself is not running;
            // compressing straight from linear memory avoids an extra copy of
            // the region before it is compressed, which significantly reduces
            // the memory overhead of taking a snapshot.
            #[cfg(feature = "sys")]
            let compressed_data = compress_prepend_size(unsafe {
                &memory.data_unchecked()[region.start as usize..region.end as usize]
            });

            // Write the compressed region to the snapshot capturer.
            journal
                .write(JournalEntry::UpdateMemoryRegionV1 {
                    region: region.into(),
                    compressed_data: compressed_data.into(),
                })
                .map_err(map_snapshot_err)?;
        }

        // Finally we mark the end of the snapshot so that
        // it can act as a restoration point.
        let when = SystemTime::now();
        journal
            .write(JournalEntry::SnapshotV1 { when, trigger })
            .map_err(map_snapshot_err)?;

        // When writing snapshots we also flush the journal so that
        // it is guaranteed to be on the disk or network pipe.
        journal.flush().map_err(map_snapshot_err)?;
        Ok(())
    }

    /// Decompresses `compressed_data` (LZ4, size-prepended) and writes it
    /// into guest memory at `region.start`, growing the memory first if
    /// needed, then refreshes `snapshot_memory_hash` for every
    /// `MEMORY_REGION_RESOLUTION`-sized chunk of the restored range so a
    /// subsequent [`Self::save_memory_and_snapshot`] will skip regions that
    /// were restored but not modified afterwards.
    ///
    /// # Safety
    ///
    /// This function manipulates the memory of the process and thus must be executed
    /// by the WASM process thread itself.
    ///
    pub unsafe fn apply_compressed_memory(
        ctx: &mut FunctionEnvMut<'_, WasiEnv>,
        region: Range<u64>,
        compressed_data: &[u8],
    ) -> anyhow::Result<()> {
        let (env, mut store) = ctx.data_and_store_mut();

        // Splits off the size header; `compressed_data` is rebound to the
        // raw LZ4 block that follows it.
        let (uncompressed_size, compressed_data) = block::uncompressed_size(compressed_data)
            .map_err(|err| anyhow::anyhow!("failed to decompress - {err}"))?;

        let memory = unsafe { env.memory() };
        // NOTE(review): the written range ends at `region.end` (which should
        // equal `region.start + uncompressed_size`), so growing to
        // `region.end + uncompressed_size` looks like it over-grows the
        // memory — confirm intent before changing.
        memory.grow_at_least(&mut store, region.end + uncompressed_size as u64)?;

        // Write the data to the memory
        let memory = unsafe { env.memory_view(&store) };

        // Non-native backends: decompress into a temporary buffer, write it
        // through the memory view, then hash the buffer chunk by chunk.
        #[cfg(not(feature = "sys"))]
        {
            let decompressed_data = decompress(compressed_data, uncompressed_size)?;
            memory
                .write(region.start, &decompressed_data)
                .map_err(|err| WasiRuntimeError::Runtime(RuntimeError::user(err.into())))?;

            // Break the region down into chunks that align with the resolution
            let mut decompressed_data = &decompressed_data[..];
            let mut offset = region.start;
            while offset < region.end {
                let next = region.end.min(offset + MEMORY_REGION_RESOLUTION);
                let region = offset..next;
                offset = next;

                // Compute the hash and update it so the next snapshot can
                // detect this chunk as unchanged.
                let size = region.end - region.start;
                let hash = {
                    let h: [u8; 32] = blake3::hash(&decompressed_data[..size as usize]).into();
                    u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
                };
                env.process
                    .inner
                    .0
                    .lock()
                    .unwrap()
                    .snapshot_memory_hash
                    .insert(region.into(), hash);

                // Shift the data pointer past the chunk just hashed.
                decompressed_data = &decompressed_data[size as usize..];
            }
        }

        // Native ("sys") backend: decompress straight into linear memory to
        // avoid the temporary buffer, then hash directly from memory.
        #[cfg(feature = "sys")]
        unsafe {
            // SAFETY: `grow_at_least` above ensured the memory covers
            // `start..end`; the guest is not running while we write.
            let start = region.start as usize;
            let end = start + uncompressed_size;
            decompress_into(
                compressed_data,
                &mut memory.data_unchecked_mut()[start..end],
            )?;

            // Break the region down into chunks that align with the resolution
            let data = &memory.data_unchecked();
            let mut offset = region.start;
            while offset < region.end {
                let next = region.end.min(offset + MEMORY_REGION_RESOLUTION);
                let region = offset..next;

                // Compute the hash and update it so the next snapshot can
                // detect this chunk as unchanged.
                let hash = {
                    let h: [u8; 32] = blake3::hash(&data[offset as usize..next as usize]).into();
                    u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
                };
                env.process
                    .inner
                    .0
                    .lock()
                    .unwrap()
                    .snapshot_memory_hash
                    .insert(region.into(), hash);

                offset = next;
            }
        }

        Ok(())
    }
}