1use bytes::Buf;
2use rkyv::{
3 api::high::HighSerializer,
4 rancor::Strategy,
5 ser::{
6 Positional, Serializer, Writer,
7 allocator::{Arena, ArenaHandle},
8 sharing::Share,
9 writer::IoWriter,
10 },
11};
12use shared_buffer::OwnedBuffer;
13use std::{
14 fs::File,
15 io::{Seek, SeekFrom, Write},
16 path::Path,
17 sync::{Arc, Mutex},
18};
19use virtual_fs::mem_fs::OffloadBackingStore;
20
21use super::*;
22
23#[derive(Debug)]
35pub struct LogFileJournal {
36 tx: LogFileJournalTx,
37 rx: LogFileJournalRx,
38}
39
40struct TxState {
41 underlying_file: File,
43
44 file: File,
46
47 arena: Arena,
49
50 pos: usize,
52}
53
54impl TxState {
55 fn get_serializer(&mut self) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
56 self.get_serializer_with_pos(self.pos)
57 }
58
59 fn get_serializer_with_pos(
60 &mut self,
61 pos: usize,
62 ) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
63 Serializer::new(
64 IoWriter::with_pos(&self.file, pos),
65 self.arena.acquire(),
66 Share::new(),
67 )
68 }
69
70 fn to_high<'a>(
71 serializer: &'a mut Serializer<IoWriter<&'a File>, ArenaHandle<'a>, Share>,
72 ) -> &'a mut HighSerializer<IoWriter<&'a File>, ArenaHandle<'a>, rkyv::rancor::Error> {
73 Strategy::wrap(serializer)
74 }
75}
76
77impl std::fmt::Debug for TxState {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("TxState")
80 .field("file", &self.underlying_file)
81 .finish()
82 }
83}
84
85#[derive(Debug, Clone)]
86pub struct LogFileJournalTx {
87 state: Arc<Mutex<TxState>>,
88}
89
90#[derive(Debug)]
91pub struct LogFileJournalRx {
92 tx: Option<LogFileJournalTx>,
93 buffer_pos: Mutex<usize>,
94 buffer: OwnedBuffer,
95 store: OffloadBackingStore,
96}
97
98impl LogFileJournalRx {
99 pub fn owned_buffer(&self) -> OwnedBuffer {
100 self.store.owned_buffer().clone()
101 }
102
103 pub fn backing_store(&self) -> OffloadBackingStore {
104 self.store.clone()
105 }
106}
107
108impl LogFileJournalTx {
109 pub fn as_rx(&self) -> anyhow::Result<LogFileJournalRx> {
110 let state = self.state.lock().unwrap();
111 let file = state.underlying_file.try_clone()?;
112
113 let store = OffloadBackingStore::from_file(&file);
114 let buffer = store.owned_buffer();
115
116 let mut buffer_pos = 0;
118 let mut buffer_ptr = buffer.as_ref();
119 if buffer_ptr.len() >= 8 {
120 let magic = u64::from_be_bytes(buffer_ptr[0..8].try_into().unwrap());
121 if magic != JOURNAL_MAGIC_NUMBER {
122 return Err(anyhow::format_err!(
123 "invalid magic number of journal ({magic} vs {JOURNAL_MAGIC_NUMBER})"
124 ));
125 }
126 buffer_ptr.advance(8);
127 buffer_pos += 8;
128 } else {
129 tracing::trace!("journal has no magic (could be empty?)");
130 }
131
132 Ok(LogFileJournalRx {
133 tx: Some(self.clone()),
134 buffer_pos: Mutex::new(buffer_pos),
135 buffer,
136 store,
137 })
138 }
139}
140
141impl LogFileJournal {
142 pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
143 let file = std::fs::File::options()
144 .read(true)
145 .write(true)
146 .create(true)
147 .truncate(false)
148 .open(path)?;
149 Self::from_file(file)
150 }
151
152 pub fn new_readonly(path: impl AsRef<Path>) -> anyhow::Result<Self> {
153 let file = std::fs::File::options().read(true).open(path)?;
154 Self::from_file(file)
155 }
156
157 pub fn owned_buffer(&self) -> OwnedBuffer {
158 self.rx.owned_buffer()
159 }
160
161 pub fn backing_store(&self) -> OffloadBackingStore {
162 self.rx.backing_store()
163 }
164
165 pub fn from_file(mut file: std::fs::File) -> anyhow::Result<Self> {
167 let underlying_file = file.try_clone()?;
170 let arena = Arena::new();
171
172 let end_pos = file.seek(SeekFrom::End(0))?;
173
174 let mut tx = TxState {
175 underlying_file,
176 arena,
177 file,
178 pos: end_pos as usize,
179 };
180
181 let mut serializer = tx.get_serializer();
182 let serializer = TxState::to_high(&mut serializer);
183
184 if serializer.pos() == 0 {
185 let magic = JOURNAL_MAGIC_NUMBER;
186 let magic = magic.to_be_bytes();
187 serializer.write(&magic)?;
188 }
189
190 let last_pos = serializer.pos();
191 let _ = serializer;
192
193 tx.arena.shrink();
194 tx.pos = last_pos;
195
196 let tx = LogFileJournalTx {
198 state: Arc::new(Mutex::new(tx)),
199 };
200
201 let rx = tx.as_rx()?;
203
204 Ok(Self { rx, tx })
205 }
206
207 pub fn from_buffer(
209 buffer: OwnedBuffer,
210 ) -> RecombinedJournal<UnsupportedJournal, LogFileJournalRx> {
211 let rx = LogFileJournalRx {
213 tx: None,
214 buffer_pos: Mutex::new(0),
215 buffer: buffer.clone(),
216 store: OffloadBackingStore::from_buffer(buffer),
217 };
218
219 let tx = UnsupportedJournal::default();
221
222 RecombinedJournal::new(tx, rx)
224 }
225}
226
227impl WritableJournal for LogFileJournalTx {
228 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
229 tracing::debug!("journal event: {:?}", entry);
230
231 let mut state = self.state.lock().unwrap();
232
233 let record_type: JournalEntryRecordType = entry.archive_record_type();
235 let mut serializer = state.get_serializer();
236 let serializer = TxState::to_high(&mut serializer);
237 let offset_header = serializer.pos() as u64;
238 tracing::trace!("serpos is {offset_header}");
239 serializer.write(&[0u8; 8])?;
240
241 let offset_start = serializer.pos() as u64;
243 entry.serialize_archive(serializer)?;
244 let offset_end = serializer.pos() as u64;
245 let record_size = offset_end - offset_start;
246 tracing::trace!(
247 "delimiter header={offset_header},start={offset_start},record_size={record_size}"
248 );
249
250 let last_pos = serializer.pos();
251 let _ = serializer;
252
253 state.underlying_file.seek(SeekFrom::Start(offset_header))?;
255 let header_bytes = {
256 let a = (record_type as u16).to_be_bytes();
257 let b = &record_size.to_be_bytes()[2..8];
258 [a[0], a[1], b[0], b[1], b[2], b[3], b[4], b[5]]
259 };
260 state.underlying_file.write_all(&header_bytes)?;
261 state.underlying_file.seek(SeekFrom::Start(offset_end))?;
262
263 state.arena.shrink();
264 state.pos = last_pos;
265
266 Ok(LogWriteResult {
268 record_start: offset_start,
269 record_end: offset_end,
270 })
271 }
272
273 fn flush(&self) -> anyhow::Result<()> {
274 let mut state = self.state.lock().unwrap();
275 state.underlying_file.flush()?;
276 Ok(())
277 }
278}
279
280impl ReadableJournal for LogFileJournalRx {
281 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
284 let mut buffer_pos = self.buffer_pos.lock().unwrap();
285
286 let mut buffer_ptr = self.buffer.as_ref();
289 buffer_ptr.advance(*buffer_pos);
290 loop {
291 if buffer_ptr.len() < 8 {
293 return Ok(None);
294 }
295
296 let record_type: JournalEntryRecordType;
297 let header = {
298 let b = buffer_ptr;
299
300 if b[0..8] == JOURNAL_MAGIC_NUMBER_BYTES[0..8] {
306 buffer_ptr.advance(8);
307 *buffer_pos += 8;
308 continue;
309 }
310
311 let header = JournalEntryHeader {
313 record_type: u16::from_be_bytes([b[0], b[1]]),
314 record_size: u64::from_be_bytes([0u8, 0u8, b[2], b[3], b[4], b[5], b[6], b[7]]),
315 };
316
317 record_type = match header.record_type.try_into() {
319 Ok(t) => t,
320 Err(_) => {
321 tracing::debug!(
322 "unknown journal entry type ({}) - the journal stops here",
323 header.record_type
324 );
325 return Ok(None);
326 }
327 };
328
329 buffer_ptr.advance(8);
330 *buffer_pos += 8;
331 header
332 };
333 let record_start = *buffer_pos as u64;
334
335 let entry = &buffer_ptr[..(header.record_size as usize)];
337 buffer_ptr.advance(header.record_size as usize);
338 *buffer_pos += header.record_size as usize;
339
340 let record = unsafe { record_type.deserialize_archive(entry)? };
341 return Ok(Some(LogReadResult {
342 record_start,
343 record_end: *buffer_pos as u64,
344 record,
345 }));
346 }
347 }
348
349 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
350 if let Some(tx) = &self.tx {
351 let ret = tx.as_rx()?;
352 Ok(Box::new(ret))
353 } else {
354 Ok(Box::new(LogFileJournalRx {
355 tx: None,
356 buffer_pos: Mutex::new(0),
357 buffer: self.buffer.clone(),
358 store: self.store.clone(),
359 }))
360 }
361 }
362}
363
364impl WritableJournal for LogFileJournal {
365 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
366 self.tx.write(entry)
367 }
368
369 fn flush(&self) -> anyhow::Result<()> {
370 self.tx.flush()
371 }
372
373 fn commit(&self) -> anyhow::Result<usize> {
374 self.tx.commit()
375 }
376
377 fn rollback(&self) -> anyhow::Result<usize> {
378 self.tx.rollback()
379 }
380}
381
382impl ReadableJournal for LogFileJournal {
383 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
384 self.rx.read()
385 }
386
387 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
388 self.rx.as_restarted()
389 }
390}
391
392impl Journal for LogFileJournal {
393 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
394 (Box::new(self.tx), Box::new(self.rx))
395 }
396}
397
398#[cfg(test)]
399mod tests {
400 use wasmer_wasix_types::wasix::WasiMemoryLayout;
401
402 use super::*;
403
404 #[tracing_test::traced_test]
405 #[test]
406 pub fn test_save_and_load_journal_events() {
407 let file = tempfile::NamedTempFile::new().unwrap();
409
410 let journal = LogFileJournal::from_file(file.as_file().try_clone().unwrap()).unwrap();
412 journal
413 .write(JournalEntry::CreatePipeV1 {
414 read_fd: 1,
415 write_fd: 2,
416 })
417 .unwrap();
418 journal
419 .write(JournalEntry::SetThreadV1 {
420 id: 1,
421 call_stack: vec![11; 116].into(),
422 memory_stack: vec![22; 16].into(),
423 store_data: vec![33; 136].into(),
424 is_64bit: false,
425 layout: WasiMemoryLayout {
426 stack_upper: 0,
427 stack_lower: 1024,
428 guard_size: 16,
429 stack_size: 1024,
430 tls_base: None,
431 },
432 start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
433 })
434 .unwrap();
435 journal.write(JournalEntry::PortAddrClearV1).unwrap();
436 drop(journal);
437
438 let journal = LogFileJournal::new(file.path()).unwrap();
440 let event1 = journal.read().unwrap().map(LogReadResult::into_inner);
441 let event2 = journal.read().unwrap().map(LogReadResult::into_inner);
442 let event3 = journal.read().unwrap().map(LogReadResult::into_inner);
443 let event4 = journal.read().unwrap().map(LogReadResult::into_inner);
444
445 assert_eq!(
447 event1,
448 Some(JournalEntry::CreatePipeV1 {
449 read_fd: 1,
450 write_fd: 2
451 })
452 );
453 assert_eq!(
454 event2,
455 Some(JournalEntry::SetThreadV1 {
456 id: 1,
457 call_stack: vec![11; 116].into(),
458 memory_stack: vec![22; 16].into(),
459 store_data: vec![33; 136].into(),
460 is_64bit: false,
461 layout: WasiMemoryLayout {
462 stack_upper: 0,
463 stack_lower: 1024,
464 guard_size: 16,
465 stack_size: 1024,
466 tls_base: None,
467 },
468 start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
469 })
470 );
471 assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
472 assert_eq!(event4, None);
473
474 journal
476 .write(JournalEntry::SocketSendV1 {
477 fd: 1234,
478 data: [12; 1024].to_vec().into(),
479 flags: 123,
480 is_64bit: true,
481 })
482 .unwrap();
483
484 assert_eq!(journal.read().unwrap().map(LogReadResult::into_inner), None);
486
487 let journal = LogFileJournal::new(file.path()).unwrap();
489
490 journal
492 .write(JournalEntry::CreatePipeV1 {
493 read_fd: 1234,
494 write_fd: 5432,
495 })
496 .unwrap();
497
498 let event1 = journal.read().unwrap().map(LogReadResult::into_inner);
499 let event2 = journal.read().unwrap().map(LogReadResult::into_inner);
500 let event3 = journal.read().unwrap().map(LogReadResult::into_inner);
501 let event4 = journal.read().unwrap().map(LogReadResult::into_inner);
502 let event5 = journal.read().unwrap().map(LogReadResult::into_inner);
503 assert_eq!(
504 event1,
505 Some(JournalEntry::CreatePipeV1 {
506 read_fd: 1,
507 write_fd: 2
508 })
509 );
510 assert_eq!(
511 event2,
512 Some(JournalEntry::SetThreadV1 {
513 id: 1,
514 call_stack: vec![11; 116].into(),
515 memory_stack: vec![22; 16].into(),
516 store_data: vec![33; 136].into(),
517 is_64bit: false,
518 layout: WasiMemoryLayout {
519 stack_upper: 0,
520 stack_lower: 1024,
521 guard_size: 16,
522 stack_size: 1024,
523 tls_base: None,
524 },
525 start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
526 })
527 );
528 assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
529 assert_eq!(
530 event4,
531 Some(JournalEntry::SocketSendV1 {
532 fd: 1234,
533 data: [12; 1024].to_vec().into(),
534 flags: 123,
535 is_64bit: true,
536 })
537 );
538 assert_eq!(event5, None);
539
540 let journal = LogFileJournal::new(file.path()).unwrap();
542
543 let event1 = journal.read().unwrap().map(LogReadResult::into_inner);
544 let event2 = journal.read().unwrap().map(LogReadResult::into_inner);
545 let event3 = journal.read().unwrap().map(LogReadResult::into_inner);
546 let event4 = journal.read().unwrap().map(LogReadResult::into_inner);
547 let event5 = journal.read().unwrap().map(LogReadResult::into_inner);
548 let event6 = journal.read().unwrap().map(LogReadResult::into_inner);
549
550 tracing::info!("event1 {:?}", event1);
551 tracing::info!("event2 {:?}", event2);
552 tracing::info!("event3 {:?}", event3);
553 tracing::info!("event4 {:?}", event4);
554 tracing::info!("event5 {:?}", event5);
555 tracing::info!("event6 {:?}", event6);
556
557 assert_eq!(
558 event1,
559 Some(JournalEntry::CreatePipeV1 {
560 read_fd: 1,
561 write_fd: 2
562 })
563 );
564 assert_eq!(
565 event2,
566 Some(JournalEntry::SetThreadV1 {
567 id: 1,
568 call_stack: vec![11; 116].into(),
569 memory_stack: vec![22; 16].into(),
570 store_data: vec![33; 136].into(),
571 is_64bit: false,
572 layout: WasiMemoryLayout {
573 stack_upper: 0,
574 stack_lower: 1024,
575 guard_size: 16,
576 stack_size: 1024,
577 tls_base: None,
578 },
579 start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
580 })
581 );
582 assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
583 assert_eq!(
584 event4,
585 Some(JournalEntry::SocketSendV1 {
586 fd: 1234,
587 data: [12; 1024].to_vec().into(),
588 flags: 123,
589 is_64bit: true,
590 })
591 );
592 assert_eq!(
593 event5,
594 Some(JournalEntry::CreatePipeV1 {
595 read_fd: 1234,
596 write_fd: 5432,
597 })
598 );
599 assert_eq!(event6, None);
600 }
601}