1use std::{
2 collections::HashMap,
3 fmt::Write as _,
4 io::{ErrorKind, Write as _},
5 path::PathBuf,
6 sync::{Arc, RwLock},
7};
8
9use anyhow::{Context, Error};
10use bytes::Bytes;
11use http::{HeaderMap, Method};
12use tempfile::NamedTempFile;
13use url::Url;
14use wasmer_package::{
15 package::WasmerPackageError,
16 utils::{from_bytes, from_disk},
17};
18use webc::DetectError;
19use webc::{Container, ContainerError};
20
21use crate::{
22 bin_factory::BinaryPackage,
23 http::{HttpClient, HttpRequest, USER_AGENT},
24 runtime::{
25 package_loader::PackageLoader,
26 resolver::{DistributionInfo, PackageSummary, Resolution, WebcHash},
27 },
28};
29
/// The built-in package loader.
///
/// Fetches package images (see `download`), keeps the loaded containers in
/// an in-memory cache and, once `with_cache_dir` has been called, also in a
/// directory on disk.
#[derive(Debug)]
pub struct BuiltinPackageLoader {
    /// HTTP client used to download package images.
    client: Arc<dyn HttpClient + Send + Sync>,
    /// Containers that were already loaded, keyed by their hash.
    in_memory: InMemoryCache,
    /// Optional on-disk cache; `None` until `with_cache_dir` is called.
    cache: Option<FileSystemCache>,
    /// Tokens sent as `Authorization: Bearer ...`, looked up by URL authority.
    tokens: HashMap<String, String>,

    /// How downloaded images are checked against their expected hash.
    hash_validation: HashIntegrityValidationMode,
}
42
/// Controls whether (and how strictly) a downloaded image is compared against
/// the SHA-256 hash recorded in its `DistributionInfo`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HashIntegrityValidationMode {
    /// Skip validation entirely; the image hash is not even computed.
    NoValidate,
    /// Compute the hash and log a warning on mismatch; loading continues.
    WarnOnHashMismatch,
    /// Compute the hash and fail with an `ImageHashMismatchError` on mismatch.
    FailOnHashMismatch,
}
54
55impl BuiltinPackageLoader {
56 pub fn new() -> Self {
57 BuiltinPackageLoader {
58 in_memory: InMemoryCache::default(),
59 client: Arc::new(crate::http::default_http_client().unwrap()),
60 cache: None,
61 hash_validation: HashIntegrityValidationMode::NoValidate,
62 tokens: HashMap::new(),
63 }
64 }
65
66 pub fn with_hash_validation_mode(mut self, mode: HashIntegrityValidationMode) -> Self {
70 self.hash_validation = mode;
71 self
72 }
73
74 pub fn with_cache_dir(self, cache_dir: impl Into<PathBuf>) -> Self {
75 BuiltinPackageLoader {
76 cache: Some(FileSystemCache {
77 cache_dir: cache_dir.into(),
78 }),
79 ..self
80 }
81 }
82
83 pub fn cache(&self) -> Option<&FileSystemCache> {
84 self.cache.as_ref()
85 }
86
87 pub fn validate_cache(
88 &self,
89 mode: CacheValidationMode,
90 ) -> Result<Vec<ImageHashMismatchError>, anyhow::Error> {
91 let cache = self
92 .cache
93 .as_ref()
94 .context("can not validate cache - no cache configured")?;
95
96 let items = cache.validate_hashes()?;
97 let mut errors = Vec::new();
98 for (path, error) in items {
99 match mode {
100 CacheValidationMode::WarnOnMismatch => {
101 tracing::warn!(?error, "hash mismatch in cached image file");
102 }
103 CacheValidationMode::PruneOnMismatch => {
104 tracing::warn!(?error, "deleting cached image file due to hash mismatch");
105 match std::fs::remove_file(&path) {
106 Ok(()) => {}
107 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
108 Err(fs_err) => {
109 tracing::error!(
110 path=%error.source,
111 ?fs_err,
112 "could not delete cached image file with hash mismatch"
113 );
114 }
115 }
116 }
117 }
118
119 errors.push(error);
120 }
121
122 Ok(errors)
123 }
124
125 pub fn with_http_client(self, client: impl HttpClient + Send + Sync + 'static) -> Self {
126 self.with_shared_http_client(Arc::new(client))
127 }
128
129 pub fn with_shared_http_client(self, client: Arc<dyn HttpClient + Send + Sync>) -> Self {
130 BuiltinPackageLoader { client, ..self }
131 }
132
133 pub fn with_tokens<I, K, V>(mut self, tokens: I) -> Self
134 where
135 I: IntoIterator<Item = (K, V)>,
136 K: Into<String>,
137 V: Into<String>,
138 {
139 for (hostname, token) in tokens {
140 self = self.with_token(hostname, token);
141 }
142
143 self
144 }
145
146 pub fn with_token(mut self, hostname: impl Into<String>, token: impl Into<String>) -> Self {
153 self.tokens.insert(hostname.into(), token.into());
154 self
155 }
156
157 pub fn insert_cached(&self, hash: WebcHash, container: &Container) {
159 self.in_memory.save(container, hash);
160 }
161
162 #[tracing::instrument(level = "debug", skip_all, fields(pkg.hash=%hash))]
163 async fn get_cached(&self, hash: &WebcHash) -> Result<Option<Container>, Error> {
164 if let Some(cached) = self.in_memory.lookup(hash) {
165 return Ok(Some(cached));
166 }
167
168 if let Some(cache) = self.cache.as_ref() {
169 if let Some(cached) = cache.lookup(hash).await? {
170 tracing::debug!("Copying from the filesystem cache to the in-memory cache");
172 self.in_memory.save(&cached, *hash);
173 return Ok(Some(cached));
174 }
175 }
176
177 Ok(None)
178 }
179
180 async fn validate_hash(
182 image: &bytes::Bytes,
183 mode: HashIntegrityValidationMode,
184 info: &DistributionInfo,
185 ) -> Result<(), anyhow::Error> {
186 let info = info.clone();
187 let image = image.clone();
188 crate::spawn_blocking(move || Self::validate_hash_sync(&image, mode, &info))
189 .await
190 .context("tokio runtime failed")?
191 }
192
193 fn validate_hash_sync(
195 image: &[u8],
196 mode: HashIntegrityValidationMode,
197 info: &DistributionInfo,
198 ) -> Result<(), anyhow::Error> {
199 match mode {
200 HashIntegrityValidationMode::NoValidate => {
201 Ok(())
203 }
204 HashIntegrityValidationMode::WarnOnHashMismatch => {
205 let actual_hash = WebcHash::sha256(image);
206 if actual_hash != info.webc_sha256 {
207 tracing::warn!(%info.webc_sha256, %actual_hash, "image hash mismatch - actual image hash does not match the expected hash!");
208 }
209 Ok(())
210 }
211 HashIntegrityValidationMode::FailOnHashMismatch => {
212 let actual_hash = WebcHash::sha256(image);
213 if actual_hash != info.webc_sha256 {
214 Err(ImageHashMismatchError {
215 source: info.webc.to_string(),
216 actual_hash,
217 expected_hash: info.webc_sha256,
218 }
219 .into())
220 } else {
221 Ok(())
222 }
223 }
224 }
225 }
226
/// Fetch the raw image bytes described by `dist`.
///
/// `file://` URLs are read from the local filesystem when the URL can be
/// converted into a path. Everything else, including `file://` URLs for
/// which that conversion fails, is requested through the HTTP client with a
/// `GET`. In both cases the bytes go through `validate_hash` before they are
/// returned.
///
/// # Errors
///
/// Fails when the file cannot be read, the HTTP request fails, the response
/// status is not OK, the response has no body, or hash validation rejects
/// the image.
#[tracing::instrument(level = "debug", skip_all, fields(%dist.webc, %dist.webc_sha256))]
async fn download(&self, dist: &DistributionInfo) -> Result<Bytes, Error> {
    if dist.webc.scheme() == "file" {
        match crate::runtime::resolver::utils::file_path_from_url(&dist.webc) {
            Ok(path) => {
                // Read the file inside `crate::spawn_blocking`; a clone of the
                // path moves into the closure so the original stays available
                // for the error message below.
                let bytes = crate::spawn_blocking({
                    let path = path.clone();
                    move || std::fs::read(path)
                })
                .await?
                .with_context(|| format!("Unable to read \"{}\"", path.display()))?;

                let bytes = bytes::Bytes::from(bytes);

                Self::validate_hash(&bytes, self.hash_validation, dist).await?;

                return Ok(bytes);
            }
            Err(e) => {
                // Not fatal: log and fall through to the HTTP request below.
                tracing::debug!(
                    url=%dist.webc,
                    error=&*e,
                    "Unable to convert the file:// URL to a path",
                );
            }
        }
    }

    let request = HttpRequest {
        headers: self.headers(&dist.webc),
        url: dist.webc.clone(),
        method: Method::GET,
        body: None,
        options: Default::default(),
    };

    tracing::debug!(%request.url, %request.method, "webc_package_download_start");
    tracing::trace!(?request.headers);

    let response = self.client.request(request).await?;

    tracing::trace!(
        %response.status,
        %response.redirected,
        ?response.headers,
        response.len=response.body.as_ref().map(|body| body.len()),
        "Received a response",
    );

    let url = &dist.webc;
    if !response.is_ok() {
        return Err(
            crate::runtime::resolver::utils::http_error(&response).context(format!(
                "package download failed: GET request to \"{}\" failed with status {}",
                url, response.status
            )),
        );
    }

    // An OK response without a body is still treated as a failed download.
    let body = response.body.context("package download failed")?;
    tracing::debug!(%url, "package_download_succeeded");

    let body = bytes::Bytes::from(body);

    Self::validate_hash(&body, self.hash_validation, dist).await?;

    Ok(body)
}
295
296 fn headers(&self, url: &Url) -> HeaderMap {
297 let mut headers = HeaderMap::new();
298 headers.insert("Accept", "application/webc".parse().unwrap());
299 headers.insert("User-Agent", USER_AGENT.parse().unwrap());
300
301 if url.has_authority() {
302 if let Some(token) = self.tokens.get(url.authority()) {
303 let header = format!("Bearer {token}");
304 match header.parse() {
305 Ok(header) => {
306 headers.insert(http::header::AUTHORIZATION, header);
307 }
308 Err(e) => {
309 tracing::warn!(
310 error = &e as &dyn std::error::Error,
311 "An error occurred while parsing the authorization header",
312 );
313 }
314 }
315 }
316 }
317
318 headers
319 }
320}
321
322impl Default for BuiltinPackageLoader {
323 fn default() -> Self {
324 BuiltinPackageLoader::new()
325 }
326}
327
328#[async_trait::async_trait]
329impl PackageLoader for BuiltinPackageLoader {
330 #[tracing::instrument(
331 level="debug",
332 skip_all,
333 fields(
334 pkg=%summary.pkg.id,
335 ),
336 )]
337 async fn load(&self, summary: &PackageSummary) -> Result<Container, Error> {
338 if let Some(container) = self.get_cached(&summary.dist.webc_sha256).await? {
339 tracing::debug!("Cache hit!");
340 return Ok(container);
341 }
342
343 let bytes = self
345 .download(&summary.dist)
346 .await
347 .with_context(|| format!("Unable to download \"{}\"", summary.dist.webc))?;
348
349 if let Some(cache) = &self.cache {
353 match cache
354 .save_and_load_as_mmapped(bytes.clone(), &summary.dist)
355 .await
356 {
357 Ok(container) => {
358 tracing::debug!("Cached to disk");
359 self.in_memory.save(&container, summary.dist.webc_sha256);
360 return Ok(container);
363 }
364 Err(e) => {
365 tracing::warn!(
366 error=&*e,
367 pkg=%summary.pkg.id,
368 pkg.hash=%summary.dist.webc_sha256,
369 pkg.url=%summary.dist.webc,
370 "Unable to save the downloaded package to disk",
371 );
372 }
373 }
374 }
375
376 let container = crate::spawn_blocking(move || from_bytes(bytes)).await??;
379 self.in_memory.save(&container, summary.dist.webc_sha256);
381 Ok(container)
382 }
383
384 async fn load_package_tree(
385 &self,
386 root: &Container,
387 resolution: &Resolution,
388 root_is_local_dir: bool,
389 ) -> Result<BinaryPackage, Error> {
390 super::load_package_tree(root, self, resolution, root_is_local_dir).await
391 }
392}
393
/// Error describing an image whose computed hash differs from the expected one.
#[derive(Clone, Debug)]
pub struct ImageHashMismatchError {
    /// Where the image came from: the download URL or the cache file path.
    source: String,
    /// The hash the image was expected to have.
    expected_hash: WebcHash,
    /// The hash actually computed from the image contents.
    actual_hash: WebcHash,
}
400
401impl std::fmt::Display for ImageHashMismatchError {
402 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
403 write!(
404 f,
405 "image hash mismatch! expected hash '{}', but the computed hash is '{}' (source '{}')",
406 self.expected_hash, self.actual_hash, self.source,
407 )
408 }
409}
410
// Empty impl: the trait's default methods are sufficient. Being a
// `std::error::Error` is what allows the `.into()` conversion to
// `anyhow::Error` used in `validate_hash_sync`.
impl std::error::Error for ImageHashMismatchError {}
412
/// What `BuiltinPackageLoader::validate_cache` does with a cached image whose
/// contents do not match the hash in its file name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CacheValidationMode {
    /// Only log a warning; the file is left in place.
    WarnOnMismatch,
    /// Log a warning and delete the file from the cache.
    PruneOnMismatch,
}
422
/// A cache of package images stored as files in a single directory.
///
/// Each image lives at `<cache_dir>/<hex-encoded hash>.bin` (see `path`).
#[derive(Debug)]
pub struct FileSystemCache {
    /// Directory holding the cached image files.
    cache_dir: PathBuf,
}
429
430impl FileSystemCache {
/// Suffix appended to the hex-encoded hash to form a cache file name.
const FILE_SUFFIX: &'static str = ".bin";
432
433 fn temp_dir(&self) -> PathBuf {
434 self.cache_dir.join("__temp__")
435 }
436
437 fn validate_hashes(&self) -> Result<Vec<(PathBuf, ImageHashMismatchError)>, anyhow::Error> {
440 let mut items = Vec::<(PathBuf, ImageHashMismatchError)>::new();
441
442 let iter = match std::fs::read_dir(&self.cache_dir) {
443 Ok(v) => v,
444 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
445 return Ok(Vec::new());
447 }
448 Err(err) => {
449 return Err(err).with_context(|| {
450 format!(
451 "Could not read image cache dir: '{}'",
452 self.cache_dir.display()
453 )
454 });
455 }
456 };
457
458 for res in iter {
459 let entry = res?;
460 if !entry.file_type()?.is_file() {
461 continue;
462 }
463
464 let hash_opt = entry
467 .file_name()
468 .to_str()
469 .and_then(|x| {
470 let (raw_hash, _) = x.split_once(Self::FILE_SUFFIX)?;
471 Some(raw_hash)
472 })
473 .and_then(|x| WebcHash::parse_hex(x).ok());
474 let Some(expected_hash) = hash_opt else {
475 continue;
476 };
477
478 let path = entry.path();
480 let actual_hash = WebcHash::for_file(&path)?;
481
482 if actual_hash != expected_hash {
483 let err = ImageHashMismatchError {
484 source: path.to_string_lossy().to_string(),
485 actual_hash,
486 expected_hash,
487 };
488 items.push((path, err));
489 }
490 }
491
492 Ok(items)
493 }
494
/// Try to load the cached container for `hash` from disk.
///
/// Returns `Ok(None)` when loading fails with an I/O error of kind
/// `NotFound`. Any other failure is returned as an error that names the
/// cache file path.
async fn lookup(&self, hash: &WebcHash) -> Result<Option<Container>, Error> {
    let path = self.path(hash);

    // `from_disk` runs inside `crate::spawn_blocking`; a clone of the path
    // moves into the closure so `path` stays available for the error message.
    let container = crate::spawn_blocking({
        let path = path.clone();
        move || from_disk(path)
    })
    .await?;
    match container {
        Ok(c) => Ok(Some(c)),
        // The three variants below all carry an `io::Error`; the shared guard
        // turns "file does not exist" into a plain cache miss.
        Err(WasmerPackageError::ContainerError(ContainerError::Open { error, .. }))
        | Err(WasmerPackageError::ContainerError(ContainerError::Read { error, .. }))
        | Err(WasmerPackageError::ContainerError(ContainerError::Detect(DetectError::Io(
            error,
        )))) if error.kind() == ErrorKind::NotFound => Ok(None),
        Err(e) => {
            let msg = format!("Unable to read \"{}\"", path.display());
            Err(Error::new(e).context(msg))
        }
    }
}
516
517 async fn save(&self, webc: Bytes, dist: &DistributionInfo) -> Result<PathBuf, Error> {
518 let path = self.path(&dist.webc_sha256);
519 let dist = dist.clone();
520 let temp_dir = self.temp_dir();
521
522 let path2 = path.clone();
523 crate::spawn_blocking(move || {
524 std::fs::create_dir_all(&temp_dir)
528 .with_context(|| format!("Unable to create directory '{}'", temp_dir.display()))?;
529
530 let mut temp = NamedTempFile::new_in(&temp_dir)?;
531 temp.write_all(&webc)?;
532 temp.flush()?;
533 temp.as_file_mut().sync_all()?;
534
535 temp.persist(&path)?;
537
538 tracing::debug!(
539 pkg.hash=%dist.webc_sha256,
540 pkg.url=%dist.webc,
541 path=%path.display(),
542 num_bytes=webc.len(),
543 "Saved to disk",
544 );
545 Result::<_, Error>::Ok(())
546 })
547 .await??;
548
549 Ok(path2)
550 }
551
552 #[tracing::instrument(level = "debug", skip_all)]
553 async fn save_and_load_as_mmapped(
554 &self,
555 webc: Bytes,
556 dist: &DistributionInfo,
557 ) -> Result<Container, Error> {
558 self.save(webc, dist).await?;
560
561 match self.lookup(&dist.webc_sha256).await? {
564 Some(container) => Ok(container),
565 None => {
566 Err(Error::msg("Unable to load the downloaded memory from disk"))
570 }
571 }
572 }
573
574 fn path(&self, hash: &WebcHash) -> PathBuf {
575 let hash = hash.as_bytes();
576 let mut filename = String::with_capacity(hash.len() * 2);
577 for b in hash {
578 write!(filename, "{b:02x}").unwrap();
579 }
580 filename.push_str(Self::FILE_SUFFIX);
581
582 self.cache_dir.join(filename)
583 }
584
585 pub async fn scan<S, F>(&self, state: S, callback: F) -> Result<S, Error>
587 where
588 S: Send + 'static,
589 F: Fn(&mut S, &std::fs::DirEntry) -> Result<(), Error> + Send + 'static,
590 {
591 let cache_dir = self.cache_dir.clone();
592 tokio::task::spawn_blocking(move || -> Result<S, anyhow::Error> {
593 let mut state = state;
594
595 let iter = match std::fs::read_dir(&cache_dir) {
596 Ok(v) => v,
597 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
598 return Ok(state);
600 }
601 Err(err) => {
602 return Err(err).with_context(|| {
603 format!("Could not read image cache dir: '{}'", cache_dir.display())
604 });
605 }
606 };
607
608 for res in iter {
609 let entry = res?;
610 if !entry.file_type()?.is_file() {
611 continue;
612 }
613
614 callback(&mut state, &entry)?;
615 }
616
617 Ok(state)
618 })
619 .await?
620 .context("tokio runtime failed")
621 }
622
623 pub async fn retain<S, F>(&self, state: S, filter: F) -> Result<S, Error>
625 where
626 S: Send + 'static,
627 F: Fn(&mut S, &std::fs::DirEntry) -> Result<bool, anyhow::Error> + Send + 'static,
628 {
629 let cache_dir = self.cache_dir.clone();
630 tokio::task::spawn_blocking(move || {
631 let iter = match std::fs::read_dir(&cache_dir) {
632 Ok(v) => v,
633 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
634 return Ok(state);
636 }
637 Err(err) => {
638 return Err(err).with_context(|| {
639 format!("Could not read image cache dir: '{}'", cache_dir.display())
640 });
641 }
642 };
643
644 let mut state = state;
645 for res in iter {
646 let entry = res?;
647 if !entry.file_type()?.is_file() {
648 continue;
649 }
650
651 if !filter(&mut state, &entry)? {
652 tracing::debug!(
653 path=%entry.path().display(),
654 "Removing cached image file - does not pass the filter",
655 );
656 match std::fs::remove_file(entry.path()) {
657 Ok(()) => {}
658 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
659 Err(fs_err) => {
660 tracing::warn!(
661 path=%entry.path().display(),
662 ?fs_err,
663 "Could not delete cached image file",
664 );
665 }
666 }
667 }
668 }
669
670 Ok(state)
671 })
672 .await?
673 .context("tokio runtime failed")
674 }
675}
676
/// In-memory map from image hash to loaded container, guarded by an `RwLock`
/// so it can be read and updated through a shared reference.
#[derive(Debug, Default)]
struct InMemoryCache(RwLock<HashMap<WebcHash, Container>>);
679
680impl InMemoryCache {
681 fn lookup(&self, hash: &WebcHash) -> Option<Container> {
682 self.0.read().unwrap().get(hash).cloned()
683 }
684
685 fn save(&self, container: &Container, hash: WebcHash) {
686 let mut cache = self.0.write().unwrap();
687 cache.entry(hash).or_insert_with(|| container.clone());
688 }
689}
690
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, sync::Mutex};

    use futures::future::BoxFuture;
    use http::{HeaderMap, StatusCode};
    use tempfile::TempDir;
    use wasmer_config::package::PackageId;

    use crate::{
        http::{HttpRequest, HttpResponse},
        runtime::resolver::PackageInfo,
    };

    use super::*;

    // Package image used as the canned HTTP response body.
    const PYTHON: &[u8] = include_bytes!("../../../../c-api/examples/assets/python-0.1.0.wasmer");

    /// HTTP client double: records every request and answers with the queued
    /// responses in order.
    #[derive(Debug)]
    pub(crate) struct DummyClient {
        requests: Mutex<Vec<HttpRequest>>,
        responses: Mutex<VecDeque<HttpResponse>>,
    }

    impl DummyClient {
        /// Create a client that will reply with `responses`, first to last.
        pub fn with_responses(responses: impl IntoIterator<Item = HttpResponse>) -> Self {
            DummyClient {
                requests: Mutex::new(Vec::new()),
                responses: Mutex::new(responses.into_iter().collect()),
            }
        }
    }

    impl HttpClient for DummyClient {
        /// Pop the next queued response and record `request`.
        ///
        /// Panics when no response is queued.
        fn request(
            &self,
            request: HttpRequest,
        ) -> BoxFuture<'_, Result<HttpResponse, anyhow::Error>> {
            let response = self.responses.lock().unwrap().pop_front().unwrap();
            self.requests.lock().unwrap().push(request);
            Box::pin(async { Ok(response) })
        }
    }

    /// Shared test body: with empty caches, `load` must issue exactly the
    /// expected GET request and populate both the disk and in-memory caches.
    async fn cache_misses_will_trigger_a_download_internal() {
        let temp = TempDir::new().unwrap();
        let client = Arc::new(DummyClient::with_responses([HttpResponse {
            body: Some(PYTHON.to_vec()),
            redirected: false,
            status: StatusCode::OK,
            headers: HeaderMap::new(),
        }]));
        let loader = BuiltinPackageLoader::new()
            .with_cache_dir(temp.path())
            .with_shared_http_client(client.clone());
        let summary = PackageSummary {
            pkg: PackageInfo {
                id: PackageId::new_named("python/python", "0.1.0".parse().unwrap()),
                dependencies: Vec::new(),
                commands: Vec::new(),
                entrypoint: Some("asdf".to_string()),
                filesystem: Vec::new(),
            },
            dist: DistributionInfo {
                webc: "https://wasmer.io/python/python".parse().unwrap(),
                // Arbitrary hash; the loader's default mode is `NoValidate`,
                // so it does not have to match the fixture.
                webc_sha256: [0xaa; 32].into(),
            },
        };

        let container = loader.load(&summary).await.unwrap();

        // The request that was sent.
        let requests = client.requests.lock().unwrap();
        let request = &requests[0];
        assert_eq!(request.url, summary.dist.webc);
        assert_eq!(request.method, "GET");
        assert_eq!(request.headers.len(), 2);
        assert_eq!(request.headers["Accept"], "application/webc");
        assert_eq!(request.headers["User-Agent"], USER_AGENT);
        // The container comes from the downloaded image, not from `summary`.
        let manifest = container.manifest();
        assert_eq!(manifest.entrypoint.as_deref(), Some("python"));
        // The image was written to the filesystem cache.
        let path = loader
            .cache
            .as_ref()
            .unwrap()
            .path(&summary.dist.webc_sha256);
        assert!(path.exists());
        assert_eq!(std::fs::read(&path).unwrap(), PYTHON);
        // The container was stored in the in-memory cache.
        let in_memory = loader.in_memory.0.read().unwrap();
        assert!(in_memory.contains_key(&summary.dist.webc_sha256));
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test(flavor = "multi_thread")]
    async fn cache_misses_will_trigger_a_download() {
        cache_misses_will_trigger_a_download_internal().await
    }

    #[cfg(target_arch = "wasm32")]
    #[tokio::test()]
    async fn cache_misses_will_trigger_a_download() {
        cache_misses_will_trigger_a_download_internal().await
    }
}
798
#[cfg(test)]
mod test {
    use super::*;

    /// A cache file whose contents do not match the hash in its name must be
    /// reported and, in prune mode, deleted.
    #[tokio::test]
    async fn test_builtin_package_downloader_cache_validation() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path();

        // Store "fail" under a hash that is deliberately not its SHA-256.
        let contents = "fail";
        let correct_hash = WebcHash::sha256(contents);
        let used_hash =
            WebcHash::parse_hex("0000a28ea38a000f3a3328cb7fabe330638d3258affe1a869e3f92986222d997")
                .unwrap();
        let filename = format!("{}{}", used_hash, FileSystemCache::FILE_SUFFIX);
        let file_path = path.join(filename);
        std::fs::write(&file_path, contents).unwrap();

        let dl = BuiltinPackageLoader::new().with_cache_dir(path);

        let errors = dl
            .validate_cache(CacheValidationMode::PruneOnMismatch)
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].actual_hash, correct_hash);
        assert_eq!(errors[0].expected_hash, used_hash);

        assert!(
            !file_path.exists(),
            "the mismatching cache file should have been pruned"
        );
    }

    /// `scan` visits every cached file and `retain` deletes the ones the
    /// filter rejects.
    #[tokio::test]
    async fn test_file_cache_scan_retain() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path();

        let cache = FileSystemCache {
            cache_dir: path.to_path_buf(),
        };

        // An empty cache directory yields no callbacks.
        {
            let state = cache
                .scan(0u64, |state: &mut u64, _entry| {
                    *state += 1;
                    Ok(())
                })
                .await
                .unwrap();

            assert_eq!(state, 0);
        }

        let path1 = cache
            .save(
                Bytes::from_static(b"test1"),
                &DistributionInfo {
                    webc: Url::parse("file:///test1.webc").unwrap(),
                    webc_sha256: WebcHash::sha256(b"test1"),
                },
            )
            .await
            .unwrap();
        let path2 = cache
            .save(
                Bytes::from_static(b"test2"),
                &DistributionInfo {
                    webc: Url::parse("file:///test2.webc").unwrap(),
                    webc_sha256: WebcHash::sha256(b"test2"),
                },
            )
            .await
            .unwrap();

        // Both saved files are visited; the temp directory is not a file and
        // is therefore skipped.
        {
            let path1 = path1.clone();
            let path2 = path2.clone();
            let state = cache
                .scan(0u64, move |state: &mut u64, entry| {
                    *state += 1;
                    assert!(entry.path() == path1 || entry.path() == path2);
                    Ok(())
                })
                .await
                .unwrap();

            assert_eq!(state, 2);
        }

        // Keep only the first file.
        {
            let path1 = path1.clone();
            let state = cache
                .retain(0u64, move |state: &mut u64, entry| {
                    *state += 1;
                    Ok(entry.path() == path1)
                })
                .await
                .unwrap();
            assert_eq!(state, 2);
        }

        assert!(path1.exists());
        assert!(!path2.exists(), "Path 2 should have been deleted");

        // Only the retained file is left to scan.
        {
            let path1 = path1.clone();
            let state = cache
                .scan(0u64, move |state: &mut u64, entry| {
                    *state += 1;
                    assert!(entry.path() == path1);
                    Ok(())
                })
                .await
                .unwrap();
            assert_eq!(state, 1);
        }
    }
}