use bytes::Bytes;
use shared_buffer::OwnedBuffer;
use std::{
    cmp,
    fs::File,
    io,
    ops::Range,
    sync::{Arc, Mutex, MutexGuard},
};

use crate::limiter::DynFsMemoryLimiter;

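/// A contiguous region of an [`OffloadedFile`], backed either by a range of
/// the mmapped offload buffer, a run of a single repeating byte, or an
/// in-memory byte buffer.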
#[derive(Debug)]
pub enum FileExtent {
    MmapOffload { offset: u64, size: u64 },
    RepeatingBytes { value: u8, cnt: u64 },
    InMemory { data: Bytes },
}

impl FileExtent {
    pub fn size(&self) -> u64 {
        match self {
            FileExtent::MmapOffload { size, .. } => *size,
            FileExtent::RepeatingBytes { cnt, .. } => *cnt,
            FileExtent::InMemory { data } => data.len() as u64,
        }
    }

    pub fn resize(&mut self, new_size: u64) {
        match self {
            FileExtent::MmapOffload { size, .. } => *size = new_size.min(*size),
            FileExtent::RepeatingBytes { cnt, .. } => *cnt = new_size,
            FileExtent::InMemory { data } => {
                *data = data.slice(..(new_size as usize));
            }
        }
    }
}

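/// State shared behind the [`OffloadBackingStore`] mutex: the mmapped offload
/// buffer and, when available, the file it was mapped from so the mapping can
/// be refreshed if the file grows.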
#[derive(Debug)]
struct OffloadBackingStoreState {
    mmap_file: Option<File>,
    mmap_offload: OwnedBuffer,
}

impl OffloadBackingStoreState {
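    /// Returns the bytes for `range` out of the mmapped buffer. A range end of
    /// `u64::MAX` means "up to the end of the buffer". If the range extends
    /// past the current mapping, the file (if any) is re-mapped once before
    /// giving up with `UnexpectedEof`.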
    fn get_slice(&mut self, range: Range<u64>) -> io::Result<&[u8]> {
        let offset = range.start;
        let size = match range.end {
            u64::MAX => {
                let len = self.mmap_offload.len() as u64;
                if len < offset {
                    tracing::trace!("range out of bounds {} vs {}", len, offset);
                    return Err(io::ErrorKind::UnexpectedEof.into());
                }
                len - offset
            }
            end => end - offset,
        };

        let end = offset + size;
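        // If the requested range runs past the current mapping, re-map the
        // underlying file (when we still have it) in case it has grown since
        // the buffer was created.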
        if end > self.mmap_offload.len() as u64 {
            let mmap_file = match self.mmap_file.as_ref() {
                Some(a) => a,
                None => {
                    tracing::trace!(
                        "mmap buffer out of bounds and no mmap file to reload {} vs {}",
                        end,
                        self.mmap_offload.len()
                    );
                    return Err(io::ErrorKind::UnexpectedEof.into());
                }
            };
            self.mmap_offload = OwnedBuffer::from_file(mmap_file).map_err(io::Error::other)?;
            if end > self.mmap_offload.len() as u64 {
                tracing::trace!(
                    "mmap buffer out of bounds {} vs {} for {:?}",
                    end,
                    self.mmap_offload.len(),
                    range
                );
                return Err(io::ErrorKind::UnexpectedEof.into());
            }
        }
        Ok(&self.mmap_offload[offset as usize..end as usize])
    }
}

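/// Shared, cloneable handle to the mmapped offload buffer. Clones share the
/// same underlying state through an `Arc<Mutex<_>>`.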
#[derive(Debug, Clone)]
pub struct OffloadBackingStore {
    state: Arc<Mutex<OffloadBackingStoreState>>,
}

impl OffloadBackingStore {
    pub fn new(mmap_offload: OwnedBuffer, mmap_file: Option<File>) -> Self {
        Self {
            state: Arc::new(Mutex::new(OffloadBackingStoreState {
                mmap_file,
                mmap_offload,
            })),
        }
    }

    pub fn from_file(file: &File) -> Self {
        let file = file.try_clone().unwrap();
        let buffer = OwnedBuffer::from_file(&file).unwrap();
        Self::new(buffer, Some(file))
    }

    pub fn from_buffer(buffer: OwnedBuffer) -> Self {
        Self::new(buffer, None)
    }

    pub fn owned_buffer(&self) -> OwnedBuffer {
        let guard = self.state.lock().unwrap();
        guard.mmap_offload.clone()
    }

    fn lock(&self) -> MutexGuard<'_, OffloadBackingStoreState> {
        self.state.lock().unwrap()
    }
}

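/// A file whose contents are described by an ordered list of [`FileExtent`]s
/// rather than a single flat buffer. Regions that already live in the mmapped
/// offload buffer are referenced by offset, zero- or value-filled holes are
/// stored as repeating-byte runs, and everything else is kept in memory.
///
/// A minimal usage sketch (module paths assumed, hence not compiled as a
/// doctest):
///
/// ```ignore
/// let backing = OffloadBackingStore::from_buffer(OwnedBuffer::from_bytes(vec![0u8; 16]));
/// let mut file = OffloadedFile::new(None, backing);
/// let mut cursor = 0u64;
/// file.write(OffloadWrite::Buffer(b"hello"), &mut cursor).unwrap();
/// cursor = 0;
/// let mut out = [0u8; 5];
/// file.read(&mut out, &mut cursor).unwrap();
/// assert_eq!(&out, b"hello");
/// ```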
#[derive(Debug)]
pub struct OffloadedFile {
    backing: OffloadBackingStore,
    #[allow(dead_code)]
    limiter: Option<DynFsMemoryLimiter>,
    extents: Vec<FileExtent>,
    size: u64,
}

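/// Data to be written into an [`OffloadedFile`]: either a range of the
/// already-mmapped offload buffer (recorded without copying) or a plain byte
/// slice.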
pub enum OffloadWrite<'a> {
    MmapOffset { offset: u64, size: u64 },
    Buffer(&'a [u8]),
}

impl OffloadWrite<'_> {
    fn len(&self) -> usize {
        match self {
            OffloadWrite::MmapOffset { size, .. } => *size as usize,
            OffloadWrite::Buffer(data) => data.len(),
        }
    }
}

impl OffloadedFile {
    pub fn new(limiter: Option<DynFsMemoryLimiter>, backing: OffloadBackingStore) -> Self {
        Self {
            backing,
            limiter,
            extents: Vec::new(),
            size: 0,
        }
    }

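    /// Moves `cursor` according to `position`, clamping the result to the
    /// current file length. Seeking before byte 0 is an error.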
    pub fn seek(&self, position: io::SeekFrom, cursor: &mut u64) -> io::Result<u64> {
        let to_err = |_| io::ErrorKind::InvalidInput;

        let next_cursor: i64 = match position {
            io::SeekFrom::Start(offset) => offset.try_into().map_err(to_err)?,
            io::SeekFrom::End(offset) => self.len() as i64 + offset,
            io::SeekFrom::Current(offset) => {
                TryInto::<i64>::try_into(*cursor).map_err(to_err)? + offset
            }
        };

        if next_cursor < 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seeking before byte 0",
            ));
        }

        let next_cursor = next_cursor.try_into().map_err(to_err)?;
        *cursor = cmp::min(self.len(), next_cursor);
        Ok(*cursor)
    }

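    /// Reads from the extent list starting at `*cursor` into `buf`, advancing
    /// the cursor. Returns the number of bytes read, which may be short if the
    /// end of the file is reached.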
    pub fn read(&self, mut buf: &mut [u8], cursor: &mut u64) -> io::Result<usize> {
        let cursor_start = *cursor;

        let mut extent_offset = cursor_start;
        let mut extent_index = 0usize;
        while !buf.is_empty() && extent_index < self.extents.len() {
            let extent = &self.extents[extent_index];

            if extent_offset >= extent.size() {
                extent_offset -= extent.size();
                extent_index += 1;
                continue;
            }

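            // Copy as much of this extent as fits in `buf`; `read` is the
            // number of bytes actually produced.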
            let read = match extent {
                FileExtent::MmapOffload {
                    offset: mmap_offset,
                    size: extent_size,
                } => {
                    let mut backing = self.backing.lock();
                    let mmap_offset_plus_extent = mmap_offset + extent_offset;
                    let data = backing.get_slice(
                        mmap_offset_plus_extent
                            ..(mmap_offset_plus_extent + *extent_size - extent_offset),
                    )?;
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
                FileExtent::RepeatingBytes { value, cnt } => {
                    let cnt = cmp::min(buf.len() as u64, cnt - extent_offset) as usize;
                    buf[..cnt].iter_mut().for_each(|d| {
                        *d = *value;
                    });
                    cnt
                }
                FileExtent::InMemory { data } => {
                    let data = &data.as_ref()[extent_offset as usize..];
                    let data_len = cmp::min(buf.len(), data.len());
                    buf[..data_len].copy_from_slice(&data[..data_len]);
                    data_len
                }
            };

            *cursor += read as u64;
            extent_offset = 0;
            extent_index += 1;
            buf = &mut buf[read..];
        }
        Ok((*cursor - cursor_start) as usize)
    }

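    /// Writes `data` at `*cursor`, advancing the cursor. Existing extents that
    /// overlap the write range are split and replaced; writing past the end of
    /// the file first fills the gap with zero bytes.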
    pub fn write(&mut self, data: OffloadWrite<'_>, cursor: &mut u64) -> io::Result<usize> {
        let original_extent_offset = *cursor;
        let mut extent_offset = original_extent_offset;
        let mut data_len = data.len() as u64;

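        // Splits the extent containing absolute offset `split_at` into two
        // extents so that `split_at` falls exactly on an extent boundary.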
        let mut split_extents = |mut split_at: u64| {
            let mut index = 0usize;
            while split_at > 0 && index < self.extents.len() {
                let extent = &mut self.extents[index];
                if split_at >= extent.size() {
                    split_at -= extent.size();
                    index += 1;
                    continue;
                } else if split_at == 0 {
                    break;
                } else {
                    let new_extent = match extent {
                        FileExtent::MmapOffload {
                            offset: other_offset,
                            size: other_size,
                        } => FileExtent::MmapOffload {
                            offset: *other_offset + split_at,
                            size: *other_size - split_at,
                        },
                        FileExtent::RepeatingBytes {
                            value: other_value,
                            cnt: other_cnt,
                        } => FileExtent::RepeatingBytes {
                            value: *other_value,
                            cnt: *other_cnt - split_at,
                        },
                        FileExtent::InMemory { data: other_data } => FileExtent::InMemory {
                            data: other_data.slice((split_at as usize)..),
                        },
                    };
                    extent.resize(split_at);
                    self.extents.insert(index + 1, new_extent);
                    break;
                }
            }
        };

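        // If the write lands inside existing data, split the extents at both
        // ends of the write range, skip to the first extent at the cursor, and
        // drop every extent that the write fully covers.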
        let mut index = if extent_offset < self.size {
            split_extents(extent_offset);
            split_extents(extent_offset + data_len);

            let mut index = 0usize;
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if extent_offset >= extent.size() {
                    extent_offset -= extent.size();
                    index += 1;
                    continue;
                } else {
                    break;
                }
            }
            while index < self.extents.len() {
                let extent = &self.extents[index];
                if data_len < extent.size() {
                    break;
                }
                data_len -= extent.size();
                self.extents.remove(index);
            }
            index
        } else {
            self.extents.len()
        };

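        // Writing past the current end of the file: fill the gap with a
        // zero-filled repeating-byte extent before inserting the new data.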
        if extent_offset > self.size {
            self.extents.insert(
                index,
                FileExtent::RepeatingBytes {
                    value: 0,
                    cnt: extent_offset - self.size,
                },
            );
            self.size = extent_offset;
            index += 1;
        }

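        // Insert the new extent. Buffer writes whose slice already points into
        // the mmapped offload region are recorded as an mmap reference instead
        // of being copied.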
        match data {
            OffloadWrite::MmapOffset { offset, size } => {
                self.extents
                    .insert(index, FileExtent::MmapOffload { offset, size });
            }
            OffloadWrite::Buffer(data) => {
                let data_start = data.as_ptr() as u64;
                let data_end = data_start + data.len() as u64;
                let mut backing = self.backing.lock();
                let backing_data = backing.get_slice(0u64..u64::MAX)?;

                let mmap_start = backing_data.as_ptr() as u64;
                let mmap_end = mmap_start + backing_data.len() as u64;

                let new_extent = if data_start >= mmap_start && data_end <= mmap_end {
                    FileExtent::MmapOffload {
                        offset: data_start - mmap_start,
                        size: data_end - data_start,
                    }
                } else {
                    FileExtent::InMemory {
                        data: data.to_vec().into(),
                    }
                };
                self.extents.insert(index, new_extent);
            }
        }
        self.size = self.size.max(original_extent_offset + data.len() as u64);

        *cursor += data.len() as u64;
        Ok(data.len())
    }

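    /// Flushing is a no-op: extents are either held in memory or already
    /// backed by the mmapped offload buffer.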
    pub fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }

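    /// Grows the file to `new_len` by appending a run of `value` bytes, or
    /// shrinks it by trimming extents from the end.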
    pub fn resize(&mut self, new_len: u64, value: u8) {
        let mut cur_len = self.len();
        if new_len > cur_len {
            self.extents.push(FileExtent::RepeatingBytes {
                value,
                cnt: new_len - cur_len,
            });
        }
        while cur_len > new_len && !self.extents.is_empty() {
            let extent: &mut FileExtent = self.extents.last_mut().unwrap();
            let diff = extent.size().min(cur_len - new_len);
            extent.resize(extent.size() - diff);
            cur_len -= diff;
            if extent.size() == 0 {
                self.extents.pop();
            }
        }
        self.size = new_len;
    }

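    /// Returns the logical length of the file in bytes.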
    pub fn len(&self) -> u64 {
        self.size
    }

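    /// Drops all extents, leaving an empty file.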
    pub fn truncate(&mut self) {
        self.extents.clear();
        self.size = 0;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[tracing_test::traced_test]
    pub fn test_offload_file() -> anyhow::Result<()> {
        let buffer = OwnedBuffer::from_bytes(std::iter::repeat_n(12u8, 100).collect::<Vec<_>>());
        let test_data2 = buffer.clone();

        let backing = OffloadBackingStore::new(buffer, None);
        let mut file = OffloadedFile::new(None, backing);

        let mut cursor = 0u64;
        let test_data = std::iter::repeat_n(56u8, 100).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 100);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 100).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 100).collect::<Vec<_>>());

        cursor = 50;
        file.write(OffloadWrite::Buffer(&test_data2), &mut cursor)?;

        assert_eq!(file.len(), 150);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 150).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .collect::<Vec<_>>()
        );

        file.resize(200, 99u8);
        assert_eq!(file.len(), 200);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 200).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 50)
                .chain(std::iter::repeat_n(12u8, 100))
                .chain(std::iter::repeat_n(99u8, 50))
                .collect::<Vec<_>>()
        );

        file.resize(33, 1u8);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(&result, &std::iter::repeat_n(56u8, 33).collect::<Vec<_>>());

        let mut cursor = 10u64;
        let test_data = std::iter::repeat_n(74u8, 10).collect::<Vec<_>>();
        file.write(OffloadWrite::Buffer(&test_data), &mut cursor)?;

        assert_eq!(file.len(), 33);

        cursor = 0;
        let mut result = std::iter::repeat_n(0u8, 33).collect::<Vec<_>>();
        file.read(&mut result, &mut cursor)?;
        assert_eq!(
            &result,
            &std::iter::repeat_n(56u8, 10)
                .chain(std::iter::repeat_n(74u8, 10))
                .chain(std::iter::repeat_n(56u8, 13))
                .collect::<Vec<_>>()
        );

        Ok(())
    }
}