wasmer_journal/concrete/compacting.rs
1use std::{
2 collections::{HashMap, HashSet},
3 ops::{DerefMut, Range},
4 sync::{Arc, Mutex},
5};
6use wasmer_wasix_types::wasi;
7
8use super::*;
9
10pub type Fd = u32;
11
12/// Subgroup of events that may or may not be retained in the
13/// final journal as it is compacted.
14///
15/// By grouping events into subevents it makes it possible to ignore an
16/// entire subgroup of events which are superseeded by a later event. For
17/// example, all the events involved in creating a file are irrelevant if
18/// that file is later deleted.
19#[derive(Debug, Default)]
20struct SubGroupOfevents {
21 /// List of all the events that will be transferred over
22 /// to the compacted journal if this sub group is selected
23 /// to be carried over
24 events: Vec<usize>,
25 /// The path metadata attached to this sub group of events
26 /// is used to discard all subgroups related to a particular
27 /// path of a file or directory. This is especially important
28 /// if that file is later deleted and hence all the events
29 /// related to it are no longer relevant
30 path: Option<String>,
31 /// The write map allows the ccompacted to only keep the
32 /// events relevant to the final outcome of a compacted
33 /// journal rather than written regions that are later
34 /// overridden. This is a crude write map that does not
35 /// deal with overlapping writes (they still remain)
36 /// However in the majority of cases this will remove
37 /// duplicates while retaining a simple implementation
38 write_map: HashMap<MemoryRange, usize>,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
42struct MemoryRange {
43 start: u64,
44 end: u64,
45}
46impl From<Range<u64>> for MemoryRange {
47 fn from(value: Range<u64>) -> Self {
48 Self {
49 start: value.start,
50 end: value.end,
51 }
52 }
53}
54
55/// Index of a group of subevents in the journal which relate to a particular
56/// collective impact. For example. Creating a new file which may consist of
57/// an event to open a file, the events for writing the file data and the
58/// closing of the file are all related to a group of sub events that make
59/// up the act of creating that file. During compaction these events
60/// will be grouped together so they can be retained or discarded based
61/// on the final deterministic outcome of the entire log.
62///
63/// By grouping events into subevents it makes it possible to ignore an
64/// entire subgroup of events which are superceded by a later event. For
65/// example, all the events involved in creating a file are irrelevant if
66/// that file is later deleted.
67#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
68struct SubGroupIndex(u64);
69
70#[derive(Debug)]
71struct State {
72 /// The descriptor seed is used generate descriptor lookups
73 descriptor_seed: u64,
74 // We maintain a memory map of the events that are significant
75 memory_map: HashMap<MemoryRange, usize>,
76 // List of all the snapshots
77 snapshots: Vec<usize>,
78 // Last tty event thats been set
79 tty: Option<usize>,
80 // The last change directory event
81 chdir: Option<usize>,
82 // Last exit that signals the exiting of the process
83 process_exit: Option<usize>,
84 // Last event that initialized the module
85 init_module: Option<usize>,
86 // Events that create a particular directory
87 create_directory: HashMap<String, SubGroupIndex>,
88 // Events that remove a particular directory
89 remove_directory: HashMap<String, usize>,
90 // Events that unlink a file
91 unlink_file: HashMap<String, usize>,
92 // Thread events are only maintained while the thread and the
93 // process are still running
94 thread_map: HashMap<u32, usize>,
95 // Thread events are only maintained while the thread and the
96 // process are still running
97 staged_thread_map: HashMap<u32, usize>,
98 // Sockets that are open and not yet closed are kept here
99 open_sockets: HashMap<Fd, SubGroupIndex>,
100 // Sockets that are open and not yet closed are kept here
101 accepted_sockets: HashMap<Fd, SubGroupIndex>,
102 // Open pipes have two file descriptors that are associated with
103 // them. We keep track of both of them
104 open_pipes: HashMap<Fd, SubGroupIndex>,
105 // Any descriptors are assumed to be read only operations until
106 // they actually do something that changes the system
107 suspect_descriptors: HashMap<Fd, SubGroupIndex>,
108 // Any descriptors are assumed to be read only operations until
109 // they actually do something that changes the system
110 keep_descriptors: HashMap<Fd, SubGroupIndex>,
111 kept_descriptors: Vec<SubGroupIndex>,
112 // We put the IO related to stdio into a special list
113 // which can be purged when the program exits as its no longer
114 // important.
115 stdio_descriptors: HashMap<Fd, SubGroupIndex>,
116 // Event objects handle events from other parts of the process
117 // and feed them to a processing thread
118 event_descriptors: HashMap<Fd, SubGroupIndex>,
119 // Epoll events
120 epoll_descriptors: HashMap<Fd, SubGroupIndex>,
121 // We abstract the descriptor state so that multiple file descriptors
122 // can refer to the same file descriptors
123 sub_events: HashMap<SubGroupIndex, SubGroupOfevents>,
124 // Everything that will be retained during the next compact
125 whitelist: HashSet<usize>,
126 // We use an event index to track what to keep
127 event_index: usize,
128 // The delta list is used for all the events that happened
129 // after a compact started
130 delta_list: Option<Vec<usize>>,
131 // The inner journal that we will write to
132 inner_tx: Box<DynWritableJournal>,
133 // The inner journal that we read from
134 inner_rx: Box<DynReadableJournal>,
135}
136
137impl State {
138 fn create_filter<J>(
139 &self,
140 inner: J,
141 ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
142 where
143 J: Journal,
144 {
145 let (w, r) = inner.split();
146 self.create_split_filter(w, r)
147 }
148
149 fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
150 where
151 W: WritableJournal,
152 R: ReadableJournal,
153 {
154 let mut filter = FilteredJournalBuilder::new()
155 .with_filter_events(self.whitelist.clone().into_iter().collect());
156
157 for event_index in self
158 .tty
159 .as_ref()
160 .into_iter()
161 .chain(self.chdir.as_ref().into_iter())
162 .chain(self.process_exit.as_ref().into_iter())
163 .chain(self.init_module.as_ref().into_iter())
164 .chain(self.snapshots.iter())
165 .chain(self.memory_map.values())
166 .chain(self.thread_map.values())
167 .chain(self.remove_directory.values())
168 .chain(self.unlink_file.values())
169 .cloned()
170 {
171 filter.add_event_to_whitelist(event_index);
172 }
173 for d in self
174 .create_directory
175 .values()
176 .filter_map(|l| self.sub_events.get(l))
177 .chain(
178 self.suspect_descriptors
179 .values()
180 .filter_map(|l| self.sub_events.get(l)),
181 )
182 .chain(
183 self.keep_descriptors
184 .values()
185 .filter_map(|l| self.sub_events.get(l)),
186 )
187 .chain(
188 self.kept_descriptors
189 .iter()
190 .filter_map(|l| self.sub_events.get(l)),
191 )
192 .chain(
193 self.open_sockets
194 .values()
195 .filter_map(|l| self.sub_events.get(l)),
196 )
197 .chain(
198 self.accepted_sockets
199 .values()
200 .filter_map(|l| self.sub_events.get(l)),
201 )
202 .chain(
203 self.event_descriptors
204 .values()
205 .filter_map(|l| self.sub_events.get(l)),
206 )
207 .chain(
208 self.epoll_descriptors
209 .values()
210 .filter_map(|l| self.sub_events.get(l)),
211 )
212 .chain(
213 self.open_pipes
214 .values()
215 .filter_map(|l| self.sub_events.get(l)),
216 )
217 .chain(
218 self.stdio_descriptors
219 .values()
220 .filter_map(|l| self.sub_events.get(l)),
221 )
222 {
223 for e in d.events.iter() {
224 filter.add_event_to_whitelist(*e);
225 }
226 for e in d.write_map.values() {
227 filter.add_event_to_whitelist(*e);
228 }
229 }
230 filter.build_split(writer, reader)
231 }
232
233 fn insert_new_sub_events_empty(&mut self) -> SubGroupIndex {
234 let lookup = SubGroupIndex(self.descriptor_seed);
235 self.descriptor_seed += 1;
236
237 self.sub_events.entry(lookup).or_default();
238
239 lookup
240 }
241
242 fn insert_new_sub_events(&mut self, event_index: usize) -> SubGroupIndex {
243 let lookup = SubGroupIndex(self.descriptor_seed);
244 self.descriptor_seed += 1;
245
246 self.sub_events
247 .entry(lookup)
248 .or_default()
249 .events
250 .push(event_index);
251
252 lookup
253 }
254
255 fn append_to_sub_events(&mut self, lookup: &SubGroupIndex, event_index: usize) {
256 if let Some(state) = self.sub_events.get_mut(lookup) {
257 state.events.push(event_index);
258 }
259 }
260
261 fn set_path_for_sub_events(&mut self, lookup: &SubGroupIndex, path: &str) {
262 if let Some(state) = self.sub_events.get_mut(lookup) {
263 state.path = Some(path.to_string());
264 }
265 }
266
267 fn cancel_sub_events_by_path(&mut self, path: &str) {
268 let test = Some(path.to_string());
269 self.sub_events.retain(|_, d| d.path != test);
270 }
271
272 fn solidify_sub_events_by_path(&mut self, path: &str) {
273 let test = Some(path.to_string());
274 self.sub_events
275 .iter_mut()
276 .filter(|(_, d)| d.path == test)
277 .for_each(|(_, d)| {
278 d.path.take();
279 })
280 }
281
282 fn find_sub_events(&self, fd: &u32) -> Option<SubGroupIndex> {
283 self.suspect_descriptors
284 .get(fd)
285 .cloned()
286 .or_else(|| self.open_sockets.get(fd).cloned())
287 .or_else(|| self.accepted_sockets.get(fd).cloned())
288 .or_else(|| self.open_pipes.get(fd).cloned())
289 .or_else(|| self.keep_descriptors.get(fd).cloned())
290 .or_else(|| self.event_descriptors.get(fd).cloned())
291 .or_else(|| self.stdio_descriptors.get(fd).cloned())
292 }
293
294 fn find_sub_events_and_append(&mut self, fd: &u32, event_index: usize) {
295 if let Some(lookup) = self.find_sub_events(fd) {
296 self.append_to_sub_events(&lookup, event_index);
297 }
298 }
299
300 fn clear_run_sub_events(&mut self) {
301 self.accepted_sockets.clear();
302 self.event_descriptors.clear();
303 self.memory_map.clear();
304 self.open_pipes.clear();
305 self.open_sockets.clear();
306 self.snapshots.clear();
307 self.staged_thread_map.clear();
308 self.stdio_descriptors.clear();
309 self.suspect_descriptors.clear();
310 self.thread_map.clear();
311 for i in 0..=2 {
312 let lookup = self.insert_new_sub_events_empty();
313 self.stdio_descriptors.insert(i, lookup);
314 }
315 }
316}
317
318/// Deduplicates memory and stacks to reduce the number of volume of
319/// log events sent to its inner capturer. Compacting the events occurs
320/// in line as the events are generated
321#[derive(Debug, Clone)]
322pub struct CompactingJournalTx {
323 state: Arc<Mutex<State>>,
324 compacting: Arc<Mutex<()>>,
325}
326
327#[derive(Debug)]
328pub struct CompactingJournalRx {
329 inner: Box<DynReadableJournal>,
330}
331
332impl CompactingJournalRx {
333 pub fn swap_inner(&mut self, mut with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
334 std::mem::swap(&mut self.inner, &mut with);
335 with
336 }
337}
338
339#[derive(Debug)]
340pub struct CompactingJournal {
341 tx: CompactingJournalTx,
342 rx: CompactingJournalRx,
343}
344
345impl CompactingJournal {
346 pub fn new<J>(inner: J) -> anyhow::Result<Self>
347 where
348 J: Journal,
349 {
350 let (tx, rx) = inner.split();
351 let mut state = State {
352 inner_tx: tx,
353 inner_rx: rx.as_restarted()?,
354 tty: None,
355 chdir: None,
356 process_exit: None,
357 init_module: None,
358 snapshots: Default::default(),
359 memory_map: Default::default(),
360 thread_map: Default::default(),
361 staged_thread_map: Default::default(),
362 open_sockets: Default::default(),
363 accepted_sockets: Default::default(),
364 open_pipes: Default::default(),
365 create_directory: Default::default(),
366 remove_directory: Default::default(),
367 unlink_file: Default::default(),
368 suspect_descriptors: Default::default(),
369 keep_descriptors: Default::default(),
370 kept_descriptors: Default::default(),
371 stdio_descriptors: Default::default(),
372 event_descriptors: Default::default(),
373 epoll_descriptors: Default::default(),
374 descriptor_seed: 0,
375 sub_events: Default::default(),
376 whitelist: Default::default(),
377 delta_list: None,
378 event_index: 0,
379 };
380 // stdio FDs are always created for a process initially, fill them out here
381 for i in 0..=2 {
382 let lookup = state.insert_new_sub_events_empty();
383 state.stdio_descriptors.insert(i, lookup);
384 }
385 Ok(Self {
386 tx: CompactingJournalTx {
387 state: Arc::new(Mutex::new(state)),
388 compacting: Arc::new(Mutex::new(())),
389 },
390 rx: CompactingJournalRx { inner: rx },
391 })
392 }
393
394 /// Creates a filter jounral which will write all
395 /// its events to an inner journal
396 pub fn create_filter<J>(
397 &self,
398 inner: J,
399 ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
400 where
401 J: Journal,
402 {
403 self.tx.create_filter(inner)
404 }
405
406 /// Creates a filter journal which will write all
407 /// its events to writer and readers supplied
408 pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
409 where
410 W: WritableJournal,
411 R: ReadableJournal,
412 {
413 self.tx.create_split_filter(writer, reader)
414 }
415}
416
417/// Represents the results of a compaction operation
418#[derive(Debug, Default)]
419pub struct CompactResult {
420 pub total_size: u64,
421 pub total_events: usize,
422}
423
424impl CompactingJournalTx {
425 pub fn create_filter<J>(
426 &self,
427 inner: J,
428 ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
429 where
430 J: Journal,
431 {
432 let state = self.state.lock().unwrap();
433 state.create_filter(inner)
434 }
435
436 pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
437 where
438 W: WritableJournal,
439 R: ReadableJournal,
440 {
441 let state = self.state.lock().unwrap();
442 state.create_split_filter(writer, reader)
443 }
444
445 pub fn swap(&self, other: Self) -> Self {
446 let mut state1 = self.state.lock().unwrap();
447 let mut state2 = other.state.lock().unwrap();
448 std::mem::swap(state1.deref_mut(), state2.deref_mut());
449 drop(state1);
450 drop(state2);
451 other
452 }
453
454 /// Compacts the inner journal into a new journal
455 pub fn compact_to<J>(&self, new_journal: J) -> anyhow::Result<CompactResult>
456 where
457 J: Journal,
458 {
459 // Enter a compacting lock
460 let _guard = self.compacting.lock().unwrap();
461
462 // The first thing we do is create a filter that we
463 // place around the new journal so that it only receives new events
464 let (new_journal, replay_rx) = {
465 let mut state = self.state.lock().unwrap();
466 state.delta_list.replace(Default::default());
467 (
468 state.create_filter(new_journal),
469 state.inner_rx.as_restarted()?,
470 )
471 };
472
473 let mut result = CompactResult::default();
474
475 // Read all the events and feed them into the filtered journal and then
476 // strip off the filter so that its a normal journal again
477 while let Some(entry) = replay_rx.read()? {
478 let res = new_journal.write(entry.into_inner())?;
479 if res.record_size() > 0 {
480 result.total_size += res.record_size();
481 result.total_events += 1;
482 }
483 }
484 let new_journal = new_journal.into_inner();
485
486 // We now go into a blocking situation which will freeze the journals
487 let mut state = self.state.lock().unwrap();
488
489 // Now we build a filtered journal which will pick up any events that were
490 // added which we did the compacting.
491 let new_journal = FilteredJournalBuilder::new()
492 .with_filter_events(
493 state
494 .delta_list
495 .take()
496 .unwrap_or_default()
497 .into_iter()
498 .collect(),
499 )
500 .build(new_journal);
501
502 // Now we feed all the events into the new journal using the delta filter. After the
503 // extra events are added we strip off the filter again
504 let replay_rx = state.inner_rx.as_restarted()?;
505 while let Some(entry) = replay_rx.read()? {
506 new_journal.write(entry.into_inner())?;
507 }
508 let new_journal = new_journal.into_inner();
509
510 // Now we install the new journal
511 let (mut tx, mut rx) = new_journal.split();
512 std::mem::swap(&mut state.inner_tx, &mut tx);
513 std::mem::swap(&mut state.inner_rx, &mut rx);
514
515 Ok(result)
516 }
517
518 pub fn replace_inner<J: Journal>(&self, inner: J) {
519 let mut state = self.state.lock().unwrap();
520 let (mut tx, mut rx) = inner.split();
521 std::mem::swap(&mut state.inner_tx, &mut tx);
522 std::mem::swap(&mut state.inner_rx, &mut rx);
523 }
524}
525
526impl WritableJournal for CompactingJournalTx {
527 #[allow(clippy::assigning_clones)]
528 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
529 let mut state = self.state.lock().unwrap();
530 let event_index = state.event_index;
531 state.event_index += 1;
532
533 if let Some(delta) = state.delta_list.as_mut() {
534 delta.push(event_index);
535 }
536
537 match &entry {
538 JournalEntry::UpdateMemoryRegionV1 { region, .. } => {
539 state.memory_map.insert(region.clone().into(), event_index);
540 }
541 JournalEntry::SetThreadV1 { id, .. } => {
542 state.staged_thread_map.insert(*id, event_index);
543 }
544 JournalEntry::CloseThreadV1 { id, .. } => {
545 state.staged_thread_map.remove(id);
546 }
547 JournalEntry::SnapshotV1 { .. } => {
548 state.thread_map = state.staged_thread_map.clone();
549 state.snapshots.push(event_index);
550 }
551 JournalEntry::ProcessExitV1 { .. } => {
552 state.clear_run_sub_events();
553 state.process_exit = Some(event_index);
554 }
555 JournalEntry::TtySetV1 { .. } => {
556 state.tty.replace(event_index);
557 }
558 JournalEntry::ChangeDirectoryV1 { .. } => {
559 state.chdir.replace(event_index);
560 }
561 JournalEntry::CreateEventV1 { fd, .. } => {
562 let lookup = state.insert_new_sub_events(event_index);
563 state.event_descriptors.insert(*fd, lookup);
564 }
565 JournalEntry::OpenFileDescriptorV1 {
566 fd, o_flags, path, ..
567 }
568 | JournalEntry::OpenFileDescriptorV2 {
569 fd, o_flags, path, ..
570 } => {
571 // Creating a file and erasing anything that was there before means
572 // the entire create branch that exists before this one can be ignored
573 let path = path.to_string();
574 if o_flags.contains(wasi::Oflags::CREATE) && o_flags.contains(wasi::Oflags::TRUNC) {
575 state.cancel_sub_events_by_path(path.as_ref());
576 }
577 // All file descriptors are opened in a suspect state which
578 // means if they are closed without modifying the file system
579 // then the events will be ignored.
580 let lookup = state.insert_new_sub_events(event_index);
581 state.set_path_for_sub_events(&lookup, path.as_ref());
582
583 // There is an exception to the rule which is if the create
584 // flag is specified its always recorded as a mutating operation
585 // because it may create a file that does not exist on the file system
586 if o_flags.contains(wasi::Oflags::CREATE) {
587 state.keep_descriptors.insert(*fd, lookup);
588 } else {
589 state.suspect_descriptors.insert(*fd, lookup);
590 }
591 }
592 // Things that modify a file descriptor mean that it is
593 // no longer suspect and thus it needs to be kept
594 JournalEntry::FileDescriptorAdviseV1 { fd, .. }
595 | JournalEntry::FileDescriptorAllocateV1 { fd, .. }
596 | JournalEntry::FileDescriptorSetTimesV1 { fd, .. }
597 | JournalEntry::FileDescriptorWriteV1 { fd, .. }
598 | JournalEntry::FileDescriptorSetRightsV1 { fd, .. }
599 | JournalEntry::FileDescriptorSetSizeV1 { fd, .. } => {
600 // Its no longer suspect
601 if let Some(lookup) = state.suspect_descriptors.remove(fd) {
602 state.keep_descriptors.insert(*fd, lookup);
603 }
604
605 // Update the state
606 if let Some(state) = state
607 .find_sub_events(fd)
608 .and_then(|lookup| state.sub_events.get_mut(&lookup))
609 {
610 if let JournalEntry::FileDescriptorWriteV1 { offset, data, .. } = &entry {
611 state.write_map.insert(
612 MemoryRange {
613 start: *offset,
614 end: *offset + data.len() as u64,
615 },
616 event_index,
617 );
618 } else {
619 state.events.push(event_index);
620 }
621 }
622 }
623 // We keep non-mutable events for file descriptors that are suspect
624 JournalEntry::FileDescriptorSeekV1 { fd, .. }
625 | JournalEntry::FileDescriptorSetFdFlagsV1 { fd, .. }
626 | JournalEntry::FileDescriptorSetFlagsV1 { fd, .. }
627 | JournalEntry::SocketBindV1 { fd, .. }
628 | JournalEntry::SocketSendFileV1 { socket_fd: fd, .. }
629 | JournalEntry::SocketSendToV1 { fd, .. }
630 | JournalEntry::SocketSendV1 { fd, .. }
631 | JournalEntry::SocketSetOptFlagV1 { fd, .. }
632 | JournalEntry::SocketSetOptSizeV1 { fd, .. }
633 | JournalEntry::SocketSetOptTimeV1 { fd, .. }
634 | JournalEntry::SocketShutdownV1 { fd, .. }
635 | JournalEntry::SocketListenV1 { fd, .. }
636 | JournalEntry::SocketJoinIpv4MulticastV1 { fd, .. }
637 | JournalEntry::SocketJoinIpv6MulticastV1 { fd, .. }
638 | JournalEntry::SocketLeaveIpv4MulticastV1 { fd, .. }
639 | JournalEntry::SocketLeaveIpv6MulticastV1 { fd, .. } => {
640 state.find_sub_events_and_append(fd, event_index);
641 }
642 // Closing a file can stop all the events from appearing in the
643 // journal at all
644 JournalEntry::CloseFileDescriptorV1 { fd } => {
645 if let Some(lookup) = state.open_sockets.remove(fd) {
646 state.sub_events.remove(&lookup);
647 } else if let Some(lookup) = state.accepted_sockets.remove(fd) {
648 state.sub_events.remove(&lookup);
649 } else if let Some(lookup) = state.open_pipes.remove(fd) {
650 state.sub_events.remove(&lookup);
651 } else if let Some(lookup) = state.suspect_descriptors.remove(fd) {
652 state.sub_events.remove(&lookup);
653 } else if let Some(lookup) = state.event_descriptors.remove(fd) {
654 state.sub_events.remove(&lookup);
655 } else if let Some(lookup) = state.epoll_descriptors.remove(fd) {
656 state.sub_events.remove(&lookup);
657 } else if let Some(lookup) = state.keep_descriptors.remove(fd) {
658 state.append_to_sub_events(&lookup, event_index);
659 state.kept_descriptors.push(lookup);
660 } else {
661 state.find_sub_events_and_append(fd, event_index);
662 }
663 }
664 // Duplicating the file descriptor
665 JournalEntry::DuplicateFileDescriptorV1 {
666 original_fd,
667 copied_fd,
668 }
669 | JournalEntry::DuplicateFileDescriptorV2 {
670 original_fd,
671 copied_fd,
672 ..
673 } => {
674 if let Some(lookup) = state.suspect_descriptors.get(original_fd).cloned() {
675 state.suspect_descriptors.insert(*copied_fd, lookup);
676 state.append_to_sub_events(&lookup, event_index);
677 } else if let Some(lookup) = state.keep_descriptors.get(original_fd).cloned() {
678 state.keep_descriptors.insert(*copied_fd, lookup);
679 state.append_to_sub_events(&lookup, event_index);
680 } else if let Some(lookup) = state.stdio_descriptors.get(original_fd).cloned() {
681 state.stdio_descriptors.insert(*copied_fd, lookup);
682 state.append_to_sub_events(&lookup, event_index);
683 } else if let Some(lookup) = state.open_pipes.get(original_fd).cloned() {
684 state.open_pipes.insert(*copied_fd, lookup);
685 state.append_to_sub_events(&lookup, event_index);
686 } else if let Some(lookup) = state.open_sockets.get(original_fd).cloned() {
687 state.open_sockets.insert(*copied_fd, lookup);
688 state.append_to_sub_events(&lookup, event_index);
689 } else if let Some(lookup) = state.accepted_sockets.get(original_fd).cloned() {
690 state.accepted_sockets.insert(*copied_fd, lookup);
691 state.append_to_sub_events(&lookup, event_index);
692 } else if let Some(lookup) = state.event_descriptors.get(original_fd).cloned() {
693 state.event_descriptors.insert(*copied_fd, lookup);
694 state.append_to_sub_events(&lookup, event_index);
695 }
696 }
697 // Renumbered file descriptors will retain their suspect status
698 JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
699 if let Some(lookup) = state.suspect_descriptors.remove(old_fd) {
700 state.suspect_descriptors.insert(*new_fd, lookup);
701 state.append_to_sub_events(&lookup, event_index);
702 } else if let Some(lookup) = state.keep_descriptors.remove(old_fd) {
703 state.keep_descriptors.insert(*new_fd, lookup);
704 state.append_to_sub_events(&lookup, event_index);
705 } else if let Some(lookup) = state.stdio_descriptors.remove(old_fd) {
706 state.stdio_descriptors.insert(*new_fd, lookup);
707 state.append_to_sub_events(&lookup, event_index);
708 } else if let Some(lookup) = state.open_pipes.remove(old_fd) {
709 state.open_pipes.insert(*new_fd, lookup);
710 state.append_to_sub_events(&lookup, event_index);
711 } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
712 state.open_sockets.insert(*new_fd, lookup);
713 state.append_to_sub_events(&lookup, event_index);
714 } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
715 state.accepted_sockets.insert(*new_fd, lookup);
716 state.append_to_sub_events(&lookup, event_index);
717 } else if let Some(lookup) = state.event_descriptors.remove(old_fd) {
718 state.event_descriptors.insert(*new_fd, lookup);
719 state.append_to_sub_events(&lookup, event_index);
720 }
721 }
722 // Creating a new directory only needs to be done once
723 JournalEntry::CreateDirectoryV1 { path, .. } => {
724 let path = path.to_string();
725
726 // Newly created directories are stored as a set of .
727 #[allow(clippy::map_entry)]
728 if !state.create_directory.contains_key(&path) {
729 let lookup = state.insert_new_sub_events(event_index);
730 state.set_path_for_sub_events(&lookup, &path);
731 state.create_directory.insert(path, lookup);
732 };
733 }
734 // Deleting a directory only needs to be done once
735 JournalEntry::RemoveDirectoryV1 { path, .. } => {
736 let path = path.to_string();
737 state.create_directory.remove(&path);
738 state.remove_directory.insert(path, event_index);
739 }
740 // Unlinks the file from the file system
741 JournalEntry::UnlinkFileV1 { path, .. } => {
742 state.cancel_sub_events_by_path(path.as_ref());
743 state.unlink_file.insert(path.to_string(), event_index);
744 }
745 // Renames may update some of the tracking functions
746 JournalEntry::PathRenameV1 {
747 old_path, new_path, ..
748 } => {
749 state.solidify_sub_events_by_path(old_path.as_ref());
750 state.cancel_sub_events_by_path(new_path.as_ref());
751 state.whitelist.insert(event_index);
752 }
753 // Update all the directory operations
754 JournalEntry::PathSetTimesV1 { path, .. } => {
755 let path = path.to_string();
756 if let Some(lookup) = state.create_directory.get(&path).cloned() {
757 state.append_to_sub_events(&lookup, event_index);
758 } else if !state.remove_directory.contains_key(&path) {
759 state.whitelist.insert(event_index);
760 }
761 }
762 // Pipes that remain open at the end will be added
763 JournalEntry::CreatePipeV1 { read_fd, write_fd } => {
764 let lookup = state.insert_new_sub_events(event_index);
765 state.open_pipes.insert(*read_fd, lookup);
766 state.open_pipes.insert(*write_fd, lookup);
767 }
768 // Epoll events
769 JournalEntry::EpollCreateV1 { fd } => {
770 let lookup = state.insert_new_sub_events(event_index);
771 state.epoll_descriptors.insert(*fd, lookup);
772 }
773 JournalEntry::EpollCtlV1 { epfd, fd, .. } => {
774 if state.find_sub_events(fd).is_some() {
775 state.find_sub_events_and_append(epfd, event_index);
776 }
777 }
778 JournalEntry::SocketConnectedV1 { fd, .. } => {
779 let lookup = state.insert_new_sub_events(event_index);
780 state.accepted_sockets.insert(*fd, lookup);
781 }
782 // Sockets that are accepted are suspect
783 JournalEntry::SocketAcceptedV1 { fd, .. } | JournalEntry::SocketOpenV1 { fd, .. } => {
784 let lookup = state.insert_new_sub_events(event_index);
785 state.open_sockets.insert(*fd, lookup);
786 }
787 JournalEntry::SocketPairV1 { fd1, fd2 } => {
788 let lookup = state.insert_new_sub_events(event_index);
789 state.open_sockets.insert(*fd1, lookup);
790 state.open_sockets.insert(*fd2, lookup);
791 }
792 JournalEntry::InitModuleV1 { .. } => {
793 state.clear_run_sub_events();
794 state.init_module = Some(event_index);
795 }
796 JournalEntry::ClearEtherealV1 => {
797 state.clear_run_sub_events();
798 }
799 JournalEntry::SetClockTimeV1 { .. }
800 | JournalEntry::PortAddAddrV1 { .. }
801 | JournalEntry::PortDelAddrV1 { .. }
802 | JournalEntry::PortAddrClearV1
803 | JournalEntry::PortBridgeV1 { .. }
804 | JournalEntry::PortUnbridgeV1
805 | JournalEntry::PortDhcpAcquireV1
806 | JournalEntry::PortGatewaySetV1 { .. }
807 | JournalEntry::PortRouteAddV1 { .. }
808 | JournalEntry::PortRouteClearV1
809 | JournalEntry::PortRouteDelV1 { .. }
810 | JournalEntry::CreateSymbolicLinkV1 { .. }
811 | JournalEntry::CreateHardLinkV1 { .. } => {
812 state.whitelist.insert(event_index);
813 }
814 }
815 state.inner_tx.write(entry)
816 }
817
818 fn flush(&self) -> anyhow::Result<()> {
819 self.state.lock().unwrap().inner_tx.flush()
820 }
821
822 fn commit(&self) -> anyhow::Result<usize> {
823 self.state.lock().unwrap().inner_tx.commit()
824 }
825
826 fn rollback(&self) -> anyhow::Result<usize> {
827 self.state.lock().unwrap().inner_tx.rollback()
828 }
829}
830
831impl CompactingJournal {
832 /// Compacts the inner journal into a new journal
833 pub fn compact_to<J>(&mut self, new_journal: J) -> anyhow::Result<CompactResult>
834 where
835 J: Journal,
836 {
837 self.tx.compact_to(new_journal)
838 }
839
840 pub fn into_split(self) -> (CompactingJournalTx, CompactingJournalRx) {
841 (self.tx, self.rx)
842 }
843
844 pub fn replace_inner<J: Journal>(&mut self, inner: J) {
845 let (inner_tx, inner_rx) = inner.split();
846 let inner_rx_restarted = inner_rx.as_restarted().unwrap();
847
848 self.tx
849 .replace_inner(RecombinedJournal::new(inner_tx, inner_rx));
850 self.rx.inner = inner_rx_restarted;
851 }
852}
853
854impl ReadableJournal for CompactingJournalRx {
855 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
856 self.inner.read()
857 }
858
859 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
860 self.inner.as_restarted()
861 }
862}
863
864impl WritableJournal for CompactingJournal {
865 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
866 self.tx.write(entry)
867 }
868
869 fn flush(&self) -> anyhow::Result<()> {
870 self.tx.flush()
871 }
872
873 fn commit(&self) -> anyhow::Result<usize> {
874 self.tx.commit()
875 }
876
877 fn rollback(&self) -> anyhow::Result<usize> {
878 self.tx.rollback()
879 }
880}
881
882impl ReadableJournal for CompactingJournal {
883 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
884 self.rx.read()
885 }
886
887 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
888 let state = self.tx.state.lock().unwrap();
889 state.inner_rx.as_restarted()
890 }
891}
892
893impl Journal for CompactingJournal {
894 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
895 (Box::new(self.tx), Box::new(self.rx))
896 }
897}
898
899#[cfg(test)]
900mod tests {
901 use std::borrow::Cow;
902
903 use super::*;
904
905 pub fn run_test<'a>(
906 in_records: Vec<JournalEntry<'a>>,
907 out_records: Vec<JournalEntry<'a>>,
908 ) -> anyhow::Result<()> {
909 // Build a journal that will store the records before compacting
910 let mut compacting_journal = CompactingJournal::new(BufferedJournal::default())?;
911 for record in in_records {
912 compacting_journal.write(record)?;
913 }
914
915 // Now we build a new one using the compactor
916 let new_journal = BufferedJournal::default();
917 compacting_journal.compact_to(new_journal)?;
918
919 // Read the records
920 let new_records = compacting_journal.as_restarted()?;
921 for record1 in out_records {
922 let record2 = new_records.read()?.map(|r| r.record);
923 assert_eq!(Some(record1), record2);
924 }
925 assert_eq!(
926 None,
927 new_records.read()?.map(|x| x.record),
928 "found unexpected extra records in the compacted journal"
929 );
930
931 Ok(())
932 }
933
934 // #[tracing_test::traced_test]
935 // #[test]
936 // pub fn test_compact_purge_duplicate_memory_writes() {
937 // run_test(
938 // vec![
939 // JournalEntry::UpdateMemoryRegionV1 {
940 // region: 0..16,
941 // data: [11u8; 16].to_vec().into(),
942 // },
943 // JournalEntry::UpdateMemoryRegionV1 {
944 // region: 0..16,
945 // data: [22u8; 16].to_vec().into(),
946 // },
947 // ],
948 // vec![JournalEntry::UpdateMemoryRegionV1 {
949 // region: 0..16,
950 // data: [22u8; 16].to_vec().into(),
951 // }],
952 // )
953 // .unwrap()
954 // }
955 //
956 // #[tracing_test::traced_test]
957 // #[test]
958 // pub fn test_compact_keep_overlapping_memory() {
959 // run_test(
960 // vec![
961 // JournalEntry::UpdateMemoryRegionV1 {
962 // region: 0..16,
963 // data: [11u8; 16].to_vec().into(),
964 // },
965 // JournalEntry::UpdateMemoryRegionV1 {
966 // region: 20..36,
967 // data: [22u8; 16].to_vec().into(),
968 // },
969 // ],
970 // vec![
971 // JournalEntry::UpdateMemoryRegionV1 {
972 // region: 0..16,
973 // data: [11u8; 16].to_vec().into(),
974 // },
975 // JournalEntry::UpdateMemoryRegionV1 {
976 // region: 20..36,
977 // data: [22u8; 16].to_vec().into(),
978 // },
979 // ],
980 // )
981 // .unwrap()
982 // }
983 //
984 // #[tracing_test::traced_test]
985 // #[test]
986 // pub fn test_compact_keep_adjacent_memory_writes() {
987 // run_test(
988 // vec![
989 // JournalEntry::UpdateMemoryRegionV1 {
990 // region: 0..16,
991 // data: [11u8; 16].to_vec().into(),
992 // },
993 // JournalEntry::UpdateMemoryRegionV1 {
994 // region: 16..32,
995 // data: [22u8; 16].to_vec().into(),
996 // },
997 // ],
998 // vec![
999 // JournalEntry::UpdateMemoryRegionV1 {
1000 // region: 0..16,
1001 // data: [11u8; 16].to_vec().into(),
1002 // },
1003 // JournalEntry::UpdateMemoryRegionV1 {
1004 // region: 16..32,
1005 // data: [22u8; 16].to_vec().into(),
1006 // },
1007 // ],
1008 // )
1009 // .unwrap()
1010 // }
1011 //
1012 // #[tracing_test::traced_test]
1013 // #[test]
1014 // pub fn test_compact_purge_identical_memory_writes() {
1015 // run_test(
1016 // vec![
1017 // JournalEntry::UpdateMemoryRegionV1 {
1018 // region: 0..16,
1019 // data: [11u8; 16].to_vec().into(),
1020 // },
1021 // JournalEntry::UpdateMemoryRegionV1 {
1022 // region: 0..16,
1023 // data: [11u8; 16].to_vec().into(),
1024 // },
1025 // ],
1026 // vec![JournalEntry::UpdateMemoryRegionV1 {
1027 // region: 0..16,
1028 // data: [11u8; 16].to_vec().into(),
1029 // }],
1030 // )
1031 // .unwrap()
1032 // }
1033 //
1034 // #[tracing_test::traced_test]
1035 // #[test]
1036 // pub fn test_compact_thread_stacks() {
1037 // run_test(
1038 // vec![
1039 // JournalEntry::SetThreadV1 {
1040 // id: 4321.into(),
1041 // call_stack: [44u8; 87].to_vec().into(),
1042 // memory_stack: [55u8; 34].to_vec().into(),
1043 // store_data: [66u8; 70].to_vec().into(),
1044 // is_64bit: true,
1045 // },
1046 // JournalEntry::SetThreadV1 {
1047 // id: 1234.into(),
1048 // call_stack: [11u8; 124].to_vec().into(),
1049 // memory_stack: [22u8; 51].to_vec().into(),
1050 // store_data: [33u8; 87].to_vec().into(),
1051 // is_64bit: true,
1052 // },
1053 // JournalEntry::SetThreadV1 {
1054 // id: 65.into(),
1055 // call_stack: [77u8; 34].to_vec().into(),
1056 // memory_stack: [88u8; 51].to_vec().into(),
1057 // store_data: [99u8; 12].to_vec().into(),
1058 // is_64bit: true,
1059 // },
1060 // JournalEntry::CloseThreadV1 {
1061 // id: 1234.into(),
1062 // exit_code: None,
1063 // },
1064 // ],
1065 // vec![
1066 // JournalEntry::SetThreadV1 {
1067 // id: 4321.into(),
1068 // call_stack: [44u8; 87].to_vec().into(),
1069 // memory_stack: [55u8; 34].to_vec().into(),
1070 // store_data: [66u8; 70].to_vec().into(),
1071 // is_64bit: true,
1072 // },
1073 // JournalEntry::SetThreadV1 {
1074 // id: 65.into(),
1075 // call_stack: [77u8; 34].to_vec().into(),
1076 // memory_stack: [88u8; 51].to_vec().into(),
1077 // store_data: [99u8; 12].to_vec().into(),
1078 // is_64bit: true,
1079 // },
1080 // ],
1081 // )
1082 // .unwrap()
1083 // }
1084 //
1085 // #[tracing_test::traced_test]
1086 // #[test]
1087 // pub fn test_compact_processed_exited() {
1088 // run_test(
1089 // vec![
1090 // JournalEntry::UpdateMemoryRegionV1 {
1091 // region: 0..16,
1092 // data: [11u8; 16].to_vec().into(),
1093 // },
1094 // JournalEntry::SetThreadV1 {
1095 // id: 4321.into(),
1096 // call_stack: [44u8; 87].to_vec().into(),
1097 // memory_stack: [55u8; 34].to_vec().into(),
1098 // store_data: [66u8; 70].to_vec().into(),
1099 // is_64bit: true,
1100 // },
1101 // JournalEntry::SnapshotV1 {
1102 // when: SystemTime::now(),
1103 // trigger: SnapshotTrigger::FirstListen,
1104 // },
1105 // JournalEntry::OpenFileDescriptorV1 {
1106 // fd: 1234,
1107 // dirfd: 3452345,
1108 // dirflags: 0,
1109 // path: "/blah".into(),
1110 // o_flags: wasi::Oflags::empty(),
1111 // fs_rights_base: wasi::Rights::all(),
1112 // fs_rights_inheriting: wasi::Rights::all(),
1113 // fs_flags: wasi::Fdflags::all(),
1114 // },
1115 // JournalEntry::ProcessExitV1 { exit_code: None },
1116 // ],
1117 // vec![JournalEntry::ProcessExitV1 { exit_code: None }],
1118 // )
1119 // .unwrap()
1120 // }
1121 //
1122 // #[tracing_test::traced_test]
1123 // #[test]
1124 // pub fn test_compact_file_system_partial_write_survives() {
1125 // run_test(
1126 // vec![
1127 // JournalEntry::OpenFileDescriptorV1 {
1128 // fd: 1234,
1129 // dirfd: 3452345,
1130 // dirflags: 0,
1131 // path: "/blah".into(),
1132 // o_flags: wasi::Oflags::empty(),
1133 // fs_rights_base: wasi::Rights::all(),
1134 // fs_rights_inheriting: wasi::Rights::all(),
1135 // fs_flags: wasi::Fdflags::all(),
1136 // },
1137 // JournalEntry::FileDescriptorWriteV1 {
1138 // fd: 1234,
1139 // offset: 1234,
1140 // data: [1u8; 16].to_vec().into(),
1141 // is_64bit: true,
1142 // },
1143 // ],
1144 // vec![
1145 // JournalEntry::OpenFileDescriptorV1 {
1146 // fd: 1234,
1147 // dirfd: 3452345,
1148 // dirflags: 0,
1149 // path: "/blah".into(),
1150 // o_flags: wasi::Oflags::empty(),
1151 // fs_rights_base: wasi::Rights::all(),
1152 // fs_rights_inheriting: wasi::Rights::all(),
1153 // fs_flags: wasi::Fdflags::all(),
1154 // },
1155 // JournalEntry::FileDescriptorWriteV1 {
1156 // fd: 1234,
1157 // offset: 1234,
1158 // data: [1u8; 16].to_vec().into(),
1159 // is_64bit: true,
1160 // },
1161 // ],
1162 // )
1163 // .unwrap()
1164 // }
1165 //
1166 // #[tracing_test::traced_test]
1167 // #[test]
1168 // pub fn test_compact_file_system_write_survives_close() {
1169 // run_test(
1170 // vec![
1171 // JournalEntry::OpenFileDescriptorV1 {
1172 // fd: 1234,
1173 // dirfd: 3452345,
1174 // dirflags: 0,
1175 // path: "/blah".into(),
1176 // o_flags: wasi::Oflags::empty(),
1177 // fs_rights_base: wasi::Rights::all(),
1178 // fs_rights_inheriting: wasi::Rights::all(),
1179 // fs_flags: wasi::Fdflags::all(),
1180 // },
1181 // JournalEntry::FileDescriptorWriteV1 {
1182 // fd: 1234,
1183 // offset: 1234,
1184 // data: [1u8; 16].to_vec().into(),
1185 // is_64bit: true,
1186 // },
1187 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1188 // ],
1189 // vec![
1190 // JournalEntry::OpenFileDescriptorV1 {
1191 // fd: 1234,
1192 // dirfd: 3452345,
1193 // dirflags: 0,
1194 // path: "/blah".into(),
1195 // o_flags: wasi::Oflags::empty(),
1196 // fs_rights_base: wasi::Rights::all(),
1197 // fs_rights_inheriting: wasi::Rights::all(),
1198 // fs_flags: wasi::Fdflags::all(),
1199 // },
1200 // JournalEntry::FileDescriptorWriteV1 {
1201 // fd: 1234,
1202 // offset: 1234,
1203 // data: [1u8; 16].to_vec().into(),
1204 // is_64bit: true,
1205 // },
1206 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1207 // ],
1208 // )
1209 // .unwrap()
1210 // }
1211 //
1212 // #[tracing_test::traced_test]
1213 // #[test]
1214 // pub fn test_compact_file_system_write_survives_exit() {
1215 // run_test(
1216 // vec![
1217 // JournalEntry::OpenFileDescriptorV1 {
1218 // fd: 1234,
1219 // dirfd: 3452345,
1220 // dirflags: 0,
1221 // path: "/blah".into(),
1222 // o_flags: wasi::Oflags::empty(),
1223 // fs_rights_base: wasi::Rights::all(),
1224 // fs_rights_inheriting: wasi::Rights::all(),
1225 // fs_flags: wasi::Fdflags::all(),
1226 // },
1227 // JournalEntry::FileDescriptorWriteV1 {
1228 // fd: 1234,
1229 // offset: 1234,
1230 // data: [1u8; 16].to_vec().into(),
1231 // is_64bit: true,
1232 // },
1233 // JournalEntry::ProcessExitV1 { exit_code: None },
1234 // ],
1235 // vec![
1236 // JournalEntry::OpenFileDescriptorV1 {
1237 // fd: 1234,
1238 // dirfd: 3452345,
1239 // dirflags: 0,
1240 // path: "/blah".into(),
1241 // o_flags: wasi::Oflags::empty(),
1242 // fs_rights_base: wasi::Rights::all(),
1243 // fs_rights_inheriting: wasi::Rights::all(),
1244 // fs_flags: wasi::Fdflags::all(),
1245 // },
1246 // JournalEntry::FileDescriptorWriteV1 {
1247 // fd: 1234,
1248 // offset: 1234,
1249 // data: [1u8; 16].to_vec().into(),
1250 // is_64bit: true,
1251 // },
1252 // JournalEntry::ProcessExitV1 { exit_code: None },
1253 // ],
1254 // )
1255 // .unwrap()
1256 // }
1257 //
1258 // #[tracing_test::traced_test]
1259 // #[test]
1260 // pub fn test_compact_file_system_read_is_ignored() {
1261 // run_test(
1262 // vec![
1263 // JournalEntry::OpenFileDescriptorV1 {
1264 // fd: 1234,
1265 // dirfd: 3452345,
1266 // dirflags: 0,
1267 // path: "/blah".into(),
1268 // o_flags: wasi::Oflags::empty(),
1269 // fs_rights_base: wasi::Rights::all(),
1270 // fs_rights_inheriting: wasi::Rights::all(),
1271 // fs_flags: wasi::Fdflags::all(),
1272 // },
1273 // JournalEntry::FileDescriptorSeekV1 {
1274 // fd: 1234,
1275 // offset: 1234,
1276 // whence: wasi::Whence::End,
1277 // },
1278 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1279 // ],
1280 // Vec::new(),
1281 // )
1282 // .unwrap()
1283 // }
1284 //
1285 // #[tracing_test::traced_test]
1286 // #[test]
1287 // pub fn test_compact_file_system_touch() {
1288 // run_test(
1289 // vec![
1290 // JournalEntry::OpenFileDescriptorV1 {
1291 // fd: 1234,
1292 // dirfd: 3452345,
1293 // dirflags: 0,
1294 // path: "/blah".into(),
1295 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1296 // fs_rights_base: wasi::Rights::all(),
1297 // fs_rights_inheriting: wasi::Rights::all(),
1298 // fs_flags: wasi::Fdflags::all(),
1299 // },
1300 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1301 // JournalEntry::ProcessExitV1 { exit_code: None },
1302 // ],
1303 // vec![
1304 // JournalEntry::OpenFileDescriptorV1 {
1305 // fd: 1234,
1306 // dirfd: 3452345,
1307 // dirflags: 0,
1308 // path: "/blah".into(),
1309 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1310 // fs_rights_base: wasi::Rights::all(),
1311 // fs_rights_inheriting: wasi::Rights::all(),
1312 // fs_flags: wasi::Fdflags::all(),
1313 // },
1314 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1315 // JournalEntry::ProcessExitV1 { exit_code: None },
1316 // ],
1317 // )
1318 // .unwrap()
1319 // }
1320 //
1321 // #[tracing_test::traced_test]
1322 // #[test]
1323 // pub fn test_compact_file_system_redundant_file() {
1324 // run_test(
1325 // vec![
1326 // JournalEntry::OpenFileDescriptorV1 {
1327 // fd: 1234,
1328 // dirfd: 3452345,
1329 // dirflags: 0,
1330 // path: "/blah".into(),
1331 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1332 // fs_rights_base: wasi::Rights::all(),
1333 // fs_rights_inheriting: wasi::Rights::all(),
1334 // fs_flags: wasi::Fdflags::all(),
1335 // },
1336 // JournalEntry::FileDescriptorWriteV1 {
1337 // fd: 1234,
1338 // offset: 1234,
1339 // data: [5u8; 16].to_vec().into(),
1340 // is_64bit: true,
1341 // },
1342 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1343 // JournalEntry::OpenFileDescriptorV1 {
1344 // fd: 1235,
1345 // dirfd: 3452345,
1346 // dirflags: 0,
1347 // path: "/blah".into(),
1348 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1349 // fs_rights_base: wasi::Rights::all(),
1350 // fs_rights_inheriting: wasi::Rights::all(),
1351 // fs_flags: wasi::Fdflags::all(),
1352 // },
1353 // JournalEntry::FileDescriptorWriteV1 {
1354 // fd: 1235,
1355 // offset: 1234,
1356 // data: [6u8; 16].to_vec().into(),
1357 // is_64bit: true,
1358 // },
1359 // JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1360 // ],
1361 // vec![
1362 // JournalEntry::OpenFileDescriptorV1 {
1363 // fd: 1235,
1364 // dirfd: 3452345,
1365 // dirflags: 0,
1366 // path: "/blah".into(),
1367 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1368 // fs_rights_base: wasi::Rights::all(),
1369 // fs_rights_inheriting: wasi::Rights::all(),
1370 // fs_flags: wasi::Fdflags::all(),
1371 // },
1372 // JournalEntry::FileDescriptorWriteV1 {
1373 // fd: 1235,
1374 // offset: 1234,
1375 // data: [6u8; 16].to_vec().into(),
1376 // is_64bit: true,
1377 // },
1378 // JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1379 // ],
1380 // )
1381 // .unwrap()
1382 // }
1383 //
1384 // #[tracing_test::traced_test]
1385 // #[test]
1386 // pub fn test_compact_file_system_ignore_double_writes() {
1387 // run_test(
1388 // vec![
1389 // JournalEntry::OpenFileDescriptorV1 {
1390 // fd: 1234,
1391 // dirfd: 3452345,
1392 // dirflags: 0,
1393 // path: "/blah".into(),
1394 // o_flags: wasi::Oflags::empty(),
1395 // fs_rights_base: wasi::Rights::all(),
1396 // fs_rights_inheriting: wasi::Rights::all(),
1397 // fs_flags: wasi::Fdflags::all(),
1398 // },
1399 // JournalEntry::FileDescriptorWriteV1 {
1400 // fd: 1234,
1401 // offset: 1234,
1402 // data: [1u8; 16].to_vec().into(),
1403 // is_64bit: true,
1404 // },
1405 // JournalEntry::FileDescriptorWriteV1 {
1406 // fd: 1234,
1407 // offset: 1234,
1408 // data: [5u8; 16].to_vec().into(),
1409 // is_64bit: true,
1410 // },
1411 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1412 // ],
1413 // vec![
1414 // JournalEntry::OpenFileDescriptorV1 {
1415 // fd: 1234,
1416 // dirfd: 3452345,
1417 // dirflags: 0,
1418 // path: "/blah".into(),
1419 // o_flags: wasi::Oflags::empty(),
1420 // fs_rights_base: wasi::Rights::all(),
1421 // fs_rights_inheriting: wasi::Rights::all(),
1422 // fs_flags: wasi::Fdflags::all(),
1423 // },
1424 // JournalEntry::FileDescriptorWriteV1 {
1425 // fd: 1234,
1426 // offset: 1234,
1427 // data: [5u8; 16].to_vec().into(),
1428 // is_64bit: true,
1429 // },
1430 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1431 // ],
1432 // )
1433 // .unwrap()
1434 // }
1435 //
1436 // #[tracing_test::traced_test]
1437 // #[test]
1438 // pub fn test_compact_file_system_create_directory() {
1439 // run_test(
1440 // vec![JournalEntry::CreateDirectoryV1 {
1441 // fd: 1234,
1442 // path: "/blah".into(),
1443 // }],
1444 // vec![JournalEntry::CreateDirectoryV1 {
1445 // fd: 1234,
1446 // path: "/blah".into(),
1447 // }],
1448 // )
1449 // .unwrap()
1450 // }
1451 //
1452 // #[tracing_test::traced_test]
1453 // #[test]
1454 // pub fn test_compact_file_system_redundant_create_directory() {
1455 // run_test(
1456 // vec![
1457 // JournalEntry::CreateDirectoryV1 {
1458 // fd: 1234,
1459 // path: "/blah".into(),
1460 // },
1461 // JournalEntry::CreateDirectoryV1 {
1462 // fd: 1235,
1463 // path: "/blah".into(),
1464 // },
1465 // ],
1466 // vec![JournalEntry::CreateDirectoryV1 {
1467 // fd: 1234,
1468 // path: "/blah".into(),
1469 // }],
1470 // )
1471 // .unwrap()
1472 // }
1473 //
1474 // #[tracing_test::traced_test]
1475 // #[test]
1476 // pub fn test_compact_duplicate_tty() {
1477 // run_test(
1478 // vec![
1479 // JournalEntry::TtySetV1 {
1480 // tty: Tty {
1481 // cols: 123,
1482 // rows: 65,
1483 // width: 2341,
1484 // height: 573457,
1485 // stdin_tty: true,
1486 // stdout_tty: true,
1487 // stderr_tty: true,
1488 // echo: true,
1489 // line_buffered: true,
1490 // },
1491 // line_feeds: true,
1492 // },
1493 // JournalEntry::TtySetV1 {
1494 // tty: Tty {
1495 // cols: 12,
1496 // rows: 65,
1497 // width: 2341,
1498 // height: 573457,
1499 // stdin_tty: true,
1500 // stdout_tty: false,
1501 // stderr_tty: true,
1502 // echo: true,
1503 // line_buffered: true,
1504 // },
1505 // line_feeds: true,
1506 // },
1507 // ],
1508 // vec![JournalEntry::TtySetV1 {
1509 // tty: Tty {
1510 // cols: 12,
1511 // rows: 65,
1512 // width: 2341,
1513 // height: 573457,
1514 // stdin_tty: true,
1515 // stdout_tty: false,
1516 // stderr_tty: true,
1517 // echo: true,
1518 // line_buffered: true,
1519 // },
1520 // line_feeds: true,
1521 // }],
1522 // )
1523 // .unwrap()
1524 // }
1525
1526 #[tracing_test::traced_test]
1527 #[test]
1528 pub fn test_compact_close_sockets() {
1529 let fd = 512;
1530 run_test(
1531 vec![
1532 JournalEntry::SocketConnectedV1 {
1533 fd,
1534 local_addr: "127.0.0.1:3333".parse().unwrap(),
1535 peer_addr: "127.0.0.1:9999".parse().unwrap(),
1536 },
1537 JournalEntry::SocketSendV1 {
1538 fd,
1539 data: Cow::Borrowed(b"123"),
1540 // flags: SiFlags,
1541 flags: Default::default(),
1542 is_64bit: false,
1543 },
1544 JournalEntry::SocketSendV1 {
1545 fd,
1546 data: Cow::Borrowed(b"123"),
1547 // flags: SiFlags,
1548 flags: Default::default(),
1549 is_64bit: false,
1550 },
1551 JournalEntry::SocketSendV1 {
1552 fd,
1553 data: Cow::Borrowed(b"456"),
1554 // flags: SiFlags,
1555 flags: Default::default(),
1556 is_64bit: false,
1557 },
1558 JournalEntry::CloseFileDescriptorV1 { fd: 512 },
1559 ],
1560 vec![],
1561 )
1562 .unwrap()
1563 }
1564}