1use bytes::Bytes;
2use shared_buffer::OwnedBuffer;
3use std::{
4 cmp,
5 fs::File,
6 io,
7 ops::Range,
8 sync::{Arc, Mutex, MutexGuard},
9};
10
11use crate::limiter::DynFsMemoryLimiter;
12
/// One contiguous region of an [`OffloadedFile`]'s contents.
///
/// A file is represented as an ordered sequence of these extents; each
/// variant trades memory residency differently.
#[derive(Debug)]
pub enum FileExtent {
    /// `size` bytes starting at `offset` inside the shared mmap backing
    /// store (zero-copy; nothing resident beyond the mapping).
    MmapOffload { offset: u64, size: u64 },
    /// A run of `cnt` identical bytes of `value` (run-length encoding,
    /// used e.g. for zero-fill gaps created by writes past EOF).
    RepeatingBytes { value: u8, cnt: u64 },
    /// Bytes held in memory as an owned `Bytes` buffer.
    InMemory { data: Bytes },
}
19
20impl FileExtent {
21 pub fn size(&self) -> u64 {
22 match self {
23 FileExtent::MmapOffload { size, .. } => *size,
24 FileExtent::RepeatingBytes { cnt, .. } => *cnt,
25 FileExtent::InMemory { data } => data.len() as u64,
26 }
27 }
28
29 pub fn resize(&mut self, new_size: u64) {
30 match self {
31 FileExtent::MmapOffload { size, .. } => *size = new_size.min(*size),
32 FileExtent::RepeatingBytes { cnt, .. } => *cnt = new_size,
33 FileExtent::InMemory { data } => {
34 *data = data.slice(..(new_size as usize));
35 }
36 }
37 }
38}
39
/// Mutable state behind [`OffloadBackingStore`]: the mapped buffer and,
/// optionally, the file it was produced from.
#[derive(Debug)]
struct OffloadBackingStoreState {
    /// File handle used to refresh `mmap_offload` when a requested range
    /// extends past the currently mapped length; `None` for purely
    /// in-memory buffers (no way to reload).
    mmap_file: Option<File>,
    /// The current mapped (or in-memory) view of the backing data.
    mmap_offload: OwnedBuffer,
}
45
46impl OffloadBackingStoreState {
47 fn get_slice(&mut self, range: Range<u64>) -> io::Result<&[u8]> {
48 let offset = range.start;
49 let size = match range.end {
50 u64::MAX => {
51 let len = self.mmap_offload.len() as u64;
52 if len < offset {
53 tracing::trace!("range out of bounds {} vs {}", len, offset);
54 return Err(io::ErrorKind::UnexpectedEof.into());
55 }
56 len - offset
57 }
58 end => end - offset,
59 };
60
61 let end = offset + size;
62 if end > self.mmap_offload.len() as u64 {
63 let mmap_file = match self.mmap_file.as_ref() {
64 Some(a) => a,
65 None => {
66 tracing::trace!(
67 "mmap buffer out of bounds and no mmap file to reload {} vs {}",
68 end,
69 self.mmap_offload.len()
70 );
71 return Err(io::ErrorKind::UnexpectedEof.into());
72 }
73 };
74 self.mmap_offload = OwnedBuffer::from_file(mmap_file).map_err(io::Error::other)?;
75 if end > self.mmap_offload.len() as u64 {
76 tracing::trace!(
77 "mmap buffer out of bounds {} vs {} for {:?}",
78 end,
79 self.mmap_offload.len(),
80 range
81 );
82 return Err(io::ErrorKind::UnexpectedEof.into());
83 }
84 }
85 Ok(&self.mmap_offload[offset as usize..end as usize])
86 }
87}
88
/// Cheaply cloneable, thread-safe handle to the shared backing buffer that
/// [`FileExtent::MmapOffload`] extents index into.
#[derive(Debug, Clone)]
pub struct OffloadBackingStore {
    /// Shared mutable state; all access goes through the mutex.
    state: Arc<Mutex<OffloadBackingStoreState>>,
}
93
94impl OffloadBackingStore {
95 pub fn new(mmap_offload: OwnedBuffer, mmap_file: Option<File>) -> Self {
96 Self {
97 state: Arc::new(Mutex::new(OffloadBackingStoreState {
98 mmap_file,
99 mmap_offload,
100 })),
101 }
102 }
103
104 pub fn from_file(file: &File) -> Self {
105 let file = file.try_clone().unwrap();
106 let buffer = OwnedBuffer::from_file(&file).unwrap();
107 Self::new(buffer, Some(file))
108 }
109
110 pub fn from_buffer(buffer: OwnedBuffer) -> Self {
111 Self::new(buffer, None)
112 }
113
114 pub fn owned_buffer(&self) -> OwnedBuffer {
115 let guard = self.state.lock().unwrap();
116 guard.mmap_offload.clone()
117 }
118
119 fn lock(&self) -> MutexGuard<'_, OffloadBackingStoreState> {
120 self.state.lock().unwrap()
121 }
122}
123
/// A file whose contents are an ordered list of [`FileExtent`]s, letting
/// large regions stay mmap-offloaded or run-length encoded instead of
/// resident in memory.
#[derive(Debug)]
pub struct OffloadedFile {
    /// Shared backing buffer that `MmapOffload` extents index into.
    backing: OffloadBackingStore,
    // NOTE(review): never read in this file — presumably reserved for
    // future filesystem memory accounting; confirm before removing.
    #[allow(dead_code)]
    limiter: Option<DynFsMemoryLimiter>,
    /// Extents in file order, starting at offset 0.
    extents: Vec<FileExtent>,
    /// Total file length in bytes (extent sizes are kept summing to this).
    size: u64,
}
132
/// Source of data for an [`OffloadedFile::write`].
pub enum OffloadWrite<'a> {
    /// `size` bytes at `offset` within the backing store's buffer,
    /// recorded zero-copy as an `MmapOffload` extent.
    MmapOffset { offset: u64, size: u64 },
    /// A byte slice; copied into memory unless it aliases the backing
    /// buffer, in which case it is stored as an offset extent.
    Buffer(&'a [u8]),
}
137
138impl OffloadWrite<'_> {
139 fn len(&self) -> usize {
140 match self {
141 OffloadWrite::MmapOffset { size, .. } => *size as usize,
142 OffloadWrite::Buffer(data) => data.len(),
143 }
144 }
145}
146
impl OffloadedFile {
    /// Creates an empty file backed by `backing`. `limiter` is stored but
    /// currently unused (see the field note on [`OffloadedFile`]).
    pub fn new(limiter: Option<DynFsMemoryLimiter>, backing: OffloadBackingStore) -> Self {
        Self {
            backing,
            limiter,
            extents: Vec::new(),
            size: 0,
        }
    }

    /// Computes the position described by `position`, stores it in
    /// `cursor`, and returns the new cursor value.
    ///
    /// # Errors
    /// Returns `InvalidInput` when the target does not fit in `i64`/`u64`
    /// or would land before byte 0.
    pub fn seek(&self, position: io::SeekFrom, cursor: &mut u64) -> io::Result<u64> {
        // Any integer-conversion failure is reported as InvalidInput.
        let to_err = |_| io::ErrorKind::InvalidInput;

        let next_cursor: i64 = match position {
            io::SeekFrom::Start(offset) => offset.try_into().map_err(to_err)?,
            // NOTE(review): `self.len() as i64 + offset` could overflow for
            // lengths near i64::MAX — presumably unreachable in practice.
            io::SeekFrom::End(offset) => self.len() as i64 + offset,
            io::SeekFrom::Current(offset) => {
                TryInto::<i64>::try_into(*cursor).map_err(to_err)? + offset
            }
        };

        if next_cursor < 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seeking before the byte 0",
            ));
        }

        let next_cursor = next_cursor.try_into().map_err(to_err)?;
        *cursor = next_cursor;
        Ok(*cursor)
    }

    /// Reads up to `buf.len()` bytes starting at `*cursor`, advancing the
    /// cursor; returns the number of bytes read (0 at or past EOF).
    pub fn read(&self, mut buf: &mut [u8], cursor: &mut u64) -> io::Result<usize> {
        let cursor_start = *cursor;

        // Walk the extents: skip those wholly before the cursor, then copy
        // from each in turn until `buf` is full or extents run out.
        let mut extent_offset = cursor_start;
        let mut extent_index = 0usize;
        while !buf.is_empty() && extent_index < self.extents.len() {
            let extent = &self.extents[extent_index];

            // Cursor lies past this extent entirely: consume its size and
            // move on.
            if extent_offset >= extent.size() {
                extent_offset -= extent.size();
                extent_index += 1;
                continue;
            }

            let read = match extent {
                FileExtent::MmapOffload {
                    offset: mmap_offset,
                    size: extent_size,
                } => {
                    // Request [mmap_offset + extent_offset, mmap_offset + size)
                    // from the shared backing buffer.
                    let mut backing = self.backing.lock();
                    let mmap_offset_plus_extent = mmap_offset + extent_offset;
                    let data = backing.get_slice(
                        mmap_offset_plus_extent
                            ..(mmap_offset_plus_extent + *extent_size - extent_offset),
                    )?;
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
                FileExtent::RepeatingBytes { value, cnt } => {
                    // Materialize the run-length-encoded bytes directly
                    // into the output buffer.
                    let cnt = cmp::min(buf.len() as u64, cnt - extent_offset) as usize;
                    buf[..cnt].iter_mut().for_each(|d| {
                        *d = *value;
                    });
                    cnt
                }
                FileExtent::InMemory { data } => {
                    let data = &data.as_ref()[extent_offset as usize..];
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
            };

            // Subsequent extents are read from their start.
            *cursor += read as u64;
            extent_offset = 0;
            extent_index += 1;
            buf = &mut buf[read..];
        }
        Ok((*cursor - cursor_start) as usize)
    }

    /// Writes `data` at `*cursor`, replacing any overlapping extent ranges,
    /// zero-filling the gap if the cursor is past EOF, and advancing the
    /// cursor. Returns the number of bytes written (always `data.len()`).
    pub fn write(&mut self, data: OffloadWrite<'_>, cursor: &mut u64) -> io::Result<usize> {
        let original_extent_offset = *cursor;
        let mut extent_offset = original_extent_offset;
        let mut data_len = data.len() as u64;

        // Ensures absolute file offset `split_at` falls on an extent
        // boundary, splitting the extent that contains it if necessary.
        let mut split_extents = |mut split_at: u64| {
            let mut index = 0usize;
            while split_at > 0 && index < self.extents.len() {
                let extent = &mut self.extents[index];
                if split_at >= extent.size() {
                    // Boundary is at or beyond the end of this extent.
                    split_at -= extent.size();
                    index += 1;
                    continue;
                } else if split_at == 0 {
                    break;
                } else {
                    // Boundary falls inside this extent: build the tail
                    // half as a new extent of the same kind...
                    let new_extent = match extent {
                        FileExtent::MmapOffload {
                            offset: other_offset,
                            size: other_size,
                        } => FileExtent::MmapOffload {
                            offset: *other_offset + split_at,
                            size: *other_size - split_at,
                        },
                        FileExtent::RepeatingBytes {
                            value: other_value,
                            cnt: other_cnt,
                        } => FileExtent::RepeatingBytes {
                            value: *other_value,
                            cnt: *other_cnt - split_at,
                        },
                        FileExtent::InMemory { data: other_data } => FileExtent::InMemory {
                            data: other_data.slice((split_at as usize)..),
                        },
                    };
                    // ...then shrink the original to the head half and
                    // insert the tail right after it.
                    extent.resize(split_at);
                    self.extents.insert(index + 1, new_extent);
                    break;
                }
            }
        };

        // For overwrites, cut boundaries at both ends of the written range
        // and drop the extents it fully covers; `index` ends up at the
        // insertion point for the new extent.
        let mut index = if extent_offset < self.size {
            split_extents(extent_offset);
            split_extents(extent_offset + data_len);

            // Locate the extent where the write starts; after the splits
            // above, `extent_offset` lands exactly on a boundary (residual
            // becomes 0).
            let mut index = 0usize;
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if extent_offset >= extent.size() {
                    extent_offset -= extent.size();
                    index += 1;
                    continue;
                } else {
                    break;
                }
            }
            // Remove the extents the write replaces (their sizes sum to at
            // most `data_len` thanks to the second split).
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if data_len < extent.size() {
                    break;
                }
                data_len -= extent.size();
                self.extents.remove(index);
            }
            index
        } else {
            // Write starts at or past EOF: append position.
            self.extents.len()
        };

        // Writing strictly past EOF: fill the gap with zero bytes first.
        if extent_offset > self.size {
            self.extents.insert(
                index,
                FileExtent::RepeatingBytes {
                    value: 0,
                    cnt: extent_offset - self.size,
                },
            );
            self.size = extent_offset;
            index += 1;
        }

        match data {
            OffloadWrite::MmapOffset { offset, size } => {
                // Zero-copy: just record the backing-store region.
                self.extents
                    .insert(index, FileExtent::MmapOffload { offset, size });
            }
            OffloadWrite::Buffer(data) => {
                // If the caller's slice points into the backing buffer,
                // store it as an offset extent instead of copying it.
                let data_start = data.as_ptr() as u64;
                let data_end = data_start + data.len() as u64;
                let mut backing = self.backing.lock();
                let backing_data = backing.get_slice(0u64..u64::MAX)?;

                let mmap_start = backing_data.as_ptr() as u64;
                let mmap_end = mmap_start + backing_data.len() as u64;

                let new_extent = if data_start >= mmap_start && data_end <= mmap_end {
                    FileExtent::MmapOffload {
                        offset: data_start - mmap_start,
                        size: data_end - data_start,
                    }
                } else {
                    FileExtent::InMemory {
                        data: data.to_vec().into(),
                    }
                };
                self.extents.insert(index, new_extent);
            }
        }
        self.size = self.size.max(original_extent_offset + data.len() as u64);

        *cursor += data.len() as u64;
        Ok(data.len())
    }

    /// No-op: there is nothing buffered to flush.
    pub fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }

    /// Changes the file length to `new_len`. Growth appends a run of
    /// `value` bytes; shrinking trims (and drops, once empty) extents from
    /// the tail.
    pub fn resize(&mut self, new_len: u64, value: u8) {
        let mut cur_len = self.len();
        if new_len > cur_len {
            self.extents.push(FileExtent::RepeatingBytes {
                value,
                cnt: new_len - cur_len,
            });
        }
        while cur_len > new_len && !self.extents.is_empty() {
            let extent: &mut FileExtent = self.extents.last_mut().unwrap();
            let diff = extent.size().min(cur_len - new_len);
            extent.resize(extent.size() - diff);
            cur_len -= diff;
            if extent.size() == 0 {
                self.extents.pop();
            }
        }
        self.size = new_len;
    }

    /// Current file length in bytes.
    pub fn len(&self) -> u64 {
        self.size
    }

    /// Empties the file (length 0, no extents).
    pub fn truncate(&mut self) {
        self.extents.clear();
        self.size = 0;
    }
}
393
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end exercise of OffloadedFile: full overwrite, extending
    // overwrite, read-back, grow/shrink via resize, and an interior
    // overwrite that must split an extent.
    #[test]
    #[tracing_test::traced_test]
    pub fn test_offload_file() -> anyhow::Result<()> {
        // Backing store holds 100 bytes of 12u8; keep a clone whose slice
        // aliases the backing buffer (exercises the zero-copy write path).
        let buffer = OwnedBuffer::from_bytes(std::iter::repeat_n(12u8, 100).collect::<Vec<_>>());
        let test_data2 = buffer.clone();

        let backing = OffloadBackingStore::new(buffer, None);
        let mut file = OffloadedFile::new(None, backing);

        // Write 100 bytes of 56u8 at offset 0.
        let mut cursor = 0u64;
        let test_data = std::iter::repeat_n(56u8, 100).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 100);

        // Read the whole file back and verify.
        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 100).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 100).collect::<Vec<_>>());

        // Overwrite from offset 50 with the 100-byte backing-buffer clone,
        // extending the file to 150 bytes.
        cursor = 50;
        file.write(OffloadWrite::Buffer(&test_data2), &mut cursor)?;

        assert_eq!(file.len(), 150);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 150).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .collect::<Vec<_>>()
        );

        // Grow to 200 bytes, filling the new tail with 99u8.
        file.resize(200, 99u8);
        assert_eq!(file.len(), 200);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 200).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .chain(std::iter::repeat_n(99u8, 50))
                .collect::<Vec<_>>()
        );

        // Shrink to 33 bytes (fill value is irrelevant when shrinking).
        file.resize(33, 1u8);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 33).collect::<Vec<_>>());

        // Interior overwrite: 10 bytes of 74u8 at offset 10 splits the
        // existing extent without changing the file length.
        let mut cursor = 10u64;
        let test_data = std::iter::repeat_n(74u8, 10).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 33);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 10)
                .chain(std::iter::repeat_n(74u8, 10))
                .chain(std::iter::repeat_n(56u8, 13))
                .collect::<Vec<_>>()
        );

        Ok(())
    }
}