wasmer_journal/concrete/
compacting.rs

1use std::{
2    collections::{HashMap, HashSet},
3    ops::{DerefMut, Range},
4    sync::{Arc, Mutex},
5};
6use wasmer_wasix_types::wasi;
7
8use super::*;
9
10pub type Fd = u32;
11
12/// Subgroup of events that may or may not be retained in the
13/// final journal as it is compacted.
14///
15/// By grouping events into subevents it makes it possible to ignore an
16/// entire subgroup of events which are superseeded by a later event. For
17/// example, all the events involved in creating a file are irrelevant if
18/// that file is later deleted.
19#[derive(Debug, Default)]
20struct SubGroupOfevents {
21    /// List of all the events that will be transferred over
22    /// to the compacted journal if this sub group is selected
23    /// to be carried over
24    events: Vec<usize>,
25    /// The path metadata attached to this sub group of events
26    /// is used to discard all subgroups related to a particular
27    /// path of a file or directory. This is especially important
28    /// if that file is later deleted and hence all the events
29    /// related to it are no longer relevant
30    path: Option<String>,
31    /// The write map allows the ccompacted to only keep the
32    /// events relevant to the final outcome of a compacted
33    /// journal rather than written regions that are later
34    /// overridden. This is a crude write map that does not
35    /// deal with overlapping writes (they still remain)
36    /// However in the majority of cases this will remove
37    /// duplicates while retaining a simple implementation
38    write_map: HashMap<MemoryRange, usize>,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
42struct MemoryRange {
43    start: u64,
44    end: u64,
45}
46impl From<Range<u64>> for MemoryRange {
47    fn from(value: Range<u64>) -> Self {
48        Self {
49            start: value.start,
50            end: value.end,
51        }
52    }
53}
54
55/// Index of a group of subevents in the journal which relate to a particular
56/// collective impact. For example. Creating a new file which may consist of
57/// an event to open a file, the events for writing the file data and the
58/// closing of the file are all related to a group of sub events that make
59/// up the act of creating that file. During compaction these events
60/// will be grouped together so they can be retained or discarded based
61/// on the final deterministic outcome of the entire log.
62///
63/// By grouping events into subevents it makes it possible to ignore an
64/// entire subgroup of events which are superceded by a later event. For
65/// example, all the events involved in creating a file are irrelevant if
66/// that file is later deleted.
67#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
68struct SubGroupIndex(u64);
69
70#[derive(Debug)]
71struct State {
72    /// The descriptor seed is used generate descriptor lookups
73    descriptor_seed: u64,
74    // We maintain a memory map of the events that are significant
75    memory_map: HashMap<MemoryRange, usize>,
76    // List of all the snapshots
77    snapshots: Vec<usize>,
78    // Last tty event thats been set
79    tty: Option<usize>,
80    // The last change directory event
81    chdir: Option<usize>,
82    // Last exit that signals the exiting of the process
83    process_exit: Option<usize>,
84    // Last event that initialized the module
85    init_module: Option<usize>,
86    // Events that create a particular directory
87    create_directory: HashMap<String, SubGroupIndex>,
88    // Events that remove a particular directory
89    remove_directory: HashMap<String, usize>,
90    // Events that unlink a file
91    unlink_file: HashMap<String, usize>,
92    // Thread events are only maintained while the thread and the
93    // process are still running
94    thread_map: HashMap<u32, usize>,
95    // Thread events are only maintained while the thread and the
96    // process are still running
97    staged_thread_map: HashMap<u32, usize>,
98    // Sockets that are open and not yet closed are kept here
99    open_sockets: HashMap<Fd, SubGroupIndex>,
100    // Sockets that are open and not yet closed are kept here
101    accepted_sockets: HashMap<Fd, SubGroupIndex>,
102    // Open pipes have two file descriptors that are associated with
103    // them. We keep track of both of them
104    open_pipes: HashMap<Fd, SubGroupIndex>,
105    // Any descriptors are assumed to be read only operations until
106    // they actually do something that changes the system
107    suspect_descriptors: HashMap<Fd, SubGroupIndex>,
108    // Any descriptors are assumed to be read only operations until
109    // they actually do something that changes the system
110    keep_descriptors: HashMap<Fd, SubGroupIndex>,
111    kept_descriptors: Vec<SubGroupIndex>,
112    // We put the IO related to stdio into a special list
113    // which can be purged when the program exits as its no longer
114    // important.
115    stdio_descriptors: HashMap<Fd, SubGroupIndex>,
116    // Event objects handle events from other parts of the process
117    // and feed them to a processing thread
118    event_descriptors: HashMap<Fd, SubGroupIndex>,
119    // Epoll events
120    epoll_descriptors: HashMap<Fd, SubGroupIndex>,
121    // We abstract the descriptor state so that multiple file descriptors
122    // can refer to the same file descriptors
123    sub_events: HashMap<SubGroupIndex, SubGroupOfevents>,
124    // Everything that will be retained during the next compact
125    whitelist: HashSet<usize>,
126    // We use an event index to track what to keep
127    event_index: usize,
128    // The delta list is used for all the events that happened
129    // after a compact started
130    delta_list: Option<Vec<usize>>,
131    // The inner journal that we will write to
132    inner_tx: Box<DynWritableJournal>,
133    // The inner journal that we read from
134    inner_rx: Box<DynReadableJournal>,
135}
136
137impl State {
138    fn create_filter<J>(
139        &self,
140        inner: J,
141    ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
142    where
143        J: Journal,
144    {
145        let (w, r) = inner.split();
146        self.create_split_filter(w, r)
147    }
148
149    fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
150    where
151        W: WritableJournal,
152        R: ReadableJournal,
153    {
154        let mut filter = FilteredJournalBuilder::new()
155            .with_filter_events(self.whitelist.clone().into_iter().collect());
156
157        for event_index in self
158            .tty
159            .as_ref()
160            .into_iter()
161            .chain(self.chdir.as_ref().into_iter())
162            .chain(self.process_exit.as_ref().into_iter())
163            .chain(self.init_module.as_ref().into_iter())
164            .chain(self.snapshots.iter())
165            .chain(self.memory_map.values())
166            .chain(self.thread_map.values())
167            .chain(self.remove_directory.values())
168            .chain(self.unlink_file.values())
169            .cloned()
170        {
171            filter.add_event_to_whitelist(event_index);
172        }
173        for d in self
174            .create_directory
175            .values()
176            .filter_map(|l| self.sub_events.get(l))
177            .chain(
178                self.suspect_descriptors
179                    .values()
180                    .filter_map(|l| self.sub_events.get(l)),
181            )
182            .chain(
183                self.keep_descriptors
184                    .values()
185                    .filter_map(|l| self.sub_events.get(l)),
186            )
187            .chain(
188                self.kept_descriptors
189                    .iter()
190                    .filter_map(|l| self.sub_events.get(l)),
191            )
192            .chain(
193                self.open_sockets
194                    .values()
195                    .filter_map(|l| self.sub_events.get(l)),
196            )
197            .chain(
198                self.accepted_sockets
199                    .values()
200                    .filter_map(|l| self.sub_events.get(l)),
201            )
202            .chain(
203                self.event_descriptors
204                    .values()
205                    .filter_map(|l| self.sub_events.get(l)),
206            )
207            .chain(
208                self.epoll_descriptors
209                    .values()
210                    .filter_map(|l| self.sub_events.get(l)),
211            )
212            .chain(
213                self.open_pipes
214                    .values()
215                    .filter_map(|l| self.sub_events.get(l)),
216            )
217            .chain(
218                self.stdio_descriptors
219                    .values()
220                    .filter_map(|l| self.sub_events.get(l)),
221            )
222        {
223            for e in d.events.iter() {
224                filter.add_event_to_whitelist(*e);
225            }
226            for e in d.write_map.values() {
227                filter.add_event_to_whitelist(*e);
228            }
229        }
230        filter.build_split(writer, reader)
231    }
232
233    fn insert_new_sub_events_empty(&mut self) -> SubGroupIndex {
234        let lookup = SubGroupIndex(self.descriptor_seed);
235        self.descriptor_seed += 1;
236
237        self.sub_events.entry(lookup).or_default();
238
239        lookup
240    }
241
242    fn insert_new_sub_events(&mut self, event_index: usize) -> SubGroupIndex {
243        let lookup = SubGroupIndex(self.descriptor_seed);
244        self.descriptor_seed += 1;
245
246        self.sub_events
247            .entry(lookup)
248            .or_default()
249            .events
250            .push(event_index);
251
252        lookup
253    }
254
255    fn append_to_sub_events(&mut self, lookup: &SubGroupIndex, event_index: usize) {
256        if let Some(state) = self.sub_events.get_mut(lookup) {
257            state.events.push(event_index);
258        }
259    }
260
261    fn set_path_for_sub_events(&mut self, lookup: &SubGroupIndex, path: &str) {
262        if let Some(state) = self.sub_events.get_mut(lookup) {
263            state.path = Some(path.to_string());
264        }
265    }
266
267    fn cancel_sub_events_by_path(&mut self, path: &str) {
268        let test = Some(path.to_string());
269        self.sub_events.retain(|_, d| d.path != test);
270    }
271
272    fn solidify_sub_events_by_path(&mut self, path: &str) {
273        let test = Some(path.to_string());
274        self.sub_events
275            .iter_mut()
276            .filter(|(_, d)| d.path == test)
277            .for_each(|(_, d)| {
278                d.path.take();
279            })
280    }
281
282    fn find_sub_events(&self, fd: &u32) -> Option<SubGroupIndex> {
283        self.suspect_descriptors
284            .get(fd)
285            .cloned()
286            .or_else(|| self.open_sockets.get(fd).cloned())
287            .or_else(|| self.accepted_sockets.get(fd).cloned())
288            .or_else(|| self.open_pipes.get(fd).cloned())
289            .or_else(|| self.keep_descriptors.get(fd).cloned())
290            .or_else(|| self.event_descriptors.get(fd).cloned())
291            .or_else(|| self.stdio_descriptors.get(fd).cloned())
292    }
293
294    fn find_sub_events_and_append(&mut self, fd: &u32, event_index: usize) {
295        if let Some(lookup) = self.find_sub_events(fd) {
296            self.append_to_sub_events(&lookup, event_index);
297        }
298    }
299
300    fn clear_run_sub_events(&mut self) {
301        self.accepted_sockets.clear();
302        self.event_descriptors.clear();
303        self.memory_map.clear();
304        self.open_pipes.clear();
305        self.open_sockets.clear();
306        self.snapshots.clear();
307        self.staged_thread_map.clear();
308        self.stdio_descriptors.clear();
309        self.suspect_descriptors.clear();
310        self.thread_map.clear();
311        for i in 0..=2 {
312            let lookup = self.insert_new_sub_events_empty();
313            self.stdio_descriptors.insert(i, lookup);
314        }
315    }
316}
317
318/// Deduplicates memory and stacks to reduce the number of volume of
319/// log events sent to its inner capturer. Compacting the events occurs
320/// in line as the events are generated
321#[derive(Debug, Clone)]
322pub struct CompactingJournalTx {
323    state: Arc<Mutex<State>>,
324    compacting: Arc<Mutex<()>>,
325}
326
327#[derive(Debug)]
328pub struct CompactingJournalRx {
329    inner: Box<DynReadableJournal>,
330}
331
332impl CompactingJournalRx {
333    pub fn swap_inner(&mut self, mut with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
334        std::mem::swap(&mut self.inner, &mut with);
335        with
336    }
337}
338
339#[derive(Debug)]
340pub struct CompactingJournal {
341    tx: CompactingJournalTx,
342    rx: CompactingJournalRx,
343}
344
345impl CompactingJournal {
346    pub fn new<J>(inner: J) -> anyhow::Result<Self>
347    where
348        J: Journal,
349    {
350        let (tx, rx) = inner.split();
351        let mut state = State {
352            inner_tx: tx,
353            inner_rx: rx.as_restarted()?,
354            tty: None,
355            chdir: None,
356            process_exit: None,
357            init_module: None,
358            snapshots: Default::default(),
359            memory_map: Default::default(),
360            thread_map: Default::default(),
361            staged_thread_map: Default::default(),
362            open_sockets: Default::default(),
363            accepted_sockets: Default::default(),
364            open_pipes: Default::default(),
365            create_directory: Default::default(),
366            remove_directory: Default::default(),
367            unlink_file: Default::default(),
368            suspect_descriptors: Default::default(),
369            keep_descriptors: Default::default(),
370            kept_descriptors: Default::default(),
371            stdio_descriptors: Default::default(),
372            event_descriptors: Default::default(),
373            epoll_descriptors: Default::default(),
374            descriptor_seed: 0,
375            sub_events: Default::default(),
376            whitelist: Default::default(),
377            delta_list: None,
378            event_index: 0,
379        };
380        // stdio FDs are always created for a process initially, fill them out here
381        for i in 0..=2 {
382            let lookup = state.insert_new_sub_events_empty();
383            state.stdio_descriptors.insert(i, lookup);
384        }
385        Ok(Self {
386            tx: CompactingJournalTx {
387                state: Arc::new(Mutex::new(state)),
388                compacting: Arc::new(Mutex::new(())),
389            },
390            rx: CompactingJournalRx { inner: rx },
391        })
392    }
393
394    /// Creates a filter jounral which will write all
395    /// its events to an inner journal
396    pub fn create_filter<J>(
397        &self,
398        inner: J,
399    ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
400    where
401        J: Journal,
402    {
403        self.tx.create_filter(inner)
404    }
405
406    /// Creates a filter journal which will write all
407    /// its events to writer and readers supplied
408    pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
409    where
410        W: WritableJournal,
411        R: ReadableJournal,
412    {
413        self.tx.create_split_filter(writer, reader)
414    }
415}
416
417/// Represents the results of a compaction operation
418#[derive(Debug, Default)]
419pub struct CompactResult {
420    pub total_size: u64,
421    pub total_events: usize,
422}
423
424impl CompactingJournalTx {
425    pub fn create_filter<J>(
426        &self,
427        inner: J,
428    ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
429    where
430        J: Journal,
431    {
432        let state = self.state.lock().unwrap();
433        state.create_filter(inner)
434    }
435
436    pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
437    where
438        W: WritableJournal,
439        R: ReadableJournal,
440    {
441        let state = self.state.lock().unwrap();
442        state.create_split_filter(writer, reader)
443    }
444
445    pub fn swap(&self, other: Self) -> Self {
446        let mut state1 = self.state.lock().unwrap();
447        let mut state2 = other.state.lock().unwrap();
448        std::mem::swap(state1.deref_mut(), state2.deref_mut());
449        drop(state1);
450        drop(state2);
451        other
452    }
453
454    /// Compacts the inner journal into a new journal
455    pub fn compact_to<J>(&self, new_journal: J) -> anyhow::Result<CompactResult>
456    where
457        J: Journal,
458    {
459        // Enter a compacting lock
460        let _guard = self.compacting.lock().unwrap();
461
462        // The first thing we do is create a filter that we
463        // place around the new journal so that it only receives new events
464        let (new_journal, replay_rx) = {
465            let mut state = self.state.lock().unwrap();
466            state.delta_list.replace(Default::default());
467            (
468                state.create_filter(new_journal),
469                state.inner_rx.as_restarted()?,
470            )
471        };
472
473        let mut result = CompactResult::default();
474
475        // Read all the events and feed them into the filtered journal and then
476        // strip off the filter so that its a normal journal again
477        while let Some(entry) = replay_rx.read()? {
478            let res = new_journal.write(entry.into_inner())?;
479            if res.record_size() > 0 {
480                result.total_size += res.record_size();
481                result.total_events += 1;
482            }
483        }
484        let new_journal = new_journal.into_inner();
485
486        // We now go into a blocking situation which will freeze the journals
487        let mut state = self.state.lock().unwrap();
488
489        // Now we build a filtered journal which will pick up any events that were
490        // added which we did the compacting.
491        let new_journal = FilteredJournalBuilder::new()
492            .with_filter_events(
493                state
494                    .delta_list
495                    .take()
496                    .unwrap_or_default()
497                    .into_iter()
498                    .collect(),
499            )
500            .build(new_journal);
501
502        // Now we feed all the events into the new journal using the delta filter. After the
503        // extra events are added we strip off the filter again
504        let replay_rx = state.inner_rx.as_restarted()?;
505        while let Some(entry) = replay_rx.read()? {
506            new_journal.write(entry.into_inner())?;
507        }
508        let new_journal = new_journal.into_inner();
509
510        // Now we install the new journal
511        let (mut tx, mut rx) = new_journal.split();
512        std::mem::swap(&mut state.inner_tx, &mut tx);
513        std::mem::swap(&mut state.inner_rx, &mut rx);
514
515        Ok(result)
516    }
517
518    pub fn replace_inner<J: Journal>(&self, inner: J) {
519        let mut state = self.state.lock().unwrap();
520        let (mut tx, mut rx) = inner.split();
521        std::mem::swap(&mut state.inner_tx, &mut tx);
522        std::mem::swap(&mut state.inner_rx, &mut rx);
523    }
524}
525
526impl WritableJournal for CompactingJournalTx {
527    #[allow(clippy::assigning_clones)]
528    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
529        let mut state = self.state.lock().unwrap();
530        let event_index = state.event_index;
531        state.event_index += 1;
532
533        if let Some(delta) = state.delta_list.as_mut() {
534            delta.push(event_index);
535        }
536
537        match &entry {
538            JournalEntry::UpdateMemoryRegionV1 { region, .. } => {
539                state.memory_map.insert(region.clone().into(), event_index);
540            }
541            JournalEntry::SetThreadV1 { id, .. } => {
542                state.staged_thread_map.insert(*id, event_index);
543            }
544            JournalEntry::CloseThreadV1 { id, .. } => {
545                state.staged_thread_map.remove(id);
546            }
547            JournalEntry::SnapshotV1 { .. } => {
548                state.thread_map = state.staged_thread_map.clone();
549                state.snapshots.push(event_index);
550            }
551            JournalEntry::ProcessExitV1 { .. } => {
552                state.clear_run_sub_events();
553                state.process_exit = Some(event_index);
554            }
555            JournalEntry::TtySetV1 { .. } => {
556                state.tty.replace(event_index);
557            }
558            JournalEntry::ChangeDirectoryV1 { .. } => {
559                state.chdir.replace(event_index);
560            }
561            JournalEntry::CreateEventV1 { fd, .. } => {
562                let lookup = state.insert_new_sub_events(event_index);
563                state.event_descriptors.insert(*fd, lookup);
564            }
565            JournalEntry::OpenFileDescriptorV1 {
566                fd, o_flags, path, ..
567            }
568            | JournalEntry::OpenFileDescriptorV2 {
569                fd, o_flags, path, ..
570            } => {
571                // Creating a file and erasing anything that was there before means
572                // the entire create branch that exists before this one can be ignored
573                let path = path.to_string();
574                if o_flags.contains(wasi::Oflags::CREATE) && o_flags.contains(wasi::Oflags::TRUNC) {
575                    state.cancel_sub_events_by_path(path.as_ref());
576                }
577                // All file descriptors are opened in a suspect state which
578                // means if they are closed without modifying the file system
579                // then the events will be ignored.
580                let lookup = state.insert_new_sub_events(event_index);
581                state.set_path_for_sub_events(&lookup, path.as_ref());
582
583                // There is an exception to the rule which is if the create
584                // flag is specified its always recorded as a mutating operation
585                // because it may create a file that does not exist on the file system
586                if o_flags.contains(wasi::Oflags::CREATE) {
587                    state.keep_descriptors.insert(*fd, lookup);
588                } else {
589                    state.suspect_descriptors.insert(*fd, lookup);
590                }
591            }
592            // Things that modify a file descriptor mean that it is
593            // no longer suspect and thus it needs to be kept
594            JournalEntry::FileDescriptorAdviseV1 { fd, .. }
595            | JournalEntry::FileDescriptorAllocateV1 { fd, .. }
596            | JournalEntry::FileDescriptorSetTimesV1 { fd, .. }
597            | JournalEntry::FileDescriptorWriteV1 { fd, .. }
598            | JournalEntry::FileDescriptorSetRightsV1 { fd, .. }
599            | JournalEntry::FileDescriptorSetSizeV1 { fd, .. } => {
600                // Its no longer suspect
601                if let Some(lookup) = state.suspect_descriptors.remove(fd) {
602                    state.keep_descriptors.insert(*fd, lookup);
603                }
604
605                // Update the state
606                if let Some(state) = state
607                    .find_sub_events(fd)
608                    .and_then(|lookup| state.sub_events.get_mut(&lookup))
609                {
610                    if let JournalEntry::FileDescriptorWriteV1 { offset, data, .. } = &entry {
611                        state.write_map.insert(
612                            MemoryRange {
613                                start: *offset,
614                                end: *offset + data.len() as u64,
615                            },
616                            event_index,
617                        );
618                    } else {
619                        state.events.push(event_index);
620                    }
621                }
622            }
623            // We keep non-mutable events for file descriptors that are suspect
624            JournalEntry::FileDescriptorSeekV1 { fd, .. }
625            | JournalEntry::FileDescriptorSetFdFlagsV1 { fd, .. }
626            | JournalEntry::FileDescriptorSetFlagsV1 { fd, .. }
627            | JournalEntry::SocketBindV1 { fd, .. }
628            | JournalEntry::SocketSendFileV1 { socket_fd: fd, .. }
629            | JournalEntry::SocketSendToV1 { fd, .. }
630            | JournalEntry::SocketSendV1 { fd, .. }
631            | JournalEntry::SocketSetOptFlagV1 { fd, .. }
632            | JournalEntry::SocketSetOptSizeV1 { fd, .. }
633            | JournalEntry::SocketSetOptTimeV1 { fd, .. }
634            | JournalEntry::SocketShutdownV1 { fd, .. }
635            | JournalEntry::SocketListenV1 { fd, .. }
636            | JournalEntry::SocketJoinIpv4MulticastV1 { fd, .. }
637            | JournalEntry::SocketJoinIpv6MulticastV1 { fd, .. }
638            | JournalEntry::SocketLeaveIpv4MulticastV1 { fd, .. }
639            | JournalEntry::SocketLeaveIpv6MulticastV1 { fd, .. } => {
640                state.find_sub_events_and_append(fd, event_index);
641            }
642            // Closing a file can stop all the events from appearing in the
643            // journal at all
644            JournalEntry::CloseFileDescriptorV1 { fd } => {
645                if let Some(lookup) = state.open_sockets.remove(fd) {
646                    state.sub_events.remove(&lookup);
647                } else if let Some(lookup) = state.accepted_sockets.remove(fd) {
648                    state.sub_events.remove(&lookup);
649                } else if let Some(lookup) = state.open_pipes.remove(fd) {
650                    state.sub_events.remove(&lookup);
651                } else if let Some(lookup) = state.suspect_descriptors.remove(fd) {
652                    state.sub_events.remove(&lookup);
653                } else if let Some(lookup) = state.event_descriptors.remove(fd) {
654                    state.sub_events.remove(&lookup);
655                } else if let Some(lookup) = state.epoll_descriptors.remove(fd) {
656                    state.sub_events.remove(&lookup);
657                } else if let Some(lookup) = state.keep_descriptors.remove(fd) {
658                    state.append_to_sub_events(&lookup, event_index);
659                    state.kept_descriptors.push(lookup);
660                } else {
661                    state.find_sub_events_and_append(fd, event_index);
662                }
663            }
664            // Duplicating the file descriptor
665            JournalEntry::DuplicateFileDescriptorV1 {
666                original_fd,
667                copied_fd,
668            }
669            | JournalEntry::DuplicateFileDescriptorV2 {
670                original_fd,
671                copied_fd,
672                ..
673            } => {
674                if let Some(lookup) = state.suspect_descriptors.get(original_fd).cloned() {
675                    state.suspect_descriptors.insert(*copied_fd, lookup);
676                    state.append_to_sub_events(&lookup, event_index);
677                } else if let Some(lookup) = state.keep_descriptors.get(original_fd).cloned() {
678                    state.keep_descriptors.insert(*copied_fd, lookup);
679                    state.append_to_sub_events(&lookup, event_index);
680                } else if let Some(lookup) = state.stdio_descriptors.get(original_fd).cloned() {
681                    state.stdio_descriptors.insert(*copied_fd, lookup);
682                    state.append_to_sub_events(&lookup, event_index);
683                } else if let Some(lookup) = state.open_pipes.get(original_fd).cloned() {
684                    state.open_pipes.insert(*copied_fd, lookup);
685                    state.append_to_sub_events(&lookup, event_index);
686                } else if let Some(lookup) = state.open_sockets.get(original_fd).cloned() {
687                    state.open_sockets.insert(*copied_fd, lookup);
688                    state.append_to_sub_events(&lookup, event_index);
689                } else if let Some(lookup) = state.accepted_sockets.get(original_fd).cloned() {
690                    state.accepted_sockets.insert(*copied_fd, lookup);
691                    state.append_to_sub_events(&lookup, event_index);
692                } else if let Some(lookup) = state.event_descriptors.get(original_fd).cloned() {
693                    state.event_descriptors.insert(*copied_fd, lookup);
694                    state.append_to_sub_events(&lookup, event_index);
695                }
696            }
697            // Renumbered file descriptors will retain their suspect status
698            JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
699                if let Some(lookup) = state.suspect_descriptors.remove(old_fd) {
700                    state.suspect_descriptors.insert(*new_fd, lookup);
701                    state.append_to_sub_events(&lookup, event_index);
702                } else if let Some(lookup) = state.keep_descriptors.remove(old_fd) {
703                    state.keep_descriptors.insert(*new_fd, lookup);
704                    state.append_to_sub_events(&lookup, event_index);
705                } else if let Some(lookup) = state.stdio_descriptors.remove(old_fd) {
706                    state.stdio_descriptors.insert(*new_fd, lookup);
707                    state.append_to_sub_events(&lookup, event_index);
708                } else if let Some(lookup) = state.open_pipes.remove(old_fd) {
709                    state.open_pipes.insert(*new_fd, lookup);
710                    state.append_to_sub_events(&lookup, event_index);
711                } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
712                    state.open_sockets.insert(*new_fd, lookup);
713                    state.append_to_sub_events(&lookup, event_index);
714                } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
715                    state.accepted_sockets.insert(*new_fd, lookup);
716                    state.append_to_sub_events(&lookup, event_index);
717                } else if let Some(lookup) = state.event_descriptors.remove(old_fd) {
718                    state.event_descriptors.insert(*new_fd, lookup);
719                    state.append_to_sub_events(&lookup, event_index);
720                }
721            }
722            // Creating a new directory only needs to be done once
723            JournalEntry::CreateDirectoryV1 { path, .. } => {
724                let path = path.to_string();
725
726                // Newly created directories are stored as a set of .
727                #[allow(clippy::map_entry)]
728                if !state.create_directory.contains_key(&path) {
729                    let lookup = state.insert_new_sub_events(event_index);
730                    state.set_path_for_sub_events(&lookup, &path);
731                    state.create_directory.insert(path, lookup);
732                };
733            }
734            // Deleting a directory only needs to be done once
735            JournalEntry::RemoveDirectoryV1 { path, .. } => {
736                let path = path.to_string();
737                state.create_directory.remove(&path);
738                state.remove_directory.insert(path, event_index);
739            }
740            // Unlinks the file from the file system
741            JournalEntry::UnlinkFileV1 { path, .. } => {
742                state.cancel_sub_events_by_path(path.as_ref());
743                state.unlink_file.insert(path.to_string(), event_index);
744            }
745            // Renames may update some of the tracking functions
746            JournalEntry::PathRenameV1 {
747                old_path, new_path, ..
748            } => {
749                state.solidify_sub_events_by_path(old_path.as_ref());
750                state.cancel_sub_events_by_path(new_path.as_ref());
751                state.whitelist.insert(event_index);
752            }
753            // Update all the directory operations
754            JournalEntry::PathSetTimesV1 { path, .. } => {
755                let path = path.to_string();
756                if let Some(lookup) = state.create_directory.get(&path).cloned() {
757                    state.append_to_sub_events(&lookup, event_index);
758                } else if !state.remove_directory.contains_key(&path) {
759                    state.whitelist.insert(event_index);
760                }
761            }
762            // Pipes that remain open at the end will be added
763            JournalEntry::CreatePipeV1 { read_fd, write_fd } => {
764                let lookup = state.insert_new_sub_events(event_index);
765                state.open_pipes.insert(*read_fd, lookup);
766                state.open_pipes.insert(*write_fd, lookup);
767            }
768            // Epoll events
769            JournalEntry::EpollCreateV1 { fd } => {
770                let lookup = state.insert_new_sub_events(event_index);
771                state.epoll_descriptors.insert(*fd, lookup);
772            }
773            JournalEntry::EpollCtlV1 { epfd, fd, .. } => {
774                if state.find_sub_events(fd).is_some() {
775                    state.find_sub_events_and_append(epfd, event_index);
776                }
777            }
778            JournalEntry::SocketConnectedV1 { fd, .. } => {
779                let lookup = state.insert_new_sub_events(event_index);
780                state.accepted_sockets.insert(*fd, lookup);
781            }
782            // Sockets that are accepted are suspect
783            JournalEntry::SocketAcceptedV1 { fd, .. } | JournalEntry::SocketOpenV1 { fd, .. } => {
784                let lookup = state.insert_new_sub_events(event_index);
785                state.open_sockets.insert(*fd, lookup);
786            }
787            JournalEntry::SocketPairV1 { fd1, fd2 } => {
788                let lookup = state.insert_new_sub_events(event_index);
789                state.open_sockets.insert(*fd1, lookup);
790                state.open_sockets.insert(*fd2, lookup);
791            }
792            JournalEntry::InitModuleV1 { .. } => {
793                state.clear_run_sub_events();
794                state.init_module = Some(event_index);
795            }
796            JournalEntry::ClearEtherealV1 => {
797                state.clear_run_sub_events();
798            }
799            JournalEntry::SetClockTimeV1 { .. }
800            | JournalEntry::PortAddAddrV1 { .. }
801            | JournalEntry::PortDelAddrV1 { .. }
802            | JournalEntry::PortAddrClearV1
803            | JournalEntry::PortBridgeV1 { .. }
804            | JournalEntry::PortUnbridgeV1
805            | JournalEntry::PortDhcpAcquireV1
806            | JournalEntry::PortGatewaySetV1 { .. }
807            | JournalEntry::PortRouteAddV1 { .. }
808            | JournalEntry::PortRouteClearV1
809            | JournalEntry::PortRouteDelV1 { .. }
810            | JournalEntry::CreateSymbolicLinkV1 { .. }
811            | JournalEntry::CreateHardLinkV1 { .. } => {
812                state.whitelist.insert(event_index);
813            }
814        }
815        state.inner_tx.write(entry)
816    }
817
818    fn flush(&self) -> anyhow::Result<()> {
819        self.state.lock().unwrap().inner_tx.flush()
820    }
821
822    fn commit(&self) -> anyhow::Result<usize> {
823        self.state.lock().unwrap().inner_tx.commit()
824    }
825
826    fn rollback(&self) -> anyhow::Result<usize> {
827        self.state.lock().unwrap().inner_tx.rollback()
828    }
829}
830
831impl CompactingJournal {
832    /// Compacts the inner journal into a new journal
833    pub fn compact_to<J>(&mut self, new_journal: J) -> anyhow::Result<CompactResult>
834    where
835        J: Journal,
836    {
837        self.tx.compact_to(new_journal)
838    }
839
840    pub fn into_split(self) -> (CompactingJournalTx, CompactingJournalRx) {
841        (self.tx, self.rx)
842    }
843
844    pub fn replace_inner<J: Journal>(&mut self, inner: J) {
845        let (inner_tx, inner_rx) = inner.split();
846        let inner_rx_restarted = inner_rx.as_restarted().unwrap();
847
848        self.tx
849            .replace_inner(RecombinedJournal::new(inner_tx, inner_rx));
850        self.rx.inner = inner_rx_restarted;
851    }
852}
853
854impl ReadableJournal for CompactingJournalRx {
855    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
856        self.inner.read()
857    }
858
859    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
860        self.inner.as_restarted()
861    }
862}
863
864impl WritableJournal for CompactingJournal {
865    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
866        self.tx.write(entry)
867    }
868
869    fn flush(&self) -> anyhow::Result<()> {
870        self.tx.flush()
871    }
872
873    fn commit(&self) -> anyhow::Result<usize> {
874        self.tx.commit()
875    }
876
877    fn rollback(&self) -> anyhow::Result<usize> {
878        self.tx.rollback()
879    }
880}
881
882impl ReadableJournal for CompactingJournal {
883    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
884        self.rx.read()
885    }
886
887    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
888        let state = self.tx.state.lock().unwrap();
889        state.inner_rx.as_restarted()
890    }
891}
892
893impl Journal for CompactingJournal {
894    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
895        (Box::new(self.tx), Box::new(self.rx))
896    }
897}
898
899#[cfg(test)]
900mod tests {
901    use std::borrow::Cow;
902
903    use super::*;
904
905    pub fn run_test<'a>(
906        in_records: Vec<JournalEntry<'a>>,
907        out_records: Vec<JournalEntry<'a>>,
908    ) -> anyhow::Result<()> {
909        // Build a journal that will store the records before compacting
910        let mut compacting_journal = CompactingJournal::new(BufferedJournal::default())?;
911        for record in in_records {
912            compacting_journal.write(record)?;
913        }
914
915        // Now we build a new one using the compactor
916        let new_journal = BufferedJournal::default();
917        compacting_journal.compact_to(new_journal)?;
918
919        // Read the records
920        let new_records = compacting_journal.as_restarted()?;
921        for record1 in out_records {
922            let record2 = new_records.read()?.map(|r| r.record);
923            assert_eq!(Some(record1), record2);
924        }
925        assert_eq!(
926            None,
927            new_records.read()?.map(|x| x.record),
928            "found unexpected extra records in the compacted journal"
929        );
930
931        Ok(())
932    }
933
934    // #[tracing_test::traced_test]
935    // #[test]
936    // pub fn test_compact_purge_duplicate_memory_writes() {
937    //     run_test(
938    //         vec![
939    //             JournalEntry::UpdateMemoryRegionV1 {
940    //                 region: 0..16,
941    //                 data: [11u8; 16].to_vec().into(),
942    //             },
943    //             JournalEntry::UpdateMemoryRegionV1 {
944    //                 region: 0..16,
945    //                 data: [22u8; 16].to_vec().into(),
946    //             },
947    //         ],
948    //         vec![JournalEntry::UpdateMemoryRegionV1 {
949    //             region: 0..16,
950    //             data: [22u8; 16].to_vec().into(),
951    //         }],
952    //     )
953    //     .unwrap()
954    // }
955    //
956    // #[tracing_test::traced_test]
957    // #[test]
958    // pub fn test_compact_keep_overlapping_memory() {
959    //     run_test(
960    //         vec![
961    //             JournalEntry::UpdateMemoryRegionV1 {
962    //                 region: 0..16,
963    //                 data: [11u8; 16].to_vec().into(),
964    //             },
965    //             JournalEntry::UpdateMemoryRegionV1 {
966    //                 region: 20..36,
967    //                 data: [22u8; 16].to_vec().into(),
968    //             },
969    //         ],
970    //         vec![
971    //             JournalEntry::UpdateMemoryRegionV1 {
972    //                 region: 0..16,
973    //                 data: [11u8; 16].to_vec().into(),
974    //             },
975    //             JournalEntry::UpdateMemoryRegionV1 {
976    //                 region: 20..36,
977    //                 data: [22u8; 16].to_vec().into(),
978    //             },
979    //         ],
980    //     )
981    //     .unwrap()
982    // }
983    //
984    // #[tracing_test::traced_test]
985    // #[test]
986    // pub fn test_compact_keep_adjacent_memory_writes() {
987    //     run_test(
988    //         vec![
989    //             JournalEntry::UpdateMemoryRegionV1 {
990    //                 region: 0..16,
991    //                 data: [11u8; 16].to_vec().into(),
992    //             },
993    //             JournalEntry::UpdateMemoryRegionV1 {
994    //                 region: 16..32,
995    //                 data: [22u8; 16].to_vec().into(),
996    //             },
997    //         ],
998    //         vec![
999    //             JournalEntry::UpdateMemoryRegionV1 {
1000    //                 region: 0..16,
1001    //                 data: [11u8; 16].to_vec().into(),
1002    //             },
1003    //             JournalEntry::UpdateMemoryRegionV1 {
1004    //                 region: 16..32,
1005    //                 data: [22u8; 16].to_vec().into(),
1006    //             },
1007    //         ],
1008    //     )
1009    //     .unwrap()
1010    // }
1011    //
1012    // #[tracing_test::traced_test]
1013    // #[test]
1014    // pub fn test_compact_purge_identical_memory_writes() {
1015    //     run_test(
1016    //         vec![
1017    //             JournalEntry::UpdateMemoryRegionV1 {
1018    //                 region: 0..16,
1019    //                 data: [11u8; 16].to_vec().into(),
1020    //             },
1021    //             JournalEntry::UpdateMemoryRegionV1 {
1022    //                 region: 0..16,
1023    //                 data: [11u8; 16].to_vec().into(),
1024    //             },
1025    //         ],
1026    //         vec![JournalEntry::UpdateMemoryRegionV1 {
1027    //             region: 0..16,
1028    //             data: [11u8; 16].to_vec().into(),
1029    //         }],
1030    //     )
1031    //     .unwrap()
1032    // }
1033    //
1034    // #[tracing_test::traced_test]
1035    // #[test]
1036    // pub fn test_compact_thread_stacks() {
1037    //     run_test(
1038    //         vec![
1039    //             JournalEntry::SetThreadV1 {
1040    //                 id: 4321.into(),
1041    //                 call_stack: [44u8; 87].to_vec().into(),
1042    //                 memory_stack: [55u8; 34].to_vec().into(),
1043    //                 store_data: [66u8; 70].to_vec().into(),
1044    //                 is_64bit: true,
1045    //             },
1046    //             JournalEntry::SetThreadV1 {
1047    //                 id: 1234.into(),
1048    //                 call_stack: [11u8; 124].to_vec().into(),
1049    //                 memory_stack: [22u8; 51].to_vec().into(),
1050    //                 store_data: [33u8; 87].to_vec().into(),
1051    //                 is_64bit: true,
1052    //             },
1053    //             JournalEntry::SetThreadV1 {
1054    //                 id: 65.into(),
1055    //                 call_stack: [77u8; 34].to_vec().into(),
1056    //                 memory_stack: [88u8; 51].to_vec().into(),
1057    //                 store_data: [99u8; 12].to_vec().into(),
1058    //                 is_64bit: true,
1059    //             },
1060    //             JournalEntry::CloseThreadV1 {
1061    //                 id: 1234.into(),
1062    //                 exit_code: None,
1063    //             },
1064    //         ],
1065    //         vec![
1066    //             JournalEntry::SetThreadV1 {
1067    //                 id: 4321.into(),
1068    //                 call_stack: [44u8; 87].to_vec().into(),
1069    //                 memory_stack: [55u8; 34].to_vec().into(),
1070    //                 store_data: [66u8; 70].to_vec().into(),
1071    //                 is_64bit: true,
1072    //             },
1073    //             JournalEntry::SetThreadV1 {
1074    //                 id: 65.into(),
1075    //                 call_stack: [77u8; 34].to_vec().into(),
1076    //                 memory_stack: [88u8; 51].to_vec().into(),
1077    //                 store_data: [99u8; 12].to_vec().into(),
1078    //                 is_64bit: true,
1079    //             },
1080    //         ],
1081    //     )
1082    //     .unwrap()
1083    // }
1084    //
1085    // #[tracing_test::traced_test]
1086    // #[test]
1087    // pub fn test_compact_processed_exited() {
1088    //     run_test(
1089    //         vec![
1090    //             JournalEntry::UpdateMemoryRegionV1 {
1091    //                 region: 0..16,
1092    //                 data: [11u8; 16].to_vec().into(),
1093    //             },
1094    //             JournalEntry::SetThreadV1 {
1095    //                 id: 4321.into(),
1096    //                 call_stack: [44u8; 87].to_vec().into(),
1097    //                 memory_stack: [55u8; 34].to_vec().into(),
1098    //                 store_data: [66u8; 70].to_vec().into(),
1099    //                 is_64bit: true,
1100    //             },
1101    //             JournalEntry::SnapshotV1 {
1102    //                 when: SystemTime::now(),
1103    //                 trigger: SnapshotTrigger::FirstListen,
1104    //             },
1105    //             JournalEntry::OpenFileDescriptorV1 {
1106    //                 fd: 1234,
1107    //                 dirfd: 3452345,
1108    //                 dirflags: 0,
1109    //                 path: "/blah".into(),
1110    //                 o_flags: wasi::Oflags::empty(),
1111    //                 fs_rights_base: wasi::Rights::all(),
1112    //                 fs_rights_inheriting: wasi::Rights::all(),
1113    //                 fs_flags: wasi::Fdflags::all(),
1114    //             },
1115    //             JournalEntry::ProcessExitV1 { exit_code: None },
1116    //         ],
1117    //         vec![JournalEntry::ProcessExitV1 { exit_code: None }],
1118    //     )
1119    //     .unwrap()
1120    // }
1121    //
1122    // #[tracing_test::traced_test]
1123    // #[test]
1124    // pub fn test_compact_file_system_partial_write_survives() {
1125    //     run_test(
1126    //         vec![
1127    //             JournalEntry::OpenFileDescriptorV1 {
1128    //                 fd: 1234,
1129    //                 dirfd: 3452345,
1130    //                 dirflags: 0,
1131    //                 path: "/blah".into(),
1132    //                 o_flags: wasi::Oflags::empty(),
1133    //                 fs_rights_base: wasi::Rights::all(),
1134    //                 fs_rights_inheriting: wasi::Rights::all(),
1135    //                 fs_flags: wasi::Fdflags::all(),
1136    //             },
1137    //             JournalEntry::FileDescriptorWriteV1 {
1138    //                 fd: 1234,
1139    //                 offset: 1234,
1140    //                 data: [1u8; 16].to_vec().into(),
1141    //                 is_64bit: true,
1142    //             },
1143    //         ],
1144    //         vec![
1145    //             JournalEntry::OpenFileDescriptorV1 {
1146    //                 fd: 1234,
1147    //                 dirfd: 3452345,
1148    //                 dirflags: 0,
1149    //                 path: "/blah".into(),
1150    //                 o_flags: wasi::Oflags::empty(),
1151    //                 fs_rights_base: wasi::Rights::all(),
1152    //                 fs_rights_inheriting: wasi::Rights::all(),
1153    //                 fs_flags: wasi::Fdflags::all(),
1154    //             },
1155    //             JournalEntry::FileDescriptorWriteV1 {
1156    //                 fd: 1234,
1157    //                 offset: 1234,
1158    //                 data: [1u8; 16].to_vec().into(),
1159    //                 is_64bit: true,
1160    //             },
1161    //         ],
1162    //     )
1163    //     .unwrap()
1164    // }
1165    //
1166    // #[tracing_test::traced_test]
1167    // #[test]
1168    // pub fn test_compact_file_system_write_survives_close() {
1169    //     run_test(
1170    //         vec![
1171    //             JournalEntry::OpenFileDescriptorV1 {
1172    //                 fd: 1234,
1173    //                 dirfd: 3452345,
1174    //                 dirflags: 0,
1175    //                 path: "/blah".into(),
1176    //                 o_flags: wasi::Oflags::empty(),
1177    //                 fs_rights_base: wasi::Rights::all(),
1178    //                 fs_rights_inheriting: wasi::Rights::all(),
1179    //                 fs_flags: wasi::Fdflags::all(),
1180    //             },
1181    //             JournalEntry::FileDescriptorWriteV1 {
1182    //                 fd: 1234,
1183    //                 offset: 1234,
1184    //                 data: [1u8; 16].to_vec().into(),
1185    //                 is_64bit: true,
1186    //             },
1187    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1188    //         ],
1189    //         vec![
1190    //             JournalEntry::OpenFileDescriptorV1 {
1191    //                 fd: 1234,
1192    //                 dirfd: 3452345,
1193    //                 dirflags: 0,
1194    //                 path: "/blah".into(),
1195    //                 o_flags: wasi::Oflags::empty(),
1196    //                 fs_rights_base: wasi::Rights::all(),
1197    //                 fs_rights_inheriting: wasi::Rights::all(),
1198    //                 fs_flags: wasi::Fdflags::all(),
1199    //             },
1200    //             JournalEntry::FileDescriptorWriteV1 {
1201    //                 fd: 1234,
1202    //                 offset: 1234,
1203    //                 data: [1u8; 16].to_vec().into(),
1204    //                 is_64bit: true,
1205    //             },
1206    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1207    //         ],
1208    //     )
1209    //     .unwrap()
1210    // }
1211    //
1212    // #[tracing_test::traced_test]
1213    // #[test]
1214    // pub fn test_compact_file_system_write_survives_exit() {
1215    //     run_test(
1216    //         vec![
1217    //             JournalEntry::OpenFileDescriptorV1 {
1218    //                 fd: 1234,
1219    //                 dirfd: 3452345,
1220    //                 dirflags: 0,
1221    //                 path: "/blah".into(),
1222    //                 o_flags: wasi::Oflags::empty(),
1223    //                 fs_rights_base: wasi::Rights::all(),
1224    //                 fs_rights_inheriting: wasi::Rights::all(),
1225    //                 fs_flags: wasi::Fdflags::all(),
1226    //             },
1227    //             JournalEntry::FileDescriptorWriteV1 {
1228    //                 fd: 1234,
1229    //                 offset: 1234,
1230    //                 data: [1u8; 16].to_vec().into(),
1231    //                 is_64bit: true,
1232    //             },
1233    //             JournalEntry::ProcessExitV1 { exit_code: None },
1234    //         ],
1235    //         vec![
1236    //             JournalEntry::OpenFileDescriptorV1 {
1237    //                 fd: 1234,
1238    //                 dirfd: 3452345,
1239    //                 dirflags: 0,
1240    //                 path: "/blah".into(),
1241    //                 o_flags: wasi::Oflags::empty(),
1242    //                 fs_rights_base: wasi::Rights::all(),
1243    //                 fs_rights_inheriting: wasi::Rights::all(),
1244    //                 fs_flags: wasi::Fdflags::all(),
1245    //             },
1246    //             JournalEntry::FileDescriptorWriteV1 {
1247    //                 fd: 1234,
1248    //                 offset: 1234,
1249    //                 data: [1u8; 16].to_vec().into(),
1250    //                 is_64bit: true,
1251    //             },
1252    //             JournalEntry::ProcessExitV1 { exit_code: None },
1253    //         ],
1254    //     )
1255    //     .unwrap()
1256    // }
1257    //
1258    // #[tracing_test::traced_test]
1259    // #[test]
1260    // pub fn test_compact_file_system_read_is_ignored() {
1261    //     run_test(
1262    //         vec![
1263    //             JournalEntry::OpenFileDescriptorV1 {
1264    //                 fd: 1234,
1265    //                 dirfd: 3452345,
1266    //                 dirflags: 0,
1267    //                 path: "/blah".into(),
1268    //                 o_flags: wasi::Oflags::empty(),
1269    //                 fs_rights_base: wasi::Rights::all(),
1270    //                 fs_rights_inheriting: wasi::Rights::all(),
1271    //                 fs_flags: wasi::Fdflags::all(),
1272    //             },
1273    //             JournalEntry::FileDescriptorSeekV1 {
1274    //                 fd: 1234,
1275    //                 offset: 1234,
1276    //                 whence: wasi::Whence::End,
1277    //             },
1278    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1279    //         ],
1280    //         Vec::new(),
1281    //     )
1282    //     .unwrap()
1283    // }
1284    //
1285    // #[tracing_test::traced_test]
1286    // #[test]
1287    // pub fn test_compact_file_system_touch() {
1288    //     run_test(
1289    //         vec![
1290    //             JournalEntry::OpenFileDescriptorV1 {
1291    //                 fd: 1234,
1292    //                 dirfd: 3452345,
1293    //                 dirflags: 0,
1294    //                 path: "/blah".into(),
1295    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1296    //                 fs_rights_base: wasi::Rights::all(),
1297    //                 fs_rights_inheriting: wasi::Rights::all(),
1298    //                 fs_flags: wasi::Fdflags::all(),
1299    //             },
1300    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1301    //             JournalEntry::ProcessExitV1 { exit_code: None },
1302    //         ],
1303    //         vec![
1304    //             JournalEntry::OpenFileDescriptorV1 {
1305    //                 fd: 1234,
1306    //                 dirfd: 3452345,
1307    //                 dirflags: 0,
1308    //                 path: "/blah".into(),
1309    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1310    //                 fs_rights_base: wasi::Rights::all(),
1311    //                 fs_rights_inheriting: wasi::Rights::all(),
1312    //                 fs_flags: wasi::Fdflags::all(),
1313    //             },
1314    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1315    //             JournalEntry::ProcessExitV1 { exit_code: None },
1316    //         ],
1317    //     )
1318    //     .unwrap()
1319    // }
1320    //
1321    // #[tracing_test::traced_test]
1322    // #[test]
1323    // pub fn test_compact_file_system_redundant_file() {
1324    //     run_test(
1325    //         vec![
1326    //             JournalEntry::OpenFileDescriptorV1 {
1327    //                 fd: 1234,
1328    //                 dirfd: 3452345,
1329    //                 dirflags: 0,
1330    //                 path: "/blah".into(),
1331    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1332    //                 fs_rights_base: wasi::Rights::all(),
1333    //                 fs_rights_inheriting: wasi::Rights::all(),
1334    //                 fs_flags: wasi::Fdflags::all(),
1335    //             },
1336    //             JournalEntry::FileDescriptorWriteV1 {
1337    //                 fd: 1234,
1338    //                 offset: 1234,
1339    //                 data: [5u8; 16].to_vec().into(),
1340    //                 is_64bit: true,
1341    //             },
1342    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1343    //             JournalEntry::OpenFileDescriptorV1 {
1344    //                 fd: 1235,
1345    //                 dirfd: 3452345,
1346    //                 dirflags: 0,
1347    //                 path: "/blah".into(),
1348    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1349    //                 fs_rights_base: wasi::Rights::all(),
1350    //                 fs_rights_inheriting: wasi::Rights::all(),
1351    //                 fs_flags: wasi::Fdflags::all(),
1352    //             },
1353    //             JournalEntry::FileDescriptorWriteV1 {
1354    //                 fd: 1235,
1355    //                 offset: 1234,
1356    //                 data: [6u8; 16].to_vec().into(),
1357    //                 is_64bit: true,
1358    //             },
1359    //             JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1360    //         ],
1361    //         vec![
1362    //             JournalEntry::OpenFileDescriptorV1 {
1363    //                 fd: 1235,
1364    //                 dirfd: 3452345,
1365    //                 dirflags: 0,
1366    //                 path: "/blah".into(),
1367    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1368    //                 fs_rights_base: wasi::Rights::all(),
1369    //                 fs_rights_inheriting: wasi::Rights::all(),
1370    //                 fs_flags: wasi::Fdflags::all(),
1371    //             },
1372    //             JournalEntry::FileDescriptorWriteV1 {
1373    //                 fd: 1235,
1374    //                 offset: 1234,
1375    //                 data: [6u8; 16].to_vec().into(),
1376    //                 is_64bit: true,
1377    //             },
1378    //             JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1379    //         ],
1380    //     )
1381    //     .unwrap()
1382    // }
1383    //
1384    // #[tracing_test::traced_test]
1385    // #[test]
1386    // pub fn test_compact_file_system_ignore_double_writes() {
1387    //     run_test(
1388    //         vec![
1389    //             JournalEntry::OpenFileDescriptorV1 {
1390    //                 fd: 1234,
1391    //                 dirfd: 3452345,
1392    //                 dirflags: 0,
1393    //                 path: "/blah".into(),
1394    //                 o_flags: wasi::Oflags::empty(),
1395    //                 fs_rights_base: wasi::Rights::all(),
1396    //                 fs_rights_inheriting: wasi::Rights::all(),
1397    //                 fs_flags: wasi::Fdflags::all(),
1398    //             },
1399    //             JournalEntry::FileDescriptorWriteV1 {
1400    //                 fd: 1234,
1401    //                 offset: 1234,
1402    //                 data: [1u8; 16].to_vec().into(),
1403    //                 is_64bit: true,
1404    //             },
1405    //             JournalEntry::FileDescriptorWriteV1 {
1406    //                 fd: 1234,
1407    //                 offset: 1234,
1408    //                 data: [5u8; 16].to_vec().into(),
1409    //                 is_64bit: true,
1410    //             },
1411    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1412    //         ],
1413    //         vec![
1414    //             JournalEntry::OpenFileDescriptorV1 {
1415    //                 fd: 1234,
1416    //                 dirfd: 3452345,
1417    //                 dirflags: 0,
1418    //                 path: "/blah".into(),
1419    //                 o_flags: wasi::Oflags::empty(),
1420    //                 fs_rights_base: wasi::Rights::all(),
1421    //                 fs_rights_inheriting: wasi::Rights::all(),
1422    //                 fs_flags: wasi::Fdflags::all(),
1423    //             },
1424    //             JournalEntry::FileDescriptorWriteV1 {
1425    //                 fd: 1234,
1426    //                 offset: 1234,
1427    //                 data: [5u8; 16].to_vec().into(),
1428    //                 is_64bit: true,
1429    //             },
1430    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1431    //         ],
1432    //     )
1433    //     .unwrap()
1434    // }
1435    //
1436    // #[tracing_test::traced_test]
1437    // #[test]
1438    // pub fn test_compact_file_system_create_directory() {
1439    //     run_test(
1440    //         vec![JournalEntry::CreateDirectoryV1 {
1441    //             fd: 1234,
1442    //             path: "/blah".into(),
1443    //         }],
1444    //         vec![JournalEntry::CreateDirectoryV1 {
1445    //             fd: 1234,
1446    //             path: "/blah".into(),
1447    //         }],
1448    //     )
1449    //     .unwrap()
1450    // }
1451    //
1452    // #[tracing_test::traced_test]
1453    // #[test]
1454    // pub fn test_compact_file_system_redundant_create_directory() {
1455    //     run_test(
1456    //         vec![
1457    //             JournalEntry::CreateDirectoryV1 {
1458    //                 fd: 1234,
1459    //                 path: "/blah".into(),
1460    //             },
1461    //             JournalEntry::CreateDirectoryV1 {
1462    //                 fd: 1235,
1463    //                 path: "/blah".into(),
1464    //             },
1465    //         ],
1466    //         vec![JournalEntry::CreateDirectoryV1 {
1467    //             fd: 1234,
1468    //             path: "/blah".into(),
1469    //         }],
1470    //     )
1471    //     .unwrap()
1472    // }
1473    //
1474    // #[tracing_test::traced_test]
1475    // #[test]
1476    // pub fn test_compact_duplicate_tty() {
1477    //     run_test(
1478    //         vec![
1479    //             JournalEntry::TtySetV1 {
1480    //                 tty: Tty {
1481    //                     cols: 123,
1482    //                     rows: 65,
1483    //                     width: 2341,
1484    //                     height: 573457,
1485    //                     stdin_tty: true,
1486    //                     stdout_tty: true,
1487    //                     stderr_tty: true,
1488    //                     echo: true,
1489    //                     line_buffered: true,
1490    //                 },
1491    //                 line_feeds: true,
1492    //             },
1493    //             JournalEntry::TtySetV1 {
1494    //                 tty: Tty {
1495    //                     cols: 12,
1496    //                     rows: 65,
1497    //                     width: 2341,
1498    //                     height: 573457,
1499    //                     stdin_tty: true,
1500    //                     stdout_tty: false,
1501    //                     stderr_tty: true,
1502    //                     echo: true,
1503    //                     line_buffered: true,
1504    //                 },
1505    //                 line_feeds: true,
1506    //             },
1507    //         ],
1508    //         vec![JournalEntry::TtySetV1 {
1509    //             tty: Tty {
1510    //                 cols: 12,
1511    //                 rows: 65,
1512    //                 width: 2341,
1513    //                 height: 573457,
1514    //                 stdin_tty: true,
1515    //                 stdout_tty: false,
1516    //                 stderr_tty: true,
1517    //                 echo: true,
1518    //                 line_buffered: true,
1519    //             },
1520    //             line_feeds: true,
1521    //         }],
1522    //     )
1523    //     .unwrap()
1524    // }
1525
1526    #[tracing_test::traced_test]
1527    #[test]
1528    pub fn test_compact_close_sockets() {
1529        let fd = 512;
1530        run_test(
1531            vec![
1532                JournalEntry::SocketConnectedV1 {
1533                    fd,
1534                    local_addr: "127.0.0.1:3333".parse().unwrap(),
1535                    peer_addr: "127.0.0.1:9999".parse().unwrap(),
1536                },
1537                JournalEntry::SocketSendV1 {
1538                    fd,
1539                    data: Cow::Borrowed(b"123"),
1540                    // flags: SiFlags,
1541                    flags: Default::default(),
1542                    is_64bit: false,
1543                },
1544                JournalEntry::SocketSendV1 {
1545                    fd,
1546                    data: Cow::Borrowed(b"123"),
1547                    // flags: SiFlags,
1548                    flags: Default::default(),
1549                    is_64bit: false,
1550                },
1551                JournalEntry::SocketSendV1 {
1552                    fd,
1553                    data: Cow::Borrowed(b"456"),
1554                    // flags: SiFlags,
1555                    flags: Default::default(),
1556                    is_64bit: false,
1557                },
1558                JournalEntry::CloseFileDescriptorV1 { fd: 512 },
1559            ],
1560            vec![],
1561        )
1562        .unwrap()
1563    }
1564}