1use std::{
2 collections::HashMap,
3 io::{ErrorKind, Write as _},
4 path::PathBuf,
5 sync::{Arc, RwLock},
6};
7
8use anyhow::{Context, Error};
9use bytes::Bytes;
10use http::{HeaderMap, Method};
11use tempfile::NamedTempFile;
12use url::Url;
13use wasmer_package::{
14 package::WasmerPackageError,
15 utils::{from_bytes, from_disk},
16};
17use webc::DetectError;
18use webc::{Container, ContainerError};
19
20use crate::{
21 bin_factory::BinaryPackage,
22 http::{HttpClient, HttpRequest, USER_AGENT},
23 runtime::{
24 package_loader::PackageLoader,
25 resolver::{DistributionInfo, PackageSummary, Resolution, WebcHash},
26 },
27};
28
29#[derive(Debug)]
32pub struct BuiltinPackageLoader {
33 client: Arc<dyn HttpClient + Send + Sync>,
34 in_memory: InMemoryCache,
35 cache: Option<FileSystemCache>,
36 tokens: HashMap<String, String>,
38
39 hash_validation: HashIntegrityValidationMode,
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq)]
44pub enum HashIntegrityValidationMode {
45 NoValidate,
48 WarnOnHashMismatch,
50 FailOnHashMismatch,
52}
53
54impl BuiltinPackageLoader {
55 pub fn new() -> Self {
56 BuiltinPackageLoader {
57 in_memory: InMemoryCache::default(),
58 client: Arc::new(crate::http::default_http_client().unwrap()),
59 cache: None,
60 hash_validation: HashIntegrityValidationMode::NoValidate,
61 tokens: HashMap::new(),
62 }
63 }
64
65 pub fn with_hash_validation_mode(mut self, mode: HashIntegrityValidationMode) -> Self {
69 self.hash_validation = mode;
70 self
71 }
72
73 pub fn with_cache_dir(self, cache_dir: impl Into<PathBuf>) -> Self {
74 BuiltinPackageLoader {
75 cache: Some(FileSystemCache {
76 cache_dir: cache_dir.into(),
77 }),
78 ..self
79 }
80 }
81
82 pub fn cache(&self) -> Option<&FileSystemCache> {
83 self.cache.as_ref()
84 }
85
86 pub fn validate_cache(
87 &self,
88 mode: CacheValidationMode,
89 ) -> Result<Vec<ImageHashMismatchError>, anyhow::Error> {
90 let cache = self
91 .cache
92 .as_ref()
93 .context("can not validate cache - no cache configured")?;
94
95 let items = cache.validate_hashes()?;
96 let mut errors = Vec::new();
97 for (path, error) in items {
98 match mode {
99 CacheValidationMode::WarnOnMismatch => {
100 tracing::warn!(?error, "hash mismatch in cached image file");
101 }
102 CacheValidationMode::PruneOnMismatch => {
103 tracing::warn!(?error, "deleting cached image file due to hash mismatch");
104 match std::fs::remove_file(&path) {
105 Ok(()) => {}
106 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
107 Err(fs_err) => {
108 tracing::error!(
109 path=%error.source,
110 ?fs_err,
111 "could not delete cached image file with hash mismatch"
112 );
113 }
114 }
115 }
116 }
117
118 errors.push(error);
119 }
120
121 Ok(errors)
122 }
123
124 pub fn with_http_client(self, client: impl HttpClient + Send + Sync + 'static) -> Self {
125 self.with_shared_http_client(Arc::new(client))
126 }
127
128 pub fn with_shared_http_client(self, client: Arc<dyn HttpClient + Send + Sync>) -> Self {
129 BuiltinPackageLoader { client, ..self }
130 }
131
132 pub fn with_tokens<I, K, V>(mut self, tokens: I) -> Self
133 where
134 I: IntoIterator<Item = (K, V)>,
135 K: Into<String>,
136 V: Into<String>,
137 {
138 for (hostname, token) in tokens {
139 self = self.with_token(hostname, token);
140 }
141
142 self
143 }
144
145 pub fn with_token(mut self, hostname: impl Into<String>, token: impl Into<String>) -> Self {
152 self.tokens.insert(hostname.into(), token.into());
153 self
154 }
155
156 pub fn insert_cached(&self, hash: WebcHash, container: &Container) {
158 self.in_memory.save(container, hash);
159 }
160
161 #[tracing::instrument(level = "debug", skip_all, fields(pkg.hash=%hash))]
162 async fn get_cached(&self, hash: &WebcHash) -> Result<Option<Container>, Error> {
163 if let Some(cached) = self.in_memory.lookup(hash) {
164 return Ok(Some(cached));
165 }
166
167 if let Some(cache) = self.cache.as_ref()
168 && let Some(cached) = cache.lookup(hash).await?
169 {
170 tracing::debug!("Copying from the filesystem cache to the in-memory cache");
172 self.in_memory.save(&cached, *hash);
173 return Ok(Some(cached));
174 }
175
176 Ok(None)
177 }
178
179 async fn validate_hash(
181 image: &bytes::Bytes,
182 mode: HashIntegrityValidationMode,
183 info: &DistributionInfo,
184 ) -> Result<(), anyhow::Error> {
185 let info = info.clone();
186 let image = image.clone();
187 crate::spawn_blocking(move || Self::validate_hash_sync(&image, mode, &info))
188 .await
189 .context("tokio runtime failed")?
190 }
191
192 fn validate_hash_sync(
194 image: &[u8],
195 mode: HashIntegrityValidationMode,
196 info: &DistributionInfo,
197 ) -> Result<(), anyhow::Error> {
198 match mode {
199 HashIntegrityValidationMode::NoValidate => {
200 Ok(())
202 }
203 HashIntegrityValidationMode::WarnOnHashMismatch => {
204 let actual_hash = WebcHash::sha256(image);
205 if actual_hash != info.webc_sha256 {
206 tracing::warn!(%info.webc_sha256, %actual_hash, "image hash mismatch - actual image hash does not match the expected hash!");
207 }
208 Ok(())
209 }
210 HashIntegrityValidationMode::FailOnHashMismatch => {
211 let actual_hash = WebcHash::sha256(image);
212 if actual_hash != info.webc_sha256 {
213 Err(ImageHashMismatchError {
214 source: info.webc.to_string(),
215 actual_hash,
216 expected_hash: info.webc_sha256,
217 }
218 .into())
219 } else {
220 Ok(())
221 }
222 }
223 }
224 }
225
226 #[tracing::instrument(level = "debug", skip_all, fields(%dist.webc, %dist.webc_sha256))]
227 async fn download(&self, dist: &DistributionInfo) -> Result<Bytes, Error> {
228 if dist.webc.scheme() == "file" {
229 match crate::runtime::resolver::utils::file_path_from_url(&dist.webc) {
230 Ok(path) => {
231 let bytes = crate::spawn_blocking({
232 let path = path.clone();
233 move || std::fs::read(path)
234 })
235 .await?
236 .with_context(|| format!("Unable to read \"{}\"", path.display()))?;
237
238 let bytes = bytes::Bytes::from(bytes);
239
240 Self::validate_hash(&bytes, self.hash_validation, dist).await?;
241
242 return Ok(bytes);
243 }
244 Err(e) => {
245 tracing::debug!(
246 url=%dist.webc,
247 error=&*e,
248 "Unable to convert the file:// URL to a path",
249 );
250 }
251 }
252 }
253
254 let request = HttpRequest {
255 headers: self.headers(&dist.webc),
256 url: dist.webc.clone(),
257 method: Method::GET,
258 body: None,
259 options: Default::default(),
260 };
261
262 tracing::debug!(%request.url, %request.method, "webc_package_download_start");
263 tracing::trace!(?request.headers);
264
265 let response = self.client.request(request).await?;
266
267 tracing::trace!(
268 %response.status,
269 %response.redirected,
270 ?response.headers,
271 response.len=response.body.as_ref().map(|body| body.len()),
272 "Received a response",
273 );
274
275 let url = &dist.webc;
276 if !response.is_ok() {
277 return Err(
278 crate::runtime::resolver::utils::http_error(&response).context(format!(
279 "package download failed: GET request to \"{}\" failed with status {}",
280 url, response.status
281 )),
282 );
283 }
284
285 let body = response.body.context("package download failed")?;
286 tracing::debug!(%url, "package_download_succeeded");
287
288 let body = bytes::Bytes::from(body);
289
290 Self::validate_hash(&body, self.hash_validation, dist).await?;
291
292 Ok(body)
293 }
294
295 fn headers(&self, url: &Url) -> HeaderMap {
296 let mut headers = HeaderMap::new();
297 headers.insert("Accept", "application/webc".parse().unwrap());
298 headers.insert("User-Agent", USER_AGENT.parse().unwrap());
299
300 if url.has_authority()
301 && let Some(token) = self.tokens.get(url.authority())
302 {
303 let header = format!("Bearer {token}");
304 match header.parse() {
305 Ok(header) => {
306 headers.insert(http::header::AUTHORIZATION, header);
307 }
308 Err(e) => {
309 tracing::warn!(
310 error = &e as &dyn std::error::Error,
311 "An error occurred while parsing the authorization header",
312 );
313 }
314 }
315 }
316
317 headers
318 }
319}
320
321impl Default for BuiltinPackageLoader {
322 fn default() -> Self {
323 BuiltinPackageLoader::new()
324 }
325}
326
327#[async_trait::async_trait]
328impl PackageLoader for BuiltinPackageLoader {
329 #[tracing::instrument(
330 level="debug",
331 skip_all,
332 fields(
333 pkg=%summary.pkg.id,
334 ),
335 )]
336 async fn load(&self, summary: &PackageSummary) -> Result<Container, Error> {
337 if let Some(container) = self.get_cached(&summary.dist.webc_sha256).await? {
338 tracing::debug!("Cache hit!");
339 return Ok(container);
340 }
341
342 let bytes = self
344 .download(&summary.dist)
345 .await
346 .with_context(|| format!("Unable to download \"{}\"", summary.dist.webc))?;
347
348 if let Some(cache) = &self.cache {
352 match cache
353 .save_and_load_as_mmapped(bytes.clone(), &summary.dist)
354 .await
355 {
356 Ok(container) => {
357 tracing::debug!("Cached to disk");
358 self.in_memory.save(&container, summary.dist.webc_sha256);
359 return Ok(container);
362 }
363 Err(e) => {
364 tracing::warn!(
365 error=&*e,
366 pkg=%summary.pkg.id,
367 pkg.hash=%summary.dist.webc_sha256,
368 pkg.url=%summary.dist.webc,
369 "Unable to save the downloaded package to disk",
370 );
371 }
372 }
373 }
374
375 let container = crate::spawn_blocking(move || from_bytes(bytes)).await??;
378 self.in_memory.save(&container, summary.dist.webc_sha256);
380 Ok(container)
381 }
382
383 async fn load_package_tree(
384 &self,
385 root: &Container,
386 resolution: &Resolution,
387 root_is_local_dir: bool,
388 ) -> Result<BinaryPackage, Error> {
389 super::load_package_tree(root, self, resolution, root_is_local_dir).await
390 }
391}
392
393#[derive(Clone, Debug)]
394pub struct ImageHashMismatchError {
395 source: String,
396 expected_hash: WebcHash,
397 actual_hash: WebcHash,
398}
399
400impl std::fmt::Display for ImageHashMismatchError {
401 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402 write!(
403 f,
404 "image hash mismatch! expected hash '{}', but the computed hash is '{}' (source '{}')",
405 self.expected_hash, self.actual_hash, self.source,
406 )
407 }
408}
409
410impl std::error::Error for ImageHashMismatchError {}
411
412#[derive(Clone, Copy, Debug, PartialEq, Eq)]
413pub enum CacheValidationMode {
414 WarnOnMismatch,
417 PruneOnMismatch,
420}
421
422#[derive(Debug)]
425pub struct FileSystemCache {
426 cache_dir: PathBuf,
427}
428
429impl FileSystemCache {
430 const FILE_SUFFIX: &'static str = ".bin";
431
432 fn temp_dir(&self) -> PathBuf {
433 self.cache_dir.join("__temp__")
434 }
435
436 fn validate_hashes(&self) -> Result<Vec<(PathBuf, ImageHashMismatchError)>, anyhow::Error> {
439 let mut items = Vec::<(PathBuf, ImageHashMismatchError)>::new();
440
441 let iter = match std::fs::read_dir(&self.cache_dir) {
442 Ok(v) => v,
443 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
444 return Ok(Vec::new());
446 }
447 Err(err) => {
448 return Err(err).with_context(|| {
449 format!(
450 "Could not read image cache dir: '{}'",
451 self.cache_dir.display()
452 )
453 });
454 }
455 };
456
457 for res in iter {
458 let entry = res?;
459 if !entry.file_type()?.is_file() {
460 continue;
461 }
462
463 let hash_opt = entry
466 .file_name()
467 .to_str()
468 .and_then(|x| {
469 let (raw_hash, _) = x.split_once(Self::FILE_SUFFIX)?;
470 Some(raw_hash)
471 })
472 .and_then(|x| WebcHash::parse_hex(x).ok());
473 let Some(expected_hash) = hash_opt else {
474 continue;
475 };
476
477 let path = entry.path();
479 let actual_hash = WebcHash::for_file(&path)?;
480
481 if actual_hash != expected_hash {
482 let err = ImageHashMismatchError {
483 source: path.to_string_lossy().to_string(),
484 actual_hash,
485 expected_hash,
486 };
487 items.push((path, err));
488 }
489 }
490
491 Ok(items)
492 }
493
494 async fn lookup(&self, hash: &WebcHash) -> Result<Option<Container>, Error> {
495 let path = self.path(hash);
496
497 let container = crate::spawn_blocking({
498 let path = path.clone();
499 move || from_disk(path)
500 })
501 .await?;
502 match container {
503 Ok(c) => Ok(Some(c)),
504 Err(WasmerPackageError::ContainerError(ContainerError::Open { error, .. }))
505 | Err(WasmerPackageError::ContainerError(ContainerError::Read { error, .. }))
506 | Err(WasmerPackageError::ContainerError(ContainerError::Detect(DetectError::Io(
507 error,
508 )))) if error.kind() == ErrorKind::NotFound => Ok(None),
509 Err(e) => {
510 let msg = format!("Unable to read \"{}\"", path.display());
511 Err(Error::new(e).context(msg))
512 }
513 }
514 }
515
516 async fn save(&self, webc: Bytes, dist: &DistributionInfo) -> Result<PathBuf, Error> {
517 let path = self.path(&dist.webc_sha256);
518 let dist = dist.clone();
519 let temp_dir = self.temp_dir();
520
521 let path2 = path.clone();
522 crate::spawn_blocking(move || {
523 std::fs::create_dir_all(&temp_dir)
527 .with_context(|| format!("Unable to create directory '{}'", temp_dir.display()))?;
528
529 let mut temp = NamedTempFile::new_in(&temp_dir)?;
530 temp.write_all(&webc)?;
531 temp.flush()?;
532 temp.as_file_mut().sync_all()?;
533
534 temp.persist(&path)?;
536
537 tracing::debug!(
538 pkg.hash=%dist.webc_sha256,
539 pkg.url=%dist.webc,
540 path=%path.display(),
541 num_bytes=webc.len(),
542 "Saved to disk",
543 );
544 Result::<_, Error>::Ok(())
545 })
546 .await??;
547
548 Ok(path2)
549 }
550
551 #[tracing::instrument(level = "debug", skip_all)]
552 async fn save_and_load_as_mmapped(
553 &self,
554 webc: Bytes,
555 dist: &DistributionInfo,
556 ) -> Result<Container, Error> {
557 self.save(webc, dist).await?;
559
560 match self.lookup(&dist.webc_sha256).await? {
563 Some(container) => Ok(container),
564 None => {
565 Err(Error::msg("Unable to load the downloaded memory from disk"))
569 }
570 }
571 }
572
573 fn path(&self, hash: &WebcHash) -> PathBuf {
574 self.cache_dir.join(format!(
575 "{}{}",
576 hex::encode(hash.as_bytes()),
577 Self::FILE_SUFFIX
578 ))
579 }
580
581 pub async fn scan<S, F>(&self, state: S, callback: F) -> Result<S, Error>
583 where
584 S: Send + 'static,
585 F: Fn(&mut S, &std::fs::DirEntry) -> Result<(), Error> + Send + 'static,
586 {
587 let cache_dir = self.cache_dir.clone();
588 tokio::task::spawn_blocking(move || -> Result<S, anyhow::Error> {
589 let mut state = state;
590
591 let iter = match std::fs::read_dir(&cache_dir) {
592 Ok(v) => v,
593 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
594 return Ok(state);
596 }
597 Err(err) => {
598 return Err(err).with_context(|| {
599 format!("Could not read image cache dir: '{}'", cache_dir.display())
600 });
601 }
602 };
603
604 for res in iter {
605 let entry = res?;
606 if !entry.file_type()?.is_file() {
607 continue;
608 }
609
610 callback(&mut state, &entry)?;
611 }
612
613 Ok(state)
614 })
615 .await?
616 .context("tokio runtime failed")
617 }
618
619 pub async fn retain<S, F>(&self, state: S, filter: F) -> Result<S, Error>
621 where
622 S: Send + 'static,
623 F: Fn(&mut S, &std::fs::DirEntry) -> Result<bool, anyhow::Error> + Send + 'static,
624 {
625 let cache_dir = self.cache_dir.clone();
626 tokio::task::spawn_blocking(move || {
627 let iter = match std::fs::read_dir(&cache_dir) {
628 Ok(v) => v,
629 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
630 return Ok(state);
632 }
633 Err(err) => {
634 return Err(err).with_context(|| {
635 format!("Could not read image cache dir: '{}'", cache_dir.display())
636 });
637 }
638 };
639
640 let mut state = state;
641 for res in iter {
642 let entry = res?;
643 if !entry.file_type()?.is_file() {
644 continue;
645 }
646
647 if !filter(&mut state, &entry)? {
648 tracing::debug!(
649 path=%entry.path().display(),
650 "Removing cached image file - does not pass the filter",
651 );
652 match std::fs::remove_file(entry.path()) {
653 Ok(()) => {}
654 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
655 Err(fs_err) => {
656 tracing::warn!(
657 path=%entry.path().display(),
658 ?fs_err,
659 "Could not delete cached image file",
660 );
661 }
662 }
663 }
664 }
665
666 Ok(state)
667 })
668 .await?
669 .context("tokio runtime failed")
670 }
671}
672
673#[derive(Debug, Default)]
674struct InMemoryCache(RwLock<HashMap<WebcHash, Container>>);
675
676impl InMemoryCache {
677 fn lookup(&self, hash: &WebcHash) -> Option<Container> {
678 self.0.read().unwrap().get(hash).cloned()
679 }
680
681 fn save(&self, container: &Container, hash: WebcHash) {
682 let mut cache = self.0.write().unwrap();
683 cache.entry(hash).or_insert_with(|| container.clone());
684 }
685}
686
687#[cfg(test)]
688mod tests {
689 use std::{collections::VecDeque, sync::Mutex};
690
691 use futures::future::BoxFuture;
692 use http::{HeaderMap, StatusCode};
693 use tempfile::TempDir;
694 use wasmer_config::package::PackageId;
695
696 use crate::{
697 http::{HttpRequest, HttpResponse},
698 runtime::resolver::PackageInfo,
699 };
700
701 use super::*;
702
703 const PYTHON: &[u8] = include_bytes!("../../../../c-api/examples/assets/python-0.1.0.wasmer");
704
705 #[derive(Debug)]
706 pub(crate) struct DummyClient {
707 requests: Mutex<Vec<HttpRequest>>,
708 responses: Mutex<VecDeque<HttpResponse>>,
709 }
710
711 impl DummyClient {
712 pub fn with_responses(responses: impl IntoIterator<Item = HttpResponse>) -> Self {
713 DummyClient {
714 requests: Mutex::new(Vec::new()),
715 responses: Mutex::new(responses.into_iter().collect()),
716 }
717 }
718 }
719
720 impl HttpClient for DummyClient {
721 fn request(
722 &self,
723 request: HttpRequest,
724 ) -> BoxFuture<'_, Result<HttpResponse, anyhow::Error>> {
725 let response = self.responses.lock().unwrap().pop_front().unwrap();
726 self.requests.lock().unwrap().push(request);
727 Box::pin(async { Ok(response) })
728 }
729 }
730
731 async fn cache_misses_will_trigger_a_download_internal() {
732 let temp = TempDir::new().unwrap();
733 let client = Arc::new(DummyClient::with_responses([HttpResponse {
734 body: Some(PYTHON.to_vec()),
735 redirected: false,
736 status: StatusCode::OK,
737 headers: HeaderMap::new(),
738 }]));
739 let loader = BuiltinPackageLoader::new()
740 .with_cache_dir(temp.path())
741 .with_shared_http_client(client.clone());
742 let summary = PackageSummary {
743 pkg: PackageInfo {
744 id: PackageId::new_named("python/python", "0.1.0".parse().unwrap()),
745 dependencies: Vec::new(),
746 commands: Vec::new(),
747 entrypoint: Some("asdf".to_string()),
748 filesystem: Vec::new(),
749 },
750 dist: DistributionInfo {
751 webc: "https://wasmer.io/python/python".parse().unwrap(),
752 webc_sha256: [0xaa; 32].into(),
753 },
754 };
755
756 let container = loader.load(&summary).await.unwrap();
757
758 let requests = client.requests.lock().unwrap();
760 let request = &requests[0];
761 assert_eq!(request.url, summary.dist.webc);
762 assert_eq!(request.method, "GET");
763 assert_eq!(request.headers.len(), 2);
764 assert_eq!(request.headers["Accept"], "application/webc");
765 assert_eq!(request.headers["User-Agent"], USER_AGENT);
766 let manifest = container.manifest();
768 assert_eq!(manifest.entrypoint.as_deref(), Some("python"));
769 let path = loader
771 .cache
772 .as_ref()
773 .unwrap()
774 .path(&summary.dist.webc_sha256);
775 assert!(path.exists());
776 assert_eq!(std::fs::read(&path).unwrap(), PYTHON);
777 let in_memory = loader.in_memory.0.read().unwrap();
779 assert!(in_memory.contains_key(&summary.dist.webc_sha256));
780 }
781
782 #[cfg(not(target_arch = "wasm32"))]
783 #[tokio::test(flavor = "multi_thread")]
784 async fn cache_misses_will_trigger_a_download() {
785 cache_misses_will_trigger_a_download_internal().await
786 }
787
788 #[cfg(target_arch = "wasm32")]
789 #[tokio::test()]
790 async fn cache_misses_will_trigger_a_download() {
791 cache_misses_will_trigger_a_download_internal().await
792 }
793}
794
795#[cfg(test)]
796mod test {
797 use super::*;
798
799 #[tokio::test]
802 async fn test_builtin_package_downloader_cache_validation() {
803 let dir = tempfile::tempdir().unwrap();
804 let path = dir.path();
805
806 let contents = "fail";
807 let correct_hash = WebcHash::sha256(contents);
808 let used_hash =
809 WebcHash::parse_hex("0000a28ea38a000f3a3328cb7fabe330638d3258affe1a869e3f92986222d997")
810 .unwrap();
811 let filename = format!("{}{}", used_hash, FileSystemCache::FILE_SUFFIX);
812 let file_path = path.join(filename);
813 std::fs::write(&file_path, contents).unwrap();
814
815 let dl = BuiltinPackageLoader::new().with_cache_dir(path);
816
817 let errors = dl
818 .validate_cache(CacheValidationMode::PruneOnMismatch)
819 .unwrap();
820 assert_eq!(errors.len(), 1);
821 assert_eq!(errors[0].actual_hash, correct_hash);
822 assert_eq!(errors[0].expected_hash, used_hash);
823
824 assert_eq!(file_path.exists(), false);
825 }
826
827 #[tokio::test]
828 async fn test_file_cache_scan_retain() {
829 let dir = tempfile::tempdir().unwrap();
830 let path = dir.path();
831
832 let cache = FileSystemCache {
833 cache_dir: path.to_path_buf(),
834 };
835
836 {
837 let state = cache
838 .scan(0u64, |state: &mut u64, _entry| {
839 *state += 1;
840 Ok(())
841 })
842 .await
843 .unwrap();
844
845 assert_eq!(state, 0);
846 }
847
848 let path1 = cache
849 .save(
850 Bytes::from_static(b"test1"),
851 &DistributionInfo {
852 webc: Url::parse("file:///test1.webc").unwrap(),
853 webc_sha256: WebcHash::sha256(b"test1"),
854 },
855 )
856 .await
857 .unwrap();
858 let path2 = cache
859 .save(
860 Bytes::from_static(b"test2"),
861 &DistributionInfo {
862 webc: Url::parse("file:///test2.webc").unwrap(),
863 webc_sha256: WebcHash::sha256(b"test2"),
864 },
865 )
866 .await
867 .unwrap();
868
869 {
870 let path1 = path1.clone();
871 let path2 = path2.clone();
872 let state = cache
873 .scan(0u64, move |state: &mut u64, entry| {
874 *state += 1;
875 assert!(entry.path() == path1 || entry.path() == path2);
876 Ok(())
877 })
878 .await
879 .unwrap();
880
881 assert_eq!(state, 2);
882 }
883
884 {
885 let path1 = path1.clone();
886 let state = cache
887 .retain(0u64, move |state: &mut u64, entry| {
888 *state += 1;
889 Ok(entry.path() == path1)
890 })
891 .await
892 .unwrap();
893 assert_eq!(state, 2);
894 }
895
896 assert!(path1.exists());
897 assert!(!path2.exists(), "Path 2 should have been deleted");
898
899 {
900 let path1 = path1.clone();
901 let state = cache
902 .scan(0u64, move |state: &mut u64, entry| {
903 *state += 1;
904 assert!(entry.path() == path1);
905 Ok(())
906 })
907 .await
908 .unwrap();
909 assert_eq!(state, 1);
910 }
911 }
912}