virtual_fs/mem_fs/offloaded_file.rs
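//! An in-memory file whose contents are described by a list of [`FileExtent`]s.
//! Each extent either references a range of a memory-mapped backing store,
//! repeats a single byte value (used for gap fills and `resize`), or owns its
//! bytes directly, so already-mapped data can be "offloaded" rather than
//! copied into the filesystem.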

use bytes::Bytes;
use shared_buffer::OwnedBuffer;
use std::{
    cmp,
    fs::File,
    io,
    ops::Range,
    sync::{Arc, Mutex, MutexGuard},
};

use crate::limiter::DynFsMemoryLimiter;

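/// One contiguous run of the file's contents. An [`OffloadedFile`] is an
/// ordered list of extents: a range of the memory-mapped backing store, a
/// repeated byte value, or bytes held directly in memory.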
#[derive(Debug)]
pub enum FileExtent {
    MmapOffload { offset: u64, size: u64 },
    RepeatingBytes { value: u8, cnt: u64 },
    InMemory { data: Bytes },
}

impl FileExtent {
    pub fn size(&self) -> u64 {
        match self {
            FileExtent::MmapOffload { size, .. } => *size,
            FileExtent::RepeatingBytes { cnt, .. } => *cnt,
            FileExtent::InMemory { data } => data.len() as u64,
        }
    }

    /// Resizes the extent. Only `RepeatingBytes` can grow; an `MmapOffload`
    /// extent is clamped to at most its current size and an `InMemory` extent
    /// is sliced down (callers only ever shrink these two variants).
    pub fn resize(&mut self, new_size: u64) {
        match self {
            FileExtent::MmapOffload { size, .. } => *size = new_size.min(*size),
            FileExtent::RepeatingBytes { cnt, .. } => *cnt = new_size,
            FileExtent::InMemory { data } => {
                *data = data.slice(..(new_size as usize));
            }
        }
    }
}

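/// Mutable state behind [`OffloadBackingStore`]: the current memory map and,
/// optionally, the file it was created from (so the map can be re-created if
/// a read reaches past its current length).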
#[derive(Debug)]
struct OffloadBackingStoreState {
    mmap_file: Option<File>,
    mmap_offload: OwnedBuffer,
}

impl OffloadBackingStoreState {
    fn get_slice(&mut self, range: Range<u64>) -> io::Result<&[u8]> {
        let offset = range.start;
        // A range ending at `u64::MAX` means "to the end of the buffer".
        let size = match range.end {
            u64::MAX => {
                let len = self.mmap_offload.len() as u64;
                if len < offset {
                    tracing::trace!("range out of bounds {} vs {}", len, offset);
                    return Err(io::ErrorKind::UnexpectedEof.into());
                }
                len - offset
            }
            end => end - offset,
        };

        let end = offset + size;
        if end > self.mmap_offload.len() as u64 {
            // The requested range runs past the current map; if we still have
            // the backing file, re-map it to pick up any growth.
            let mmap_file = match self.mmap_file.as_ref() {
                Some(a) => a,
                None => {
                    tracing::trace!(
                        "mmap buffer out of bounds and no mmap file to reload {} vs {}",
                        end,
                        self.mmap_offload.len()
                    );
                    return Err(io::ErrorKind::UnexpectedEof.into());
                }
            };
            self.mmap_offload = OwnedBuffer::from_file(mmap_file).map_err(io::Error::other)?;
            if end > self.mmap_offload.len() as u64 {
                tracing::trace!(
                    "mmap buffer out of bounds {} vs {} for {:?}",
                    end,
                    self.mmap_offload.len(),
                    range
                );
                return Err(io::ErrorKind::UnexpectedEof.into());
            }
        }
        Ok(&self.mmap_offload[offset as usize..end as usize])
    }
}

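/// Shared, cloneable handle to the memory-mapped (or purely in-memory) buffer
/// that mmap-offloaded extents point into.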
#[derive(Debug, Clone)]
pub struct OffloadBackingStore {
    state: Arc<Mutex<OffloadBackingStoreState>>,
}

impl OffloadBackingStore {
    pub fn new(mmap_offload: OwnedBuffer, mmap_file: Option<File>) -> Self {
        Self {
            state: Arc::new(Mutex::new(OffloadBackingStoreState {
                mmap_file,
                mmap_offload,
            })),
        }
    }

    pub fn from_file(file: &File) -> Self {
        let file = file.try_clone().unwrap();
        let buffer = OwnedBuffer::from_file(&file).unwrap();
        Self::new(buffer, Some(file))
    }

    pub fn from_buffer(buffer: OwnedBuffer) -> Self {
        Self::new(buffer, None)
    }

    pub fn owned_buffer(&self) -> OwnedBuffer {
        let guard = self.state.lock().unwrap();
        guard.mmap_offload.clone()
    }

    fn lock(&self) -> MutexGuard<'_, OffloadBackingStoreState> {
        self.state.lock().unwrap()
    }
}

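/// A file whose contents are stored as a list of [`FileExtent`]s on top of an
/// [`OffloadBackingStore`] rather than a single contiguous buffer. `size` is
/// kept equal to the sum of the extent sizes.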
#[derive(Debug)]
pub struct OffloadedFile {
    backing: OffloadBackingStore,
    #[allow(dead_code)]
    limiter: Option<DynFsMemoryLimiter>,
    extents: Vec<FileExtent>,
    size: u64,
}

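/// Data to be written to an [`OffloadedFile`]: either a range that already
/// lives in the backing store (recorded without copying) or a plain byte
/// buffer.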
pub enum OffloadWrite<'a> {
    MmapOffset { offset: u64, size: u64 },
    Buffer(&'a [u8]),
}

impl OffloadWrite<'_> {
    fn len(&self) -> usize {
        match self {
            OffloadWrite::MmapOffset { size, .. } => *size as usize,
            OffloadWrite::Buffer(data) => data.len(),
        }
    }
}

impl OffloadedFile {
    pub fn new(limiter: Option<DynFsMemoryLimiter>, backing: OffloadBackingStore) -> Self {
        Self {
            backing,
            limiter,
            extents: Vec::new(),
            size: 0,
        }
    }

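    /// Computes the new cursor position for `position`, clamps it to the end
    /// of the file, and stores it in `cursor`. Seeking before byte 0 is an
    /// error.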
    pub fn seek(&self, position: io::SeekFrom, cursor: &mut u64) -> io::Result<u64> {
        let to_err = |_| io::ErrorKind::InvalidInput;

        // Calculate the next cursor.
        let next_cursor: i64 = match position {
            io::SeekFrom::Start(offset) => offset.try_into().map_err(to_err)?,
            io::SeekFrom::End(offset) => self.len() as i64 + offset,
            io::SeekFrom::Current(offset) => {
                TryInto::<i64>::try_into(*cursor).map_err(to_err)? + offset
            }
        };

        // It's an error to seek before byte 0.
        if next_cursor < 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seeking before byte 0",
            ));
        }

        // In this implementation, seeking beyond the end of the buffer is not
        // an error; the cursor is simply clamped to the end.
        let next_cursor = next_cursor.try_into().map_err(to_err)?;
        *cursor = cmp::min(self.len(), next_cursor);
        Ok(*cursor)
    }

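    /// Reads from the extents that overlap `cursor` into `buf`, advancing the
    /// cursor and returning the number of bytes read. A short read means the
    /// end of the file was reached.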
    pub fn read(&self, mut buf: &mut [u8], cursor: &mut u64) -> io::Result<usize> {
        let cursor_start = *cursor;

        let mut extent_offset = cursor_start;
        let mut extent_index = 0usize;
        while !buf.is_empty() && extent_index < self.extents.len() {
            let extent = &self.extents[extent_index];

            // Skip over extents that end before the cursor.
            if extent_offset >= extent.size() {
                extent_offset -= extent.size();
                extent_index += 1;
                continue;
            }

            let read = match extent {
                FileExtent::MmapOffload {
                    offset: mmap_offset,
                    size: extent_size,
                } => {
                    let mut backing = self.backing.lock();
                    let mmap_offset_plus_extent = mmap_offset + extent_offset;
                    let data = backing.get_slice(
                        mmap_offset_plus_extent
                            ..(mmap_offset_plus_extent + *extent_size - extent_offset),
                    )?;
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
                FileExtent::RepeatingBytes { value, cnt } => {
                    let cnt = cmp::min(buf.len() as u64, cnt - extent_offset) as usize;
                    buf[..cnt].iter_mut().for_each(|d| {
                        *d = *value;
                    });
                    cnt
                }
                FileExtent::InMemory { data } => {
                    let data = &data.as_ref()[extent_offset as usize..];
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
            };

            // Either `buf` is now exhausted (the loop exits) or the current
            // extent was fully consumed, so move on to the next one.
            *cursor += read as u64;
            extent_offset = 0;
            extent_index += 1;
            buf = &mut buf[read..];
        }
        Ok((*cursor - cursor_start) as usize)
    }

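    /// Writes `data` at `cursor`, replacing whatever extents overlap the
    /// written range. Existing extents are split at the range boundaries,
    /// fully covered ones are removed, any gap past the end of the file is
    /// filled with zeroes, and a new extent is inserted for `data`.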
    pub fn write(&mut self, data: OffloadWrite<'_>, cursor: &mut u64) -> io::Result<usize> {
        let original_extent_offset = *cursor;
        let mut extent_offset = original_extent_offset;
        let mut data_len = data.len() as u64;

        // We need to split any extents that intersect the start or end of the
        // new block of data we are about to write.
        let mut split_extents = |mut split_at: u64| {
            let mut index = 0usize;
            while split_at > 0 && index < self.extents.len() {
                let extent = &mut self.extents[index];
                if split_at >= extent.size() {
                    split_at -= extent.size();
                    index += 1;
                    continue;
                } else if split_at == 0 {
                    break;
                } else {
                    // The split point falls inside this extent: shrink it and
                    // insert a new extent covering the remainder.
                    let new_extent = match extent {
                        FileExtent::MmapOffload {
                            offset: other_offset,
                            size: other_size,
                        } => FileExtent::MmapOffload {
                            offset: *other_offset + split_at,
                            size: *other_size - split_at,
                        },
                        FileExtent::RepeatingBytes {
                            value: other_value,
                            cnt: other_cnt,
                        } => FileExtent::RepeatingBytes {
                            value: *other_value,
                            cnt: *other_cnt - split_at,
                        },
                        FileExtent::InMemory { data: other_data } => FileExtent::InMemory {
                            data: other_data.slice((split_at as usize)..),
                        },
                    };
                    extent.resize(split_at);
                    self.extents.insert(index + 1, new_extent);
                    break;
                }
            }
        };

        // If the write starts before the current end of the file, split the
        // existing extents at the boundaries of the write range.
        let mut index = if extent_offset < self.size {
            split_extents(extent_offset);
            split_extents(extent_offset + data_len);

            // Now we delete all the extents that fall entirely within the
            // range we are about to insert.
            let mut index = 0usize;
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if extent_offset >= extent.size() {
                    extent_offset -= extent.size();
                    index += 1;
                    continue;
                } else {
                    break;
                }
            }
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if data_len < extent.size() {
                    break;
                }
                data_len -= extent.size();
                self.extents.remove(index);
            }
            index
        } else {
            self.extents.len()
        };

        // If we have a gap that needs to be filled then do so
        if extent_offset > self.size {
            self.extents.insert(
                index,
                FileExtent::RepeatingBytes {
                    value: 0,
                    cnt: extent_offset - self.size,
                },
            );
            self.size = extent_offset;
            index += 1;
        }

        // Insert the extent into the buffer
        match data {
            OffloadWrite::MmapOffset { offset, size } => {
                self.extents
                    .insert(index, FileExtent::MmapOffload { offset, size });
            }
            OffloadWrite::Buffer(data) => {
                // Finally we add the new extent
                let data_start = data.as_ptr() as u64;
                let data_end = data_start + data.len() as u64;
                let mut backing = self.backing.lock();
                let backing_data = backing.get_slice(0u64..u64::MAX)?;

                let mmap_start = backing_data.as_ptr() as u64;
                let mmap_end = mmap_start + backing_data.len() as u64;

                // If the data is within the mmap buffer then we use an extent range
                // to represent the data, otherwise we fall back on copying the data
                let new_extent = if data_start >= mmap_start && data_end <= mmap_end {
                    FileExtent::MmapOffload {
                        offset: data_start - mmap_start,
                        size: data_end - data_start,
                    }
                } else {
                    FileExtent::InMemory {
                        data: data.to_vec().into(),
                    }
                };
                self.extents.insert(index, new_extent);
            }
        }
        self.size = self.size.max(original_extent_offset + data.len() as u64);

        // Update the cursor
        *cursor += data.len() as u64;
        Ok(data.len())
    }

    pub fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }

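    /// Grows the file by appending a run of `value` bytes, or shrinks it by
    /// trimming extents from the end, until its length is `new_len`.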
    pub fn resize(&mut self, new_len: u64, value: u8) {
        let mut cur_len = self.len();
        if new_len > cur_len {
            self.extents.push(FileExtent::RepeatingBytes {
                value,
                cnt: new_len - cur_len,
            });
        }
        while cur_len > new_len && !self.extents.is_empty() {
            let extent: &mut FileExtent = self.extents.last_mut().unwrap();
            let diff = extent.size().min(cur_len - new_len);
            extent.resize(extent.size() - diff);
            cur_len -= diff;
            if extent.size() == 0 {
                self.extents.pop();
            }
        }
        self.size = new_len;
    }

    pub fn len(&self) -> u64 {
        self.size
    }

    pub fn truncate(&mut self) {
        self.extents.clear();
        self.size = 0;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[tracing_test::traced_test]
    pub fn test_offload_file() -> anyhow::Result<()> {
        let buffer = OwnedBuffer::from_bytes(std::iter::repeat_n(12u8, 100).collect::<Vec<_>>());
        let test_data2 = buffer.clone();

        let backing = OffloadBackingStore::new(buffer, None);
        let mut file = OffloadedFile::new(None, backing);

        let mut cursor = 0u64;
        let test_data = std::iter::repeat_n(56u8, 100).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 100);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 100).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 100).collect::<Vec<_>>());

        cursor = 50;
        file.write(OffloadWrite::Buffer(&test_data2), &mut cursor)?;

        assert_eq!(file.len(), 150);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 150).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .collect::<Vec<_>>()
        );

        file.resize(200, 99u8);
        assert_eq!(file.len(), 200);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 200).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .chain(std::iter::repeat_n(99u8, 50))
                .collect::<Vec<_>>()
        );

        file.resize(33, 1u8);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 33).collect::<Vec<_>>());

        let mut cursor = 10u64;
        let test_data = std::iter::repeat_n(74u8, 10).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 33);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 10)
                .chain(std::iter::repeat_n(74u8, 10))
                .chain(std::iter::repeat_n(56u8, 13))
                .collect::<Vec<_>>()
        );

        Ok(())
    }
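
    // A minimal additional sketch (not part of the original suite): it checks
    // the cursor behaviour of `seek`, which clamps to the end of the file
    // rather than allowing a seek past it, and rejects seeks before byte 0.
    #[test]
    pub fn test_offload_file_seek() -> anyhow::Result<()> {
        let backing = OffloadBackingStore::from_buffer(OwnedBuffer::from_bytes(Vec::<u8>::new()));
        let mut file = OffloadedFile::new(None, backing);

        let mut cursor = 0u64;
        let test_data = std::iter::repeat_n(7u8, 20).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;
        assert_eq!(file.len(), 20);

        // Seeking past the end clamps the cursor to the file length.
        assert_eq!(file.seek(io::SeekFrom::Start(100), &mut cursor)?, 20);
        assert_eq!(cursor, 20);

        // Seeking relative to the end works as usual.
        assert_eq!(file.seek(io::SeekFrom::End(-5), &mut cursor)?, 15);

        // Seeking before byte 0 is an error.
        assert!(file.seek(io::SeekFrom::Current(-100), &mut cursor).is_err());

        Ok(())
    }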
}