wasmer_journal/concrete/
compacting_log_file.rs1use std::{
2 path::{Path, PathBuf},
3 sync::{Arc, Mutex},
4};
5
6use super::*;
7
/// Compaction settings and running counters shared between the write and
/// read halves of the journal.
#[derive(Debug)]
struct State {
    /// Compact once at least this many records have been counted since the
    /// counters were last reset (`None` disables the trigger).
    on_n_records: Option<u64>,
    /// Compact once the accumulated record sizes reach this value
    /// (`None` disables the trigger).
    on_n_size: Option<u64>,
    /// Compact once the accumulated record sizes exceed `ref_size` multiplied
    /// by this factor (`None` disables the trigger).
    on_factor_size: Option<f32>,
    /// Whether dropping the write half should run a compaction.
    on_drop: bool,
    /// Number of non-empty records written since the counters were last reset.
    cnt_records: u64,
    /// Sum of the record sizes written since the counters were last reset.
    cnt_size: u64,
    /// Reference size used by the factor trigger: the counted size when the
    /// journal was opened, then the total size reported by each compaction.
    ref_size: u64,
}
18
19#[derive(Debug)]
20pub struct CompactingLogFileJournal {
21 tx: CompactingLogFileJournalTx,
22 rx: CompactingLogFileJournalRx,
23}
24
25#[derive(Debug)]
26pub struct CompactingLogFileJournalTx {
27 state: Arc<Mutex<State>>,
28 inner: CompactingJournalTx,
29 main_path: PathBuf,
30 temp_path: PathBuf,
31}
32
33#[derive(Debug)]
34pub struct CompactingLogFileJournalRx {
35 #[allow(dead_code)]
36 state: Arc<Mutex<State>>,
37 inner: CompactingJournalRx,
38}
39
40impl CompactingLogFileJournalRx {
41 pub fn swap_inner(&mut self, with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
42 self.inner.swap_inner(with)
43 }
44}
45
46impl CompactingLogFileJournal {
47 pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
48 let counting = CountingJournal::default();
51 let mut compacting = CompactingJournal::new(counting.clone())?;
52
53 let log_file = LogFileJournal::new(path.as_ref())?;
56 copy_journal(&log_file, &compacting)?;
57
58 compacting.replace_inner(log_file);
61 let (tx, rx) = compacting.into_split();
62
63 let mut temp_filename = path
64 .as_ref()
65 .file_name()
66 .ok_or_else(|| {
67 anyhow::format_err!(
68 "The path is not a valid filename - {}",
69 path.as_ref().to_string_lossy()
70 )
71 })?
72 .to_string_lossy()
73 .to_string();
74 temp_filename.insert_str(0, ".compacting.");
75 let temp_path = path.as_ref().with_file_name(&temp_filename);
76
77 let state = Arc::new(Mutex::new(State {
78 on_drop: false,
79 on_n_records: None,
80 on_n_size: None,
81 on_factor_size: None,
82 cnt_records: 0,
83 cnt_size: 0,
84 ref_size: counting.size(),
85 }));
86 let tx = CompactingLogFileJournalTx {
87 state: state.clone(),
88 inner: tx,
89 main_path: path.as_ref().to_path_buf(),
90 temp_path,
91 };
92 let rx = CompactingLogFileJournalRx { state, inner: rx };
93
94 Ok(Self { tx, rx })
95 }
96
97 pub fn compact_now(&mut self) -> anyhow::Result<CompactResult> {
98 let (result, new_rx) = self.tx.compact_now()?;
99 self.rx.inner = new_rx;
100 Ok(result)
101 }
102
103 pub fn with_compact_on_drop(self) -> Self {
104 self.tx.state.lock().unwrap().on_drop = true;
105 self
106 }
107
108 pub fn with_compact_on_n_records(self, n_records: u64) -> Self {
109 self.tx
110 .state
111 .lock()
112 .unwrap()
113 .on_n_records
114 .replace(n_records);
115 self
116 }
117
118 pub fn with_compact_on_n_size(self, n_size: u64) -> Self {
119 self.tx.state.lock().unwrap().on_n_size.replace(n_size);
120 self
121 }
122
123 pub fn with_compact_on_factor_size(self, factor_size: f32) -> Self {
124 self.tx
125 .state
126 .lock()
127 .unwrap()
128 .on_factor_size
129 .replace(factor_size);
130 self
131 }
132}
133
134impl CompactingLogFileJournalTx {
135 pub fn compact_now(&self) -> anyhow::Result<(CompactResult, CompactingJournalRx)> {
136 self.reset_counters();
138
139 std::fs::remove_file(&self.temp_path).ok();
141 let target = LogFileJournal::new(self.temp_path.clone())?;
142
143 let result = self.inner.compact_to(target)?;
145 std::fs::rename(&self.temp_path, &self.main_path)?;
146
147 let target = LogFileJournal::new(self.main_path.clone())?;
151
152 let counting = CountingJournal::default();
155 let mut compacting = CompactingJournal::new(counting)?;
156 copy_journal(&target, &compacting)?;
157
158 compacting.replace_inner(target);
161 let (tx, rx) = compacting.into_split();
162 self.inner.swap(tx);
163
164 {
166 let mut state = self.state.lock().unwrap();
167 state.ref_size = result.total_size;
168 }
169
170 Ok((result, rx))
171 }
172
173 pub fn reset_counters(&self) {
174 let mut state = self.state.lock().unwrap();
175 state.cnt_records = 0;
176 state.cnt_size = 0;
177 }
178}
179
180impl Drop for CompactingLogFileJournalTx {
181 fn drop(&mut self) {
182 let triggered = self.state.lock().unwrap().on_drop;
183 if triggered && let Err(err) = self.compact_now() {
184 tracing::error!("failed to compact log - {}", err);
185 }
186 }
187}
188
189impl ReadableJournal for CompactingLogFileJournalRx {
190 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
191 self.inner.read()
192 }
193
194 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
195 self.inner.as_restarted()
196 }
197}
198
199impl WritableJournal for CompactingLogFileJournalTx {
200 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
201 let res = self.inner.write(entry)?;
202
203 let triggered = {
204 let mut state = self.state.lock().unwrap();
205 if res.record_size() > 0 {
206 state.cnt_records += 1;
207 state.cnt_size += res.record_size();
208 }
209
210 let mut triggered = false;
211 if let Some(on) = state.on_n_records.as_ref()
212 && state.cnt_records >= *on
213 {
214 triggered = true;
215 }
216 if let Some(on) = state.on_n_size.as_ref()
217 && state.cnt_size >= *on
218 {
219 triggered = true;
220 }
221
222 if let Some(factor) = state.on_factor_size.as_ref() {
223 let next_ref = (*factor * state.ref_size as f32) as u64;
224 if state.cnt_size > next_ref {
225 triggered = true;
226 }
227 }
228
229 triggered
230 };
231
232 if triggered {
233 self.compact_now()?;
234 }
235
236 Ok(res)
237 }
238
239 fn flush(&self) -> anyhow::Result<()> {
240 self.inner.flush()
241 }
242
243 fn commit(&self) -> anyhow::Result<usize> {
244 self.inner.commit()
245 }
246
247 fn rollback(&self) -> anyhow::Result<usize> {
248 self.inner.rollback()
249 }
250}
251
252impl ReadableJournal for CompactingLogFileJournal {
253 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
254 self.rx.read()
255 }
256
257 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
258 self.rx.as_restarted()
259 }
260}
261
262impl WritableJournal for CompactingLogFileJournal {
263 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
264 self.tx.write(entry)
265 }
266
267 fn flush(&self) -> anyhow::Result<()> {
268 self.tx.flush()
269 }
270
271 fn commit(&self) -> anyhow::Result<usize> {
272 self.tx.commit()
273 }
274
275 fn rollback(&self) -> anyhow::Result<usize> {
276 self.tx.rollback()
277 }
278}
279
280impl Journal for CompactingLogFileJournal {
281 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
282 (Box::new(self.tx), Box::new(self.rx))
283 }
284}