//! wasmer_wasix/journal/effector/memory_and_snapshot.rs

use std::collections::{BTreeMap, hash_map};

#[allow(unused)]
use lz4_flex::{
    self, block, compress_prepend_size, decompress, decompress_into, decompress_size_prepended,
};

use crate::os::task::process::MemorySnapshotRegion;

use super::*;

/// Granularity (in bytes) of the memory extents that are hashed and
/// journaled when taking an incremental snapshot.
///
/// The resolution trades journal size against the memory overhead of the
/// per-region hash table. It was tuned against an HTTP server (hyper,
/// compiled to WASM): the server was warmed up with a batch of requests
/// and the journal entries generated by subsequent requests were measured:
///
/// Resolution | Journal Size | Memory Overhead
/// -----------|--------------|----------------
/// 128 bytes  | 3584 bytes   | 12.5%
/// 256 bytes  | 4096 bytes   | 6.25%
/// 512 bytes  | 7680 bytes   | 3.12%
/// 1024 bytes | 12288 bytes  | 1.56%
/// 2048 bytes | 22528 bytes  | 0.78%
/// 4096 bytes | 32769 bytes  | 0.39%
///
/// Based on this data, 512-byte extents keep the journal reasonably small
/// while holding the hash-table overhead within an acceptable limit.
const MEMORY_REGION_RESOLUTION: u64 = 512;
36
37impl JournalEffector {
38 pub fn save_memory_and_snapshot(
39 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
40 guard: &mut MutexGuard<'_, WasiProcessInner>,
41 trigger: SnapshotTrigger,
42 ) -> anyhow::Result<()> {
43 let env = ctx.data();
44 let memory = unsafe { env.memory_view(ctx) };
45
46 // Compute all the regions that we need to save which is basically
47 // everything in the memory except for the memory stacks.
48 //
49 // We do not want the regions to be greater than 64KB as this will
50 // otherwise create too much inefficiency. We choose 64KB as its
51 // aligned with the standard WASM page size.
52 let mut cur = 0u64;
53 let mut regions = Vec::<MemorySnapshotRegion>::new();
54 while cur < memory.data_size() {
55 //let mut again = false;
56 let next = ((cur + MEMORY_REGION_RESOLUTION) / MEMORY_REGION_RESOLUTION)
57 * MEMORY_REGION_RESOLUTION;
58 let end = memory.data_size().min(next);
59 /*
60 for (_, thread) in guard.threads.iter() {
61 let layout = thread.memory_layout();
62 if cur >= layout.stack_lower && cur < layout.stack_upper {
63 cur = layout.stack_upper;
64 again = true;
65 break;
66 }
67 if end > layout.stack_lower && end < layout.stack_upper {
68 end = end.min(layout.stack_lower);
69 }
70 }
71 if again {
72 continue;
73 }
74 */
75
76 let region = cur..end;
77 regions.push(region.into());
78 cur = end;
79 }
80
81 // Next we examine the dirty page manager and filter out any pages
82 // that have not been explicitly written to (according to the
83 // PTE)
84 //
85 // # TODO
86 // https://docs.kernel.org/admin-guide/mm/soft-dirty.html
87
88 // Now that we know all the regions that need to be saved we
89 // enter a processing loop that dumps all the data to the log
90 // file in an orderly manner.
91 let memory = unsafe { env.memory_view(ctx) };
92 let journal = ctx.data().active_journal()?;
93
94 let mut regions_phase2 = BTreeMap::new();
95 for region in regions.drain(..) {
96 // We grab this region of memory as a vector and hash
97 // it, which allows us to make some logging efficiency
98 // gains.
99 #[cfg(not(feature = "sys"))]
100 let data = memory
101 .copy_range_to_vec(region.into())
102 .map_err(mem_error_to_wasi)?;
103
104 // For x86 implementations running natively we have a
105 // performance optimization that avoids a copy of the
106 // memory when hashing for changed regions
107 #[cfg(feature = "sys")]
108 let data = {
109 let d = unsafe { memory.data_unchecked() };
110 if region.end > d.len() as u64 {
111 return Err(anyhow::anyhow!(
112 "memory access out of bounds ({} vs {})",
113 region.end,
114 d.len()
115 ));
116 }
117 &d[region.start as usize..region.end as usize]
118 };
119
120 // Compute a checksum and skip the memory if its already
121 // been saved to the journal once already
122 let hash = {
123 let h: [u8; 32] = blake3::hash(data).into();
124 u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
125 };
126 match guard.snapshot_memory_hash.entry(region) {
127 hash_map::Entry::Occupied(mut val) => {
128 if *val.get() == hash {
129 continue;
130 }
131 val.insert(hash);
132 }
133 hash_map::Entry::Vacant(vacant) => {
134 vacant.insert(hash);
135 }
136 }
137
138 regions_phase2.insert(region, ());
139 }
140
141 // Combine regions together that are next to each other
142 regions.clear();
143 let mut last_end = None;
144 for (region, _) in regions_phase2.iter() {
145 if Some(region.start) == last_end {
146 regions.last_mut().unwrap().end = region.end;
147 } else {
148 regions.push(*region);
149 }
150 last_end = Some(region.end);
151 }
152
153 // Perform the writes
154 for region in regions {
155 // We grab this region of memory as a vector and hash
156 // it, which allows us to make some logging efficiency
157 // gains.
158 #[cfg(not(feature = "sys"))]
159 let compressed_data = compress_prepend_size(
160 &memory
161 .copy_range_to_vec(region.into())
162 .map_err(mem_error_to_wasi)?,
163 );
164
165 // UNSAFE:
166 //
167 // This is only unsafe while the WASM process itself is running and using this
168 // method avoids a memory copy before its compressed, this also signficantly
169 // reduces the memory process
170 #[cfg(feature = "sys")]
171 let compressed_data = compress_prepend_size(unsafe {
172 &memory.data_unchecked()[region.start as usize..region.end as usize]
173 });
174
175 // Now we write it to the snap snapshot capturer
176 journal
177 .write(JournalEntry::UpdateMemoryRegionV1 {
178 region: region.into(),
179 compressed_data: compressed_data.into(),
180 })
181 .map_err(map_snapshot_err)?;
182 }
183
184 // Finally we mark the end of the snapshot so that
185 // it can act as a restoration point
186 let when = SystemTime::now();
187 journal
188 .write(JournalEntry::SnapshotV1 { when, trigger })
189 .map_err(map_snapshot_err)?;
190
191 // When writing snapshots we also flush the journal so that
192 // its guaranteed to be on the disk or network pipe
193 journal.flush().map_err(map_snapshot_err)?;
194 Ok(())
195 }
196
197 /// # Safety
198 ///
199 /// This function manipulates the memory of the process and thus must be executed
200 /// by the WASM process thread itself.
201 ///
202 pub unsafe fn apply_compressed_memory(
203 ctx: &mut FunctionEnvMut<'_, WasiEnv>,
204 region: Range<u64>,
205 compressed_data: &[u8],
206 ) -> anyhow::Result<()> {
207 let (env, mut store) = ctx.data_and_store_mut();
208
209 let (uncompressed_size, compressed_data) = block::uncompressed_size(compressed_data)
210 .map_err(|err| anyhow::anyhow!("failed to decompress - {err}"))?;
211
212 let memory = unsafe { env.memory() };
213 memory.grow_at_least(&mut store, region.end + uncompressed_size as u64)?;
214
215 // Write the data to the memory
216 let memory = unsafe { env.memory_view(&store) };
217
218 #[cfg(not(feature = "sys"))]
219 {
220 let decompressed_data = decompress(compressed_data, uncompressed_size)?;
221 memory
222 .write(region.start, &decompressed_data)
223 .map_err(|err| WasiRuntimeError::Runtime(RuntimeError::user(err.into())))?;
224
225 // Break the region down into chunks that align with the resolution
226 let mut decompressed_data = &decompressed_data[..];
227 let mut offset = region.start;
228 while offset < region.end {
229 let next = region.end.min(offset + MEMORY_REGION_RESOLUTION);
230 let region = offset..next;
231 offset = next;
232
233 // Compute the hash and update it
234 let size = region.end - region.start;
235 let hash = {
236 let h: [u8; 32] = blake3::hash(&decompressed_data[..size as usize]).into();
237 u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
238 };
239 env.process
240 .inner
241 .0
242 .lock()
243 .unwrap()
244 .snapshot_memory_hash
245 .insert(region.into(), hash);
246
247 // Shift the data pointer
248 decompressed_data = &decompressed_data[size as usize..];
249 }
250 }
251
252 #[cfg(feature = "sys")]
253 unsafe {
254 let start = region.start as usize;
255 let end = start + uncompressed_size;
256 decompress_into(
257 compressed_data,
258 &mut memory.data_unchecked_mut()[start..end],
259 )?;
260
261 // Break the region down into chunks that align with the resolution
262 let data = &memory.data_unchecked();
263 let mut offset = region.start;
264 while offset < region.end {
265 let next = region.end.min(offset + MEMORY_REGION_RESOLUTION);
266 let region = offset..next;
267
268 // Compute the hash and update it
269 let hash = {
270 let h: [u8; 32] = blake3::hash(&data[offset as usize..next as usize]).into();
271 u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
272 };
273 env.process
274 .inner
275 .0
276 .lock()
277 .unwrap()
278 .snapshot_memory_hash
279 .insert(region.into(), hash);
280
281 offset = next;
282 }
283 }
284
285 Ok(())
286 }
287}