wasmer_wasix/runtime/module_cache/
fallback.rs1use wasmer::{Engine, Module};
2
3use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash};
4
/// A [`ModuleCache`] combinator that layers a `Primary` cache over a
/// `Fallback` cache.
///
/// Through its [`ModuleCache`] implementation, the primary cache is always
/// consulted first and the fallback is only used when the primary cannot
/// serve the request. A module that had to be loaded from the fallback is
/// saved back into the primary (a failure to do so is only logged), and
/// saving a module writes it to both caches.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FallbackCache<Primary, Fallback> {
    /// The cache that is consulted first.
    primary: Primary,
    /// The cache that is used when the primary cannot serve a request.
    fallback: Fallback,
}

impl<Primary, Fallback> FallbackCache<Primary, Fallback> {
    /// Wraps `primary` and `fallback` in a new [`FallbackCache`].
    pub(crate) fn new(primary: Primary, fallback: Fallback) -> Self {
        Self { primary, fallback }
    }

    /// Returns a shared reference to the primary cache.
    pub fn primary(&self) -> &Primary {
        &self.primary
    }

    /// Returns an exclusive reference to the primary cache.
    pub fn primary_mut(&mut self) -> &mut Primary {
        &mut self.primary
    }

    /// Returns a shared reference to the fallback cache.
    pub fn fallback(&self) -> &Fallback {
        &self.fallback
    }

    /// Returns an exclusive reference to the fallback cache.
    pub fn fallback_mut(&mut self) -> &mut Fallback {
        &mut self.fallback
    }

    /// Consumes the wrapper and hands back its parts as
    /// `(primary, fallback)`.
    pub fn into_inner(self) -> (Primary, Fallback) {
        (self.primary, self.fallback)
    }
}
58
59#[async_trait::async_trait]
60impl<Primary, Fallback> ModuleCache for FallbackCache<Primary, Fallback>
61where
62 Primary: ModuleCache + Send + Sync,
63 Fallback: ModuleCache + Send + Sync,
64{
65 async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
66 let primary_error = match self.primary.load(key, engine).await {
67 Ok(m) => return Ok(m),
68 Err(e) => e,
69 };
70
71 if let Ok(m) = self.fallback.load(key, engine).await {
72 if let Err(e) = self.primary.save(key, engine, &m).await {
75 tracing::warn!(
76 %key,
77 error = &e as &dyn std::error::Error,
78 "Unable to promote a module to the primary cache",
79 );
80 }
81
82 return Ok(m);
83 }
84
85 Err(primary_error)
86 }
87
88 async fn contains(&self, key: ModuleHash, engine: &Engine) -> Result<bool, CacheError> {
89 if self.primary.contains(key, engine).await? {
90 return Ok(true);
91 }
92
93 self.fallback.contains(key, engine).await
94 }
95
96 async fn save(
97 &self,
98 key: ModuleHash,
99 engine: &Engine,
100 module: &Module,
101 ) -> Result<(), CacheError> {
102 futures::try_join!(
103 self.primary.save(key, engine, module),
104 self.fallback.save(key, engine, module)
105 )?;
106 Ok(())
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use std::sync::atomic::{AtomicUsize, Ordering};
113
114 use super::*;
115 use crate::runtime::module_cache::SharedCache;
116
117 const ADD_WAT: &[u8] = br#"(
118 module
119 (func
120 (export "add")
121 (param $x i64)
122 (param $y i64)
123 (result i64)
124 (i64.add (local.get $x) (local.get $y)))
125 )"#;
126
127 #[derive(Debug)]
128 struct Spy<I> {
129 inner: I,
130 success: AtomicUsize,
131 failures: AtomicUsize,
132 }
133
134 impl<I> Spy<I> {
135 fn new(inner: I) -> Self {
136 Spy {
137 inner,
138 success: AtomicUsize::new(0),
139 failures: AtomicUsize::new(0),
140 }
141 }
142
143 fn success(&self) -> usize {
144 self.success.load(Ordering::SeqCst)
145 }
146
147 fn failures(&self) -> usize {
148 self.failures.load(Ordering::SeqCst)
149 }
150 }
151
152 #[async_trait::async_trait]
153 impl<I: ModuleCache + Send + Sync> ModuleCache for Spy<I> {
154 async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
155 match self.inner.load(key, engine).await {
156 Ok(m) => {
157 self.success.fetch_add(1, Ordering::SeqCst);
158 Ok(m)
159 }
160 Err(e) => {
161 self.failures.fetch_add(1, Ordering::SeqCst);
162 Err(e)
163 }
164 }
165 }
166
167 async fn contains(&self, key: ModuleHash, engine: &Engine) -> Result<bool, CacheError> {
168 self.inner.contains(key, engine).await
169 }
170
171 async fn save(
172 &self,
173 key: ModuleHash,
174 engine: &Engine,
175 module: &Module,
176 ) -> Result<(), CacheError> {
177 match self.inner.save(key, engine, module).await {
178 Ok(_) => {
179 self.success.fetch_add(1, Ordering::SeqCst);
180 Ok(())
181 }
182 Err(e) => {
183 self.failures.fetch_add(1, Ordering::SeqCst);
184 Err(e)
185 }
186 }
187 }
188 }
189
190 #[tokio::test]
191 async fn load_from_primary() {
192 let engine = Engine::default();
193 let module = Module::new(&engine, ADD_WAT).unwrap();
194 let key = ModuleHash::xxhash_from_bytes([0; 8]);
195 let primary = SharedCache::default();
196 let fallback = SharedCache::default();
197 primary.save(key, &engine, &module).await.unwrap();
198 let primary = Spy::new(primary);
199 let fallback = Spy::new(fallback);
200 let cache = FallbackCache::new(&primary, &fallback);
201
202 let got = cache.load(key, &engine).await.unwrap();
203
204 assert_eq!(module, got);
206 assert_eq!(primary.success(), 1);
207 assert_eq!(primary.failures(), 0);
208 assert_eq!(fallback.success(), 0);
210 assert_eq!(fallback.failures(), 0);
211 assert!(fallback.load(key, &engine).await.is_err());
213 }
214
215 #[tokio::test]
216 async fn loading_from_fallback_also_populates_primary() {
217 let engine = Engine::default();
218 let module = Module::new(&engine, ADD_WAT).unwrap();
219 let key = ModuleHash::xxhash_from_bytes([0; 8]);
220 let primary = SharedCache::default();
221 let fallback = SharedCache::default();
222 fallback.save(key, &engine, &module).await.unwrap();
223 let primary = Spy::new(primary);
224 let fallback = Spy::new(fallback);
225 let cache = FallbackCache::new(&primary, &fallback);
226
227 let got = cache.load(key, &engine).await.unwrap();
228
229 assert_eq!(module, got);
231 assert_eq!(fallback.success(), 1);
233 assert_eq!(fallback.failures(), 0);
234 assert_eq!(primary.failures(), 1);
236 assert_eq!(primary.success(), 1);
238 assert_eq!(primary.load(key, &engine).await.unwrap(), module);
239 }
240
241 #[tokio::test]
242 async fn saving_will_update_both() {
243 let engine = Engine::default();
244 let module = Module::new(&engine, ADD_WAT).unwrap();
245 let key = ModuleHash::xxhash_from_bytes([0; 8]);
246 let primary = SharedCache::default();
247 let fallback = SharedCache::default();
248 let cache = FallbackCache::new(&primary, &fallback);
249
250 cache.save(key, &engine, &module).await.unwrap();
251
252 assert_eq!(primary.load(key, &engine).await.unwrap(), module);
253 assert_eq!(fallback.load(key, &engine).await.unwrap(), module);
254 }
255}