use std::io::Read;
use bytes::BytesMut;
use crate::{
v2::{
read::{
decoder::{DecodeError, Decoder},
Section,
},
Span,
},
DetectError, Version,
};
/// Size, in bytes, of the scratch space appended to the buffer for each read
/// from the underlying reader (4 KiB).
const DEFAULT_READ_SIZE: usize = 4096;
/// A reader that pulls bytes from any [`Read`] implementation and decodes
/// them into [`Section`]s one at a time.
#[derive(Debug)]
pub struct StreamingReader<R> {
/// The underlying source of bytes.
inner: R,
/// Bytes that have been read from `inner` and are handed to the decoder.
buffer: BytesMut,
/// Turns buffered bytes into sections and tracks the current position.
decoder: Decoder,
}
impl<R: Read> StreamingReader<R> {
    /// Creates a [`StreamingReader`] on top of `reader`.
    ///
    /// The version is detected by passing the reader to `crate::detect()`,
    /// so `reader` should be positioned at the very start of the file.
    ///
    /// # Errors
    ///
    /// - [`StreamingReaderError::Detect`] if version detection fails.
    /// - [`StreamingReaderError::UnsupportedVersion`] if the detected version
    ///   is anything other than [`Version::V2`].
    pub fn new(mut reader: R) -> Result<Self, StreamingReaderError> {
        let version = crate::detect(&mut reader)?;

        if version != Version::V2 {
            return Err(StreamingReaderError::UnsupportedVersion(version));
        }

        // Detection has already taken bytes off the front of the reader, so
        // the decoder starts counting from there and the positions it reports
        // remain offsets from the beginning of the stream.
        const BYTES_READ_BY_DETECT: usize = 8;

        Ok(StreamingReader {
            inner: reader,
            buffer: BytesMut::new(),
            decoder: Decoder::new(BYTES_READ_BY_DETECT),
        })
    }

    /// Consumes the reader and iterates over the remaining sections.
    ///
    /// Failures are yielded as `Err` items; the iterator finishes when
    /// [`StreamingReader::next_section()`] reports `Ok(None)`.
    pub fn sections(mut self) -> impl Iterator<Item = Result<Section, StreamingReaderError>> {
        std::iter::from_fn(move || self.next_section().transpose())
    }

    /// Like [`StreamingReader::sections()`], but each section is paired with
    /// the [`Span`] it was decoded from.
    pub fn sections_with_offsets(
        mut self,
    ) -> impl Iterator<Item = Result<(Section, Span), StreamingReaderError>> {
        std::iter::from_fn(move || self.next_section_with_offset().transpose())
    }

    /// Reads and decodes the next section, or returns `Ok(None)` when the
    /// underlying reader has no more data to give.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the underlying reader fails or the
    /// decoder rejects the data.
    pub fn next_section(&mut self) -> Result<Option<Section>, StreamingReaderError> {
        self.next_section_with_offset()
            .map(|section| section.map(|(s, _)| s))
    }

    /// Reads and decodes the next section together with its [`Span`].
    ///
    /// The span is built from the decoder's position before and after the
    /// decode, as `(start, length)`.
    ///
    /// Returns `Ok(None)` after several consecutive zero-length reads during
    /// which the decoder could not produce a section.
    fn next_section_with_offset(
        &mut self,
    ) -> Result<Option<(Section, Span)>, StreamingReaderError> {
        // A zero-length read normally signals EOF, but we give the reader a
        // few more chances before declaring the stream finished.
        const MAX_EMPTY_READS: usize = 3;
        let mut empty_reads = 0;

        loop {
            let bytes_read = self.fill_buffer()?;
            let start = self.decoder.position();

            match self.decoder.decode(&mut self.buffer)? {
                Some(section) => {
                    let end = self.decoder.position();
                    let span = Span::new(start, end - start);
                    return Ok(Some((section, span)));
                }
                None if bytes_read == 0 => {
                    if empty_reads > MAX_EMPTY_READS {
                        return Ok(None);
                    }
                    empty_reads += 1;
                }
                None => {
                    // We made progress but the decoder needs more data;
                    // go around again with a fresh retry budget.
                    empty_reads = 0;
                }
            }
        }
    }

    /// Performs one read of up to [`DEFAULT_READ_SIZE`] bytes from the
    /// underlying reader, appending whatever arrives to the buffer.
    ///
    /// Returns the number of bytes appended. Reads that fail with
    /// [`std::io::ErrorKind::Interrupted`] are retried. On any other error the
    /// buffer is restored to its previous length before the error is
    /// returned, so the zero-filled scratch space can never be mistaken for
    /// real data by a later call.
    fn fill_buffer(&mut self) -> Result<usize, std::io::Error> {
        let original_length = self.buffer.len();
        // `Read::read()` needs an initialised slice to write into, so extend
        // the buffer with zeroes and trim the unused part afterwards.
        self.buffer.resize(original_length + DEFAULT_READ_SIZE, 0);

        let outcome = loop {
            match self.inner.read(&mut self.buffer[original_length..]) {
                // Interrupted reads are non-fatal and should simply be retried.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                other => break other,
            }
        };

        // Trim the scratch space on both the success and the failure path.
        let bytes_read = outcome.as_ref().map_or(0, |&n| n);
        self.buffer.truncate(original_length + bytes_read);

        outcome
    }
}
/// Errors that may occur while constructing or using a [`StreamingReader`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StreamingReaderError {
/// Reading from the underlying reader failed.
#[error(transparent)]
Io(#[from] std::io::Error),
/// The version could not be detected when constructing the reader.
#[error("Unable to detect the WEBC version")]
Detect(#[from] DetectError),
/// A version was detected, but it isn't one this reader can handle.
#[error("The version, {_0}, isn't supported")]
UnsupportedVersion(Version),
/// The decoder reported an error while decoding a section.
#[error("Decode failed")]
Decode(#[from] DecodeError),
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use crate::{
metadata::Manifest,
v2::{write::Writer, Tag},
};
use super::*;
// Writes a small file (manifest, one atom, one volume with a nested
// directory) and checks that the span reported for every section, atom, and
// file points at the expected bytes in the serialized output.
#[test]
fn section_and_file_offsets() {
// Build the fixture: a volume with a top-level file and a nested file...
let volume = dir_map! {
"file.txt" => b"Hello, World!",
"another" => dir_map! {
"nested.txt" => b"nested",
}
};
// ... plus a single atom.
let atoms = BTreeMap::from([("atom".parse().unwrap(), b"some-atom".into())]);
let mut writer = Writer::default()
.write_manifest(&Manifest::default())
.unwrap()
.write_atoms(atoms)
.unwrap();
writer.write_volume("some-volume", volume).unwrap();
let webc = writer.finish(crate::v2::SignatureAlgorithm::None).unwrap();
let mut reader = StreamingReader::new(webc.as_ref()).unwrap();
// First section: the index, starting at offset 8.
let (section, span) = reader.next_section_with_offset().unwrap().unwrap();
assert!(matches!(section, Section::Index(_)));
assert_eq!(span, Span::new(8, 432));
// The byte at the start of each span should be that section's tag.
assert_eq!(Tag::from_u8(webc[span.start]).unwrap(), Tag::Index);
// Second section: the manifest.
let (section, range) = reader.next_section_with_offset().unwrap().unwrap();
assert!(matches!(section, Section::Manifest(_)));
assert_eq!(range, Span::new(440, 10));
assert_eq!(Tag::from_u8(webc[range.start]).unwrap(), Tag::Manifest);
// Third section: the atoms.
let (section, range) = reader.next_section_with_offset().unwrap().unwrap();
let atoms = section.as_atoms().unwrap();
assert_eq!(range, Span::new(450, 112));
assert_eq!(Tag::from_u8(webc[range.start]).unwrap(), Tag::Atoms);
let atom_offsets: BTreeMap<_, _> = atoms
.iter_with_offsets()
.map(|result| result.unwrap())
.map(|(name, _, offset)| (name, offset))
.collect();
let atom_offset = atom_offsets["atom"];
assert_eq!(atom_offset, Span::new(103, 9));
// Atom spans are relative to the start of the atoms section, so shift by
// the section's start to index into the whole file.
let some_atom = atom_offset.with_offset(range.start);
assert_eq!(std::str::from_utf8(&webc[some_atom]).unwrap(), "some-atom");
// Fourth section: the volume.
let (section, range) = reader.next_section_with_offset().unwrap().unwrap();
let volume_section = section.as_volume().unwrap();
assert_eq!(range, Span::new(562, 252));
assert_eq!(Tag::from_u8(webc[range.start]).unwrap(), Tag::Volume);
let root = volume_section.root().unwrap();
// File spans are likewise relative to the start of the volume section.
let entry = root.find("file.txt").unwrap().unwrap().into_file().unwrap();
let file_txt = &webc[entry.span().with_offset(range.start)];
assert_eq!(String::from_utf8_lossy(file_txt), "Hello, World!");
assert_eq!(entry.checksum(), crate::utils::sha256(file_txt));
// The same holds for a file inside a nested directory.
let entry = root
.find("another")
.unwrap()
.unwrap()
.into_dir()
.unwrap()
.find("nested.txt")
.unwrap()
.unwrap()
.into_file()
.unwrap();
let nested_txt = &webc[entry.span().with_offset(range.start)];
assert_eq!(String::from_utf8_lossy(nested_txt), "nested");
assert_eq!(entry.checksum(), crate::utils::sha256(nested_txt));
}
}