wasmer_journal/concrete/
compacting_log_file.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::{Arc, Mutex},
4};
5
6use super::*;
7
/// Compaction triggers and running counters shared (behind a mutex) by the
/// writer and reader halves of the journal.
#[derive(Debug)]
struct State {
    /// When set, a write triggers compaction once `cnt_records` is at
    /// least this value.
    on_n_records: Option<u64>,
    /// When set, a write triggers compaction once `cnt_size` is at least
    /// this value.
    on_n_size: Option<u64>,
    /// When set, a write triggers compaction once `cnt_size` exceeds
    /// `ref_size` multiplied by this factor.
    on_factor_size: Option<f32>,
    /// Whether the writer half should compact when it is dropped.
    on_drop: bool,
    /// Number of records with a non-zero size written since the counters
    /// were last reset.
    cnt_records: u64,
    /// Accumulated record size (as reported by the write result) since the
    /// counters were last reset.
    cnt_size: u64,
    /// Reference journal size used by the factor trigger; refreshed after
    /// every compaction.
    ref_size: u64,
}
18
19#[derive(Debug)]
20pub struct CompactingLogFileJournal {
21    tx: CompactingLogFileJournalTx,
22    rx: CompactingLogFileJournalRx,
23}
24
25#[derive(Debug)]
26pub struct CompactingLogFileJournalTx {
27    state: Arc<Mutex<State>>,
28    inner: CompactingJournalTx,
29    main_path: PathBuf,
30    temp_path: PathBuf,
31}
32
33#[derive(Debug)]
34pub struct CompactingLogFileJournalRx {
35    #[allow(dead_code)]
36    state: Arc<Mutex<State>>,
37    inner: CompactingJournalRx,
38}
39
40impl CompactingLogFileJournalRx {
41    pub fn swap_inner(&mut self, with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
42        self.inner.swap_inner(with)
43    }
44}
45
46impl CompactingLogFileJournal {
47    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
48        // We prepare a compacting journal which does nothing
49        // with the events other than learn from them
50        let counting = CountingJournal::default();
51        let mut compacting = CompactingJournal::new(counting.clone())?;
52
53        // We first feed all the entries into the compactor so that
54        // it learns all the records
55        let log_file = LogFileJournal::new(path.as_ref())?;
56        copy_journal(&log_file, &compacting)?;
57
58        // Now everything is learned its time to attach the
59        // log file to the compacting journal
60        compacting.replace_inner(log_file);
61        let (tx, rx) = compacting.into_split();
62
63        let mut temp_filename = path
64            .as_ref()
65            .file_name()
66            .ok_or_else(|| {
67                anyhow::format_err!(
68                    "The path is not a valid filename - {}",
69                    path.as_ref().to_string_lossy()
70                )
71            })?
72            .to_string_lossy()
73            .to_string();
74        temp_filename.insert_str(0, ".compacting.");
75        let temp_path = path.as_ref().with_file_name(&temp_filename);
76
77        let state = Arc::new(Mutex::new(State {
78            on_drop: false,
79            on_n_records: None,
80            on_n_size: None,
81            on_factor_size: None,
82            cnt_records: 0,
83            cnt_size: 0,
84            ref_size: counting.size(),
85        }));
86        let tx = CompactingLogFileJournalTx {
87            state: state.clone(),
88            inner: tx,
89            main_path: path.as_ref().to_path_buf(),
90            temp_path,
91        };
92        let rx = CompactingLogFileJournalRx { state, inner: rx };
93
94        Ok(Self { tx, rx })
95    }
96
97    pub fn compact_now(&mut self) -> anyhow::Result<CompactResult> {
98        let (result, new_rx) = self.tx.compact_now()?;
99        self.rx.inner = new_rx;
100        Ok(result)
101    }
102
103    pub fn with_compact_on_drop(self) -> Self {
104        self.tx.state.lock().unwrap().on_drop = true;
105        self
106    }
107
108    pub fn with_compact_on_n_records(self, n_records: u64) -> Self {
109        self.tx
110            .state
111            .lock()
112            .unwrap()
113            .on_n_records
114            .replace(n_records);
115        self
116    }
117
118    pub fn with_compact_on_n_size(self, n_size: u64) -> Self {
119        self.tx.state.lock().unwrap().on_n_size.replace(n_size);
120        self
121    }
122
123    pub fn with_compact_on_factor_size(self, factor_size: f32) -> Self {
124        self.tx
125            .state
126            .lock()
127            .unwrap()
128            .on_factor_size
129            .replace(factor_size);
130        self
131    }
132}
133
134impl CompactingLogFileJournalTx {
135    pub fn compact_now(&self) -> anyhow::Result<(CompactResult, CompactingJournalRx)> {
136        // Reset the counters
137        self.reset_counters();
138
139        // Create the staging file and open it
140        std::fs::remove_file(&self.temp_path).ok();
141        let target = LogFileJournal::new(self.temp_path.clone())?;
142
143        // Compact the data into the new target and rename it over the last one
144        let result = self.inner.compact_to(target)?;
145        std::fs::rename(&self.temp_path, &self.main_path)?;
146
147        // Renaming the file has quite a detrimental effect on the file as
148        // it means any new mmap operations will fail, hence we need to
149        // reopen the log file, seek to the end and reattach it
150        let target = LogFileJournal::new(self.main_path.clone())?;
151
152        // We prepare a compacting journal which does nothing
153        // with the events other than learn from them
154        let counting = CountingJournal::default();
155        let mut compacting = CompactingJournal::new(counting)?;
156        copy_journal(&target, &compacting)?;
157
158        // Now everything is learned its time to attach the log file to the compacting journal
159        // and replace the current one
160        compacting.replace_inner(target);
161        let (tx, rx) = compacting.into_split();
162        self.inner.swap(tx);
163
164        // We take a new reference point for the size of the journal
165        {
166            let mut state = self.state.lock().unwrap();
167            state.ref_size = result.total_size;
168        }
169
170        Ok((result, rx))
171    }
172
173    pub fn reset_counters(&self) {
174        let mut state = self.state.lock().unwrap();
175        state.cnt_records = 0;
176        state.cnt_size = 0;
177    }
178}
179
180impl Drop for CompactingLogFileJournalTx {
181    fn drop(&mut self) {
182        let triggered = self.state.lock().unwrap().on_drop;
183        if triggered && let Err(err) = self.compact_now() {
184            tracing::error!("failed to compact log - {}", err);
185        }
186    }
187}
188
189impl ReadableJournal for CompactingLogFileJournalRx {
190    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
191        self.inner.read()
192    }
193
194    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
195        self.inner.as_restarted()
196    }
197}
198
199impl WritableJournal for CompactingLogFileJournalTx {
200    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
201        let res = self.inner.write(entry)?;
202
203        let triggered = {
204            let mut state = self.state.lock().unwrap();
205            if res.record_size() > 0 {
206                state.cnt_records += 1;
207                state.cnt_size += res.record_size();
208            }
209
210            let mut triggered = false;
211            if let Some(on) = state.on_n_records.as_ref()
212                && state.cnt_records >= *on
213            {
214                triggered = true;
215            }
216            if let Some(on) = state.on_n_size.as_ref()
217                && state.cnt_size >= *on
218            {
219                triggered = true;
220            }
221
222            if let Some(factor) = state.on_factor_size.as_ref() {
223                let next_ref = (*factor * state.ref_size as f32) as u64;
224                if state.cnt_size > next_ref {
225                    triggered = true;
226                }
227            }
228
229            triggered
230        };
231
232        if triggered {
233            self.compact_now()?;
234        }
235
236        Ok(res)
237    }
238
239    fn flush(&self) -> anyhow::Result<()> {
240        self.inner.flush()
241    }
242
243    fn commit(&self) -> anyhow::Result<usize> {
244        self.inner.commit()
245    }
246
247    fn rollback(&self) -> anyhow::Result<usize> {
248        self.inner.rollback()
249    }
250}
251
252impl ReadableJournal for CompactingLogFileJournal {
253    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
254        self.rx.read()
255    }
256
257    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
258        self.rx.as_restarted()
259    }
260}
261
262impl WritableJournal for CompactingLogFileJournal {
263    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
264        self.tx.write(entry)
265    }
266
267    fn flush(&self) -> anyhow::Result<()> {
268        self.tx.flush()
269    }
270
271    fn commit(&self) -> anyhow::Result<usize> {
272        self.tx.commit()
273    }
274
275    fn rollback(&self) -> anyhow::Result<usize> {
276        self.tx.rollback()
277    }
278}
279
280impl Journal for CompactingLogFileJournal {
281    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
282        (Box::new(self.tx), Box::new(self.rx))
283    }
284}