//! virtual_fs/mem_fs/offloaded_file.rs

1use bytes::Bytes;
2use shared_buffer::OwnedBuffer;
3use std::{
4    cmp,
5    fs::File,
6    io,
7    ops::Range,
8    sync::{Arc, Mutex, MutexGuard},
9};
10
11use crate::limiter::DynFsMemoryLimiter;
12
/// One contiguous run of a file's contents.
///
/// A file is stored as an ordered list of extents; concatenating them in
/// order yields the file's bytes.
#[derive(Debug)]
pub enum FileExtent {
    /// `size` bytes read from the shared mmap backing buffer at `offset`.
    MmapOffload { offset: u64, size: u64 },
    /// `cnt` repetitions of the single byte `value` (used e.g. to
    /// zero-fill gaps created by writes past EOF).
    RepeatingBytes { value: u8, cnt: u64 },
    /// Bytes held directly in memory.
    InMemory { data: Bytes },
}
19
20impl FileExtent {
21    pub fn size(&self) -> u64 {
22        match self {
23            FileExtent::MmapOffload { size, .. } => *size,
24            FileExtent::RepeatingBytes { cnt, .. } => *cnt,
25            FileExtent::InMemory { data } => data.len() as u64,
26        }
27    }
28
29    pub fn resize(&mut self, new_size: u64) {
30        match self {
31            FileExtent::MmapOffload { size, .. } => *size = new_size.min(*size),
32            FileExtent::RepeatingBytes { cnt, .. } => *cnt = new_size,
33            FileExtent::InMemory { data } => {
34                *data = data.slice(..(new_size as usize));
35            }
36        }
37    }
38}
39
/// Mutable state guarded by the `OffloadBackingStore` mutex.
#[derive(Debug)]
struct OffloadBackingStoreState {
    /// File the buffer was mapped from, if any; kept so the buffer can be
    /// re-mapped when a request reaches beyond the current mapping.
    mmap_file: Option<File>,
    /// The (possibly memory-mapped) buffer holding the offloaded bytes.
    mmap_offload: OwnedBuffer,
}
45
46impl OffloadBackingStoreState {
47    fn get_slice(&mut self, range: Range<u64>) -> io::Result<&[u8]> {
48        let offset = range.start;
49        let size = match range.end {
50            u64::MAX => {
51                let len = self.mmap_offload.len() as u64;
52                if len < offset {
53                    tracing::trace!("range out of bounds {} vs {}", len, offset);
54                    return Err(io::ErrorKind::UnexpectedEof.into());
55                }
56                len - offset
57            }
58            end => end - offset,
59        };
60
61        let end = offset + size;
62        if end > self.mmap_offload.len() as u64 {
63            let mmap_file = match self.mmap_file.as_ref() {
64                Some(a) => a,
65                None => {
66                    tracing::trace!(
67                        "mmap buffer out of bounds and no mmap file to reload {} vs {}",
68                        end,
69                        self.mmap_offload.len()
70                    );
71                    return Err(io::ErrorKind::UnexpectedEof.into());
72                }
73            };
74            self.mmap_offload = OwnedBuffer::from_file(mmap_file).map_err(io::Error::other)?;
75            if end > self.mmap_offload.len() as u64 {
76                tracing::trace!(
77                    "mmap buffer out of bounds {} vs {} for {:?}",
78                    end,
79                    self.mmap_offload.len(),
80                    range
81                );
82                return Err(io::ErrorKind::UnexpectedEof.into());
83            }
84        }
85        Ok(&self.mmap_offload[offset as usize..end as usize])
86    }
87}
88
/// Cheaply cloneable, thread-safe handle to the shared mmap backing
/// buffer (and, optionally, the file it was mapped from).
#[derive(Debug, Clone)]
pub struct OffloadBackingStore {
    state: Arc<Mutex<OffloadBackingStoreState>>,
}
93
94impl OffloadBackingStore {
95    pub fn new(mmap_offload: OwnedBuffer, mmap_file: Option<File>) -> Self {
96        Self {
97            state: Arc::new(Mutex::new(OffloadBackingStoreState {
98                mmap_file,
99                mmap_offload,
100            })),
101        }
102    }
103
104    pub fn from_file(file: &File) -> Self {
105        let file = file.try_clone().unwrap();
106        let buffer = OwnedBuffer::from_file(&file).unwrap();
107        Self::new(buffer, Some(file))
108    }
109
110    pub fn from_buffer(buffer: OwnedBuffer) -> Self {
111        Self::new(buffer, None)
112    }
113
114    pub fn owned_buffer(&self) -> OwnedBuffer {
115        let guard = self.state.lock().unwrap();
116        guard.mmap_offload.clone()
117    }
118
119    fn lock(&self) -> MutexGuard<'_, OffloadBackingStoreState> {
120        self.state.lock().unwrap()
121    }
122}
123
/// A file whose contents are a sequence of [`FileExtent`]s, allowing large
/// regions to remain "offloaded" in a shared mmap buffer rather than being
/// copied into memory.
#[derive(Debug)]
pub struct OffloadedFile {
    /// Shared mmap-backed buffer that `MmapOffload` extents point into.
    backing: OffloadBackingStore,
    // NOTE(review): unused in this file — presumably reserved for memory
    // accounting; confirm against the rest of the crate.
    #[allow(dead_code)]
    limiter: Option<DynFsMemoryLimiter>,
    /// Ordered extents; concatenated they form the file contents.
    extents: Vec<FileExtent>,
    /// Total file size in bytes (the sum of the extent sizes).
    size: u64,
}
132
/// Source of the bytes for an [`OffloadedFile`] write: either a region of
/// the mmap backing buffer, or a borrowed in-memory slice.
pub enum OffloadWrite<'a> {
    MmapOffset { offset: u64, size: u64 },
    Buffer(&'a [u8]),
}

impl OffloadWrite<'_> {
    /// Number of bytes this write covers.
    fn len(&self) -> usize {
        match *self {
            OffloadWrite::Buffer(data) => data.len(),
            OffloadWrite::MmapOffset { size, .. } => size as usize,
        }
    }
}
146
147impl OffloadedFile {
148    pub fn new(limiter: Option<DynFsMemoryLimiter>, backing: OffloadBackingStore) -> Self {
149        Self {
150            backing,
151            limiter,
152            extents: Vec::new(),
153            size: 0,
154        }
155    }
156
157    pub fn seek(&self, position: io::SeekFrom, cursor: &mut u64) -> io::Result<u64> {
158        let to_err = |_| io::ErrorKind::InvalidInput;
159
160        // Calculate the next cursor.
161        let next_cursor: i64 = match position {
162            io::SeekFrom::Start(offset) => offset.try_into().map_err(to_err)?,
163            io::SeekFrom::End(offset) => self.len() as i64 + offset,
164            io::SeekFrom::Current(offset) => {
165                TryInto::<i64>::try_into(*cursor).map_err(to_err)? + offset
166            }
167        };
168
169        // It's an error to seek before byte 0.
170        if next_cursor < 0 {
171            return Err(io::Error::new(
172                io::ErrorKind::InvalidInput,
173                "seeking before the byte 0",
174            ));
175        }
176
177        let next_cursor = next_cursor.try_into().map_err(to_err)?;
178        *cursor = next_cursor;
179        Ok(*cursor)
180    }
181
    /// Reads up to `buf.len()` bytes starting at `*cursor`, advancing the
    /// cursor, and returns the number of bytes read (0 at or past EOF).
    pub fn read(&self, mut buf: &mut [u8], cursor: &mut u64) -> io::Result<usize> {
        let cursor_start = *cursor;

        // Walk the extents in order: skip the ones entirely before the
        // cursor, then copy out of each following extent in turn.
        let mut extent_offset = cursor_start;
        let mut extent_index = 0usize;
        while !buf.is_empty() && extent_index < self.extents.len() {
            let extent = &self.extents[extent_index];

            // The cursor starts beyond this extent: skip it whole.
            if extent_offset >= extent.size() {
                extent_offset -= extent.size();
                extent_index += 1;
                continue;
            }

            let read = match extent {
                FileExtent::MmapOffload {
                    offset: mmap_offset,
                    size: extent_size,
                } => {
                    let mut backing = self.backing.lock();
                    // Slice covering the not-yet-read tail of this extent.
                    let mmap_offset_plus_extent = mmap_offset + extent_offset;
                    let data = backing.get_slice(
                        mmap_offset_plus_extent
                            ..(mmap_offset_plus_extent + *extent_size - extent_offset),
                    )?;
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
                FileExtent::RepeatingBytes { value, cnt } => {
                    // No backing data: just fill with the repeated byte.
                    let cnt = cmp::min(buf.len() as u64, cnt - extent_offset) as usize;
                    buf[..cnt].iter_mut().for_each(|d| {
                        *d = *value;
                    });
                    cnt
                }
                FileExtent::InMemory { data } => {
                    let data = &data.as_ref()[extent_offset as usize..];
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
            };

            // Any further extent is read from its start.
            *cursor += read as u64;
            extent_offset = 0;
            extent_index += 1;
            buf = &mut buf[read..];
        }
        Ok((*cursor - cursor_start) as usize)
    }
233
    /// Writes `data` at `*cursor`, advancing the cursor and returning the
    /// number of bytes written (always `data.len()`).
    ///
    /// Existing extents overlapping the written range are split at the
    /// range boundaries and the fully-covered ones removed; a zero-filled
    /// extent pads any gap between EOF and the cursor; the new data is
    /// then inserted as a single extent — either an `MmapOffload` when the
    /// bytes already live inside the backing buffer, or an `InMemory`
    /// copy otherwise.
    pub fn write(&mut self, data: OffloadWrite<'_>, cursor: &mut u64) -> io::Result<usize> {
        let original_extent_offset = *cursor;
        let mut extent_offset = original_extent_offset;
        let mut data_len = data.len() as u64;

        // We need to split any extents that are intersecting with the
        // start or end of the new block of data we are about to write
        let mut split_extents = |mut split_at: u64| {
            let mut index = 0usize;
            while split_at > 0 && index < self.extents.len() {
                let extent = &mut self.extents[index];
                if split_at >= extent.size() {
                    // Split point is at or past this extent: keep walking.
                    split_at -= extent.size();
                    index += 1;
                    continue;
                } else if split_at == 0 {
                    // NOTE(review): unreachable — the loop condition
                    // guarantees `split_at > 0` here.
                    break;
                } else {
                    // Split point falls inside this extent: shrink it to
                    // `split_at` bytes and insert the remainder after it.
                    let new_extent = match extent {
                        FileExtent::MmapOffload {
                            offset: other_offset,
                            size: other_size,
                        } => FileExtent::MmapOffload {
                            offset: *other_offset + split_at,
                            size: *other_size - split_at,
                        },
                        FileExtent::RepeatingBytes {
                            value: other_value,
                            cnt: other_cnt,
                        } => FileExtent::RepeatingBytes {
                            value: *other_value,
                            cnt: *other_cnt - split_at,
                        },
                        FileExtent::InMemory { data: other_data } => FileExtent::InMemory {
                            data: other_data.slice((split_at as usize)..),
                        },
                    };
                    extent.resize(split_at);
                    self.extents.insert(index + 1, new_extent);
                    break;
                }
            }
        };

        // If the extent is below the actual size of the file then we need to split it
        let mut index = if extent_offset < self.size {
            split_extents(extent_offset);
            split_extents(extent_offset + data_len);

            // Now we delete all the extents that exist between the
            // range that we are about to insert
            let mut index = 0usize;
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if extent_offset >= extent.size() {
                    extent_offset -= extent.size();
                    index += 1;
                    continue;
                } else {
                    // After the splits above, the write start should land
                    // on an extent boundary, leaving `extent_offset` == 0.
                    break;
                }
            }
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if data_len < extent.size() {
                    break;
                }
                // This extent is fully covered by the new data: drop it.
                data_len -= extent.size();
                self.extents.remove(index);
            }
            index
        } else {
            self.extents.len()
        };

        // If we have a gap that needs to be filled then do so
        if extent_offset > self.size {
            self.extents.insert(
                index,
                FileExtent::RepeatingBytes {
                    value: 0,
                    cnt: extent_offset - self.size,
                },
            );
            self.size = extent_offset;
            index += 1;
        }

        // Insert the extent into the buffer
        match data {
            OffloadWrite::MmapOffset { offset, size } => {
                self.extents
                    .insert(index, FileExtent::MmapOffload { offset, size });
            }
            OffloadWrite::Buffer(data) => {
                // Finally we add the new extent
                let data_start = data.as_ptr() as u64;
                let data_end = data_start + data.len() as u64;
                let mut backing = self.backing.lock();
                let backing_data = backing.get_slice(0u64..u64::MAX)?;

                let mmap_start = backing_data.as_ptr() as u64;
                let mmap_end = mmap_start + backing_data.len() as u64;

                // If the data is within the mmap buffer then we use a extent range
                // to represent the data, otherwise we fall back on copying the data
                let new_extent = if data_start >= mmap_start && data_end <= mmap_end {
                    FileExtent::MmapOffload {
                        offset: data_start - mmap_start,
                        size: data_end - data_start,
                    }
                } else {
                    FileExtent::InMemory {
                        data: data.to_vec().into(),
                    }
                };
                self.extents.insert(index, new_extent);
            }
        }
        self.size = self.size.max(original_extent_offset + data.len() as u64);

        // Update the cursor
        *cursor += data.len() as u64;
        Ok(data.len())
    }
359
    /// No-op: extents are updated in place, so there is nothing buffered
    /// to flush.
    pub fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
363
364    pub fn resize(&mut self, new_len: u64, value: u8) {
365        let mut cur_len = self.len();
366        if new_len > cur_len {
367            self.extents.push(FileExtent::RepeatingBytes {
368                value,
369                cnt: new_len - cur_len,
370            });
371        }
372        while cur_len > new_len && !self.extents.is_empty() {
373            let extent: &mut FileExtent = self.extents.last_mut().unwrap();
374            let diff = extent.size().min(cur_len - new_len);
375            extent.resize(extent.size() - diff);
376            cur_len -= diff;
377            if extent.size() == 0 {
378                self.extents.pop();
379            }
380        }
381        self.size = new_len;
382    }
383
    /// Current file size in bytes.
    pub fn len(&self) -> u64 {
        self.size
    }
387
388    pub fn truncate(&mut self) {
389        self.extents.clear();
390        self.size = 0;
391    }
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end exercise of write/read/resize over a buffer-backed file.
    #[test]
    #[tracing_test::traced_test]
    pub fn test_offload_file() -> anyhow::Result<()> {
        // Backing buffer of 100 x 12u8; a write of a slice that points
        // into it can be represented as an MmapOffload extent.
        let buffer = OwnedBuffer::from_bytes(std::iter::repeat_n(12u8, 100).collect::<Vec<_>>());
        let test_data2 = buffer.clone();

        let backing = OffloadBackingStore::new(buffer, None);
        let mut file = OffloadedFile::new(None, backing);

        // Initial write: 100 x 56u8 from ordinary memory.
        let mut cursor = 0u64;
        let test_data = std::iter::repeat_n(56u8, 100).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 100);

        // Read everything back.
        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 100).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 100).collect::<Vec<_>>());

        // Overwrite from offset 50 with the buffer-backed bytes,
        // extending the file to 150 bytes.
        cursor = 50;
        file.write(OffloadWrite::Buffer(&test_data2), &mut cursor)?;

        assert_eq!(file.len(), 150);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 150).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .collect::<Vec<_>>()
        );

        // Grow to 200 bytes, padding with 99s.
        file.resize(200, 99u8);
        assert_eq!(file.len(), 200);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 200).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .chain(std::iter::repeat_n(99u8, 50))
                .collect::<Vec<_>>()
        );

        // Shrink back to 33 bytes (the fill byte is irrelevant here).
        file.resize(33, 1u8);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 33).collect::<Vec<_>>());

        // Overwrite a 10-byte window in the middle; size must not change.
        let mut cursor = 10u64;
        let test_data = std::iter::repeat_n(74u8, 10).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 33);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 10)
                .chain(std::iter::repeat_n(74u8, 10))
                .chain(std::iter::repeat_n(56u8, 13))
                .collect::<Vec<_>>()
        );

        Ok(())
    }
}