wasmer_journal/concrete/
log_file.rs

1use bytes::Buf;
2use rkyv::{
3    api::high::HighSerializer,
4    rancor::Strategy,
5    ser::{
6        Positional, Serializer, Writer,
7        allocator::{Arena, ArenaHandle},
8        sharing::Share,
9        writer::IoWriter,
10    },
11};
12use shared_buffer::OwnedBuffer;
13use std::{
14    fs::File,
15    io::{Seek, SeekFrom, Write},
16    path::Path,
17    sync::{Arc, Mutex},
18};
19use virtual_fs::mem_fs::OffloadBackingStore;
20
21use super::*;
22
23/// The LogFile snapshot capturer will write its snapshots to a linear journal
24/// and read them when restoring. It uses the `bincode` serializer which
25/// means that forwards and backwards compatibility must be dealt with
26/// carefully.
27///
28/// When opening an existing journal file that was previously saved
29/// then new entries will be added to the end regardless of if
30/// its been read.
31///
32/// The logfile snapshot capturer uses a 64bit number as a entry encoding
33/// delimiter.
34#[derive(Debug)]
35pub struct LogFileJournal {
36    tx: LogFileJournalTx,
37    rx: LogFileJournalRx,
38}
39
40struct TxState {
41    /// The original handle to the file
42    underlying_file: File,
43
44    /// A modified handle to the original underlying file
45    file: File,
46
47    /// The arena necessary for serialization
48    arena: Arena,
49
50    /// The latest position in the file the serializator got to
51    pos: usize,
52}
53
54impl TxState {
55    fn get_serializer(&mut self) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
56        self.get_serializer_with_pos(self.pos)
57    }
58
59    fn get_serializer_with_pos(
60        &mut self,
61        pos: usize,
62    ) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
63        Serializer::new(
64            IoWriter::with_pos(&self.file, pos),
65            self.arena.acquire(),
66            Share::new(),
67        )
68    }
69
70    fn to_high<'a>(
71        serializer: &'a mut Serializer<IoWriter<&'a File>, ArenaHandle<'a>, Share>,
72    ) -> &'a mut HighSerializer<IoWriter<&'a File>, ArenaHandle<'a>, rkyv::rancor::Error> {
73        Strategy::wrap(serializer)
74    }
75}
76
77impl std::fmt::Debug for TxState {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("TxState")
80            .field("file", &self.underlying_file)
81            .finish()
82    }
83}
84
85#[derive(Debug, Clone)]
86pub struct LogFileJournalTx {
87    state: Arc<Mutex<TxState>>,
88}
89
90#[derive(Debug)]
91pub struct LogFileJournalRx {
92    tx: Option<LogFileJournalTx>,
93    buffer_pos: Mutex<usize>,
94    buffer: OwnedBuffer,
95    store: OffloadBackingStore,
96}
97
98impl LogFileJournalRx {
99    pub fn owned_buffer(&self) -> OwnedBuffer {
100        self.store.owned_buffer().clone()
101    }
102
103    pub fn backing_store(&self) -> OffloadBackingStore {
104        self.store.clone()
105    }
106}
107
108impl LogFileJournalTx {
109    pub fn as_rx(&self) -> anyhow::Result<LogFileJournalRx> {
110        let state = self.state.lock().unwrap();
111        let file = state.underlying_file.try_clone()?;
112
113        let store = OffloadBackingStore::from_file(&file);
114        let buffer = store.owned_buffer();
115
116        // If the buffer exists we valid the magic number
117        let mut buffer_pos = 0;
118        let mut buffer_ptr = buffer.as_ref();
119        if buffer_ptr.len() >= 8 {
120            let magic = u64::from_be_bytes(buffer_ptr[0..8].try_into().unwrap());
121            if magic != JOURNAL_MAGIC_NUMBER {
122                return Err(anyhow::format_err!(
123                    "invalid magic number of journal ({magic} vs {JOURNAL_MAGIC_NUMBER})"
124                ));
125            }
126            buffer_ptr.advance(8);
127            buffer_pos += 8;
128        } else {
129            tracing::trace!("journal has no magic (could be empty?)");
130        }
131
132        Ok(LogFileJournalRx {
133            tx: Some(self.clone()),
134            buffer_pos: Mutex::new(buffer_pos),
135            buffer,
136            store,
137        })
138    }
139}
140
141impl LogFileJournal {
142    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
143        let file = std::fs::File::options()
144            .read(true)
145            .write(true)
146            .create(true)
147            .truncate(false)
148            .open(path)?;
149        Self::from_file(file)
150    }
151
152    pub fn new_readonly(path: impl AsRef<Path>) -> anyhow::Result<Self> {
153        let file = std::fs::File::options().read(true).open(path)?;
154        Self::from_file(file)
155    }
156
157    pub fn owned_buffer(&self) -> OwnedBuffer {
158        self.rx.owned_buffer()
159    }
160
161    pub fn backing_store(&self) -> OffloadBackingStore {
162        self.rx.backing_store()
163    }
164
165    /// Create a new journal from a file
166    pub fn from_file(mut file: std::fs::File) -> anyhow::Result<Self> {
167        // Move to the end of the file and write the
168        // magic if one is needed
169        let underlying_file = file.try_clone()?;
170        let arena = Arena::new();
171
172        let end_pos = file.seek(SeekFrom::End(0))?;
173
174        let mut tx = TxState {
175            underlying_file,
176            arena,
177            file,
178            pos: end_pos as usize,
179        };
180
181        let mut serializer = tx.get_serializer();
182        let serializer = TxState::to_high(&mut serializer);
183
184        if serializer.pos() == 0 {
185            let magic = JOURNAL_MAGIC_NUMBER;
186            let magic = magic.to_be_bytes();
187            serializer.write(&magic)?;
188        }
189
190        let last_pos = serializer.pos();
191        let _ = serializer;
192
193        tx.arena.shrink();
194        tx.pos = last_pos;
195
196        // Create the tx
197        let tx = LogFileJournalTx {
198            state: Arc::new(Mutex::new(tx)),
199        };
200
201        // First we create the readable journal
202        let rx = tx.as_rx()?;
203
204        Ok(Self { rx, tx })
205    }
206
207    /// Create a new journal from a buffer
208    pub fn from_buffer(
209        buffer: OwnedBuffer,
210    ) -> RecombinedJournal<UnsupportedJournal, LogFileJournalRx> {
211        // Create the rx
212        let rx = LogFileJournalRx {
213            tx: None,
214            buffer_pos: Mutex::new(0),
215            buffer: buffer.clone(),
216            store: OffloadBackingStore::from_buffer(buffer),
217        };
218
219        // Create an unsupported write journal
220        let tx = UnsupportedJournal::default();
221
222        // Now recombine
223        RecombinedJournal::new(tx, rx)
224    }
225}
226
227impl WritableJournal for LogFileJournalTx {
228    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
229        tracing::debug!("journal event: {:?}", entry);
230
231        let mut state = self.state.lock().unwrap();
232
233        // Write the header (with a record size of zero)
234        let record_type: JournalEntryRecordType = entry.archive_record_type();
235        let mut serializer = state.get_serializer();
236        let serializer = TxState::to_high(&mut serializer);
237        let offset_header = serializer.pos() as u64;
238        tracing::trace!("serpos is {offset_header}");
239        serializer.write(&[0u8; 8])?;
240
241        // Now serialize the actual data to the log
242        let offset_start = serializer.pos() as u64;
243        entry.serialize_archive(serializer)?;
244        let offset_end = serializer.pos() as u64;
245        let record_size = offset_end - offset_start;
246        tracing::trace!(
247            "delimiter header={offset_header},start={offset_start},record_size={record_size}"
248        );
249
250        let last_pos = serializer.pos();
251        let _ = serializer;
252
253        // Write the record and then move back to the end again
254        state.underlying_file.seek(SeekFrom::Start(offset_header))?;
255        let header_bytes = {
256            let a = (record_type as u16).to_be_bytes();
257            let b = &record_size.to_be_bytes()[2..8];
258            [a[0], a[1], b[0], b[1], b[2], b[3], b[4], b[5]]
259        };
260        state.underlying_file.write_all(&header_bytes)?;
261        state.underlying_file.seek(SeekFrom::Start(offset_end))?;
262
263        state.arena.shrink();
264        state.pos = last_pos;
265
266        // Now write the actual data and update the offsets
267        Ok(LogWriteResult {
268            record_start: offset_start,
269            record_end: offset_end,
270        })
271    }
272
273    fn flush(&self) -> anyhow::Result<()> {
274        let mut state = self.state.lock().unwrap();
275        state.underlying_file.flush()?;
276        Ok(())
277    }
278}
279
280impl ReadableJournal for LogFileJournalRx {
281    /// UNSAFE: This method uses unsafe operations to remove the need to zero
282    /// the buffer before its read the log entries into it
283    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
284        let mut buffer_pos = self.buffer_pos.lock().unwrap();
285
286        // Get a memory reference to the data on the disk at
287        // the current read location
288        let mut buffer_ptr = self.buffer.as_ref();
289        buffer_ptr.advance(*buffer_pos);
290        loop {
291            // Read the headers and advance
292            if buffer_ptr.len() < 8 {
293                return Ok(None);
294            }
295
296            let record_type: JournalEntryRecordType;
297            let header = {
298                let b = buffer_ptr;
299
300                // If the next header is the magic itself then skip it.
301                // You may be wondering how a magic could appear later
302                // in the journal itself. This can happen if someone
303                // concat's multiple journals together to make a combined
304                // journal
305                if b[0..8] == JOURNAL_MAGIC_NUMBER_BYTES[0..8] {
306                    buffer_ptr.advance(8);
307                    *buffer_pos += 8;
308                    continue;
309                }
310
311                // Otherwise we decode the header
312                let header = JournalEntryHeader {
313                    record_type: u16::from_be_bytes([b[0], b[1]]),
314                    record_size: u64::from_be_bytes([0u8, 0u8, b[2], b[3], b[4], b[5], b[6], b[7]]),
315                };
316
317                // Now we read the entry
318                record_type = match header.record_type.try_into() {
319                    Ok(t) => t,
320                    Err(_) => {
321                        tracing::debug!(
322                            "unknown journal entry type ({}) - the journal stops here",
323                            header.record_type
324                        );
325                        return Ok(None);
326                    }
327                };
328
329                buffer_ptr.advance(8);
330                *buffer_pos += 8;
331                header
332            };
333            let record_start = *buffer_pos as u64;
334
335            // Move the buffer position forward past the record
336            let entry = &buffer_ptr[..(header.record_size as usize)];
337            buffer_ptr.advance(header.record_size as usize);
338            *buffer_pos += header.record_size as usize;
339
340            let record = unsafe { record_type.deserialize_archive(entry)? };
341            return Ok(Some(LogReadResult {
342                record_start,
343                record_end: *buffer_pos as u64,
344                record,
345            }));
346        }
347    }
348
349    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
350        if let Some(tx) = &self.tx {
351            let ret = tx.as_rx()?;
352            Ok(Box::new(ret))
353        } else {
354            Ok(Box::new(LogFileJournalRx {
355                tx: None,
356                buffer_pos: Mutex::new(0),
357                buffer: self.buffer.clone(),
358                store: self.store.clone(),
359            }))
360        }
361    }
362}
363
364impl WritableJournal for LogFileJournal {
365    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
366        self.tx.write(entry)
367    }
368
369    fn flush(&self) -> anyhow::Result<()> {
370        self.tx.flush()
371    }
372
373    fn commit(&self) -> anyhow::Result<usize> {
374        self.tx.commit()
375    }
376
377    fn rollback(&self) -> anyhow::Result<usize> {
378        self.tx.rollback()
379    }
380}
381
382impl ReadableJournal for LogFileJournal {
383    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
384        self.rx.read()
385    }
386
387    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
388        self.rx.as_restarted()
389    }
390}
391
392impl Journal for LogFileJournal {
393    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
394        (Box::new(self.tx), Box::new(self.rx))
395    }
396}
397
#[cfg(test)]
mod tests {
    use wasmer_wasix_types::wasix::{ThreadStartType, WasiMemoryLayout};

    use super::*;

    /// Reads the next entry from `journal`, dropping the record offsets.
    fn next_entry(journal: &LogFileJournal) -> Option<JournalEntry<'_>> {
        journal.read().unwrap().map(LogReadResult::into_inner)
    }

    fn first_pipe<'a>() -> JournalEntry<'a> {
        JournalEntry::CreatePipeV1 {
            read_fd: 1,
            write_fd: 2,
        }
    }

    fn main_thread<'a>() -> JournalEntry<'a> {
        JournalEntry::SetThreadV1 {
            id: 1,
            call_stack: vec![11; 116].into(),
            memory_stack: vec![22; 16].into(),
            store_data: vec![33; 136].into(),
            is_64bit: false,
            layout: WasiMemoryLayout {
                stack_upper: 0,
                stack_lower: 1024,
                guard_size: 16,
                stack_size: 1024,
                tls_base: None,
            },
            start: ThreadStartType::MainThread,
        }
    }

    fn socket_send<'a>() -> JournalEntry<'a> {
        JournalEntry::SocketSendV1 {
            fd: 1234,
            data: [12; 1024].to_vec().into(),
            flags: 123,
            is_64bit: true,
        }
    }

    fn second_pipe<'a>() -> JournalEntry<'a> {
        JournalEntry::CreatePipeV1 {
            read_fd: 1234,
            write_fd: 5432,
        }
    }

    #[tracing_test::traced_test]
    #[test]
    pub fn test_save_and_load_journal_events() {
        // Get a random file path
        let file = tempfile::NamedTempFile::new().unwrap();

        // Write three events, then close the journal
        {
            let journal =
                LogFileJournal::from_file(file.as_file().try_clone().unwrap()).unwrap();
            journal.write(first_pipe()).unwrap();
            journal.write(main_thread()).unwrap();
            journal.write(JournalEntry::PortAddrClearV1).unwrap();
        }

        // Reopen: exactly those three events come back
        let journal = LogFileJournal::new(file.path()).unwrap();
        let replayed: Vec<_> = (0..4).map(|_| next_entry(&journal)).collect();
        assert_eq!(
            replayed,
            vec![
                Some(first_pipe()),
                Some(main_thread()),
                Some(JournalEntry::PortAddrClearV1),
                None,
            ]
        );

        // Append another event through the same journal
        journal.write(socket_send()).unwrap();

        // The event should not be visible yet unless we reload the log file
        assert_eq!(next_entry(&journal), None);

        // Reload, and append one more event before reading anything
        let journal = LogFileJournal::new(file.path()).unwrap();
        journal.write(second_pipe()).unwrap();

        let replayed: Vec<_> = (0..5).map(|_| next_entry(&journal)).collect();
        assert_eq!(
            replayed,
            vec![
                Some(first_pipe()),
                Some(main_thread()),
                Some(JournalEntry::PortAddrClearV1),
                Some(socket_send()),
                None,
            ]
        );

        // Load it again: every event written so far is present
        let journal = LogFileJournal::new(file.path()).unwrap();

        let event1 = next_entry(&journal);
        let event2 = next_entry(&journal);
        let event3 = next_entry(&journal);
        let event4 = next_entry(&journal);
        let event5 = next_entry(&journal);
        let event6 = next_entry(&journal);

        tracing::info!("event1 {:?}", event1);
        tracing::info!("event2 {:?}", event2);
        tracing::info!("event3 {:?}", event3);
        tracing::info!("event4 {:?}", event4);
        tracing::info!("event5 {:?}", event5);
        tracing::info!("event6 {:?}", event6);

        assert_eq!(
            [event1, event2, event3, event4, event5, event6],
            [
                Some(first_pipe()),
                Some(main_thread()),
                Some(JournalEntry::PortAddrClearV1),
                Some(socket_send()),
                Some(second_pipe()),
                None,
            ]
        );
    }
}