1pub mod module_cache;
2pub mod package_loader;
3pub mod resolver;
4pub mod task_manager;
5
6use self::module_cache::CacheError;
7pub use self::task_manager::{SpawnType, VirtualTaskManager};
8use module_cache::HashedModuleData;
9use wasmer_types::{CompilationProgressCallback, ModuleHash};
10
11use std::{
12 borrow::Cow,
13 fmt,
14 ops::Deref,
15 sync::{Arc, Mutex},
16};
17
18use futures::future::BoxFuture;
19use virtual_mio::block_on;
20use virtual_net::{DynVirtualNetworking, VirtualNetworking};
21use wasmer::{Engine, Module, RuntimeError};
22use wasmer_wasix_types::wasi::ExitCode;
23
24#[cfg(feature = "journal")]
25use crate::journal::{DynJournal, DynReadableJournal};
26use crate::{
27 SpawnError, WasiTtyState,
28 bin_factory::BinaryPackageCommand,
29 http::{DynHttpClient, HttpClient},
30 os::TtyBridge,
31 runtime::{
32 module_cache::{
33 ModuleCache, ThreadLocalCache,
34 progress::{ModuleLoadProgress, ModuleLoadProgressReporter},
35 },
36 package_loader::{PackageLoader, UnsupportedPackageLoader},
37 resolver::{BackendSource, MultiSource, Source},
38 },
39};
40
41#[derive(Clone)]
42pub enum TaintReason {
43 UnknownWasiVersion,
44 NonZeroExitCode(ExitCode),
45 RuntimeError(RuntimeError),
46 DlSymbolResolutionFailed(String),
47}
48
49pub enum ModuleInput<'a> {
56 Bytes(Cow<'a, [u8]>),
58 Hashed(Cow<'a, HashedModuleData>),
60 Command(Cow<'a, BinaryPackageCommand>),
62}
63
64impl<'a> ModuleInput<'a> {
65 pub fn to_owned(&'a self) -> ModuleInput<'static> {
67 match self {
69 Self::Bytes(Cow::Borrowed(b)) => {
70 let v: Vec<u8> = (*b).to_owned();
71 let c: Cow<'static, [u8]> = Cow::from(v);
72 ModuleInput::Bytes(c)
73 }
74 Self::Bytes(Cow::Owned(b)) => ModuleInput::Bytes(Cow::Owned((*b).clone())),
75 Self::Hashed(Cow::Borrowed(h)) => ModuleInput::Hashed(Cow::Owned((*h).clone())),
76 Self::Hashed(Cow::Owned(h)) => ModuleInput::Hashed(Cow::Owned(h.clone())),
77 Self::Command(Cow::Borrowed(c)) => ModuleInput::Command(Cow::Owned((*c).clone())),
78 Self::Command(Cow::Owned(c)) => ModuleInput::Command(Cow::Owned(c.clone())),
79 }
80 }
81
82 pub fn hash(&self) -> ModuleHash {
86 match self {
87 Self::Bytes(b) => {
88 ModuleHash::new(b)
90 }
91 Self::Hashed(hashed) => *hashed.hash(),
92 Self::Command(cmd) => *cmd.hash(),
93 }
94 }
95
96 pub fn wasm(&self) -> &[u8] {
98 match self {
99 Self::Bytes(b) => b,
100 Self::Hashed(hashed) => hashed.wasm().as_ref(),
101 Self::Command(cmd) => cmd.atom_ref().as_ref(),
102 }
103 }
104
105 pub fn to_hashed(&self) -> HashedModuleData {
109 match self {
110 Self::Bytes(b) => HashedModuleData::new(b.as_ref()),
111 Self::Hashed(hashed) => hashed.as_ref().clone(),
112 Self::Command(cmd) => HashedModuleData::from_command(cmd),
113 }
114 }
115}
116
117#[allow(unused_variables)]
121pub trait Runtime
122where
123 Self: fmt::Debug,
124{
125 fn networking(&self) -> &DynVirtualNetworking;
127
128 fn task_manager(&self) -> &Arc<dyn VirtualTaskManager>;
130
131 fn package_loader(&self) -> Arc<dyn PackageLoader + Send + Sync> {
133 Arc::new(UnsupportedPackageLoader)
134 }
135
136 fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync> {
138 Arc::new(ThreadLocalCache::default())
145 }
146
147 fn source(&self) -> Arc<dyn Source + Send + Sync>;
149
150 fn engine(&self) -> Engine {
152 Engine::default()
153 }
154
155 fn new_store(&self) -> wasmer::Store {
157 cfg_if::cfg_if! {
158 if #[cfg(feature = "sys")] {
159 wasmer::Store::new(self.engine())
160 } else {
161 wasmer::Store::default()
162 }
163 }
164 }
165
166 fn http_client(&self) -> Option<&DynHttpClient> {
168 None
169 }
170
171 fn tty(&self) -> Option<&(dyn TtyBridge + Send + Sync)> {
173 None
174 }
175
176 fn resolve_module<'a>(
183 &'a self,
184 input: ModuleInput<'a>,
185 engine: Option<&Engine>,
186 on_progress: Option<ModuleLoadProgressReporter>,
187 ) -> BoxFuture<'a, Result<Module, SpawnError>> {
188 let data = input.to_hashed();
189
190 let engine = if let Some(e) = engine {
191 e.clone()
192 } else {
193 match &input {
194 ModuleInput::Bytes(_) => self.engine(),
195 ModuleInput::Hashed(_) => self.engine(),
196 ModuleInput::Command(cmd) => self.engine(),
197 }
198 };
199
200 let module_cache = self.module_cache();
201
202 let task = async move { load_module(&engine, &module_cache, input, on_progress).await };
203 Box::pin(task)
204 }
205
206 fn resolve_module_sync(
208 &self,
209 input: ModuleInput<'_>,
210 engine: Option<&Engine>,
211 on_progress: Option<ModuleLoadProgressReporter>,
212 ) -> Result<Module, SpawnError> {
213 block_on(self.resolve_module(input, engine, on_progress))
214 }
215
216 #[deprecated(since = "0.601.0", note = "Use `resolve_module` instead")]
223 fn load_command_module(
224 &self,
225 cmd: &BinaryPackageCommand,
226 ) -> BoxFuture<'_, Result<Module, SpawnError>> {
227 self.resolve_module(ModuleInput::Command(Cow::Owned(cmd.clone())), None, None)
228 }
229
230 #[deprecated(since = "0.601.0", note = "Use `resolve_module_sync` instead")]
232 fn load_command_module_sync(&self, cmd: &BinaryPackageCommand) -> Result<Module, SpawnError> {
233 block_on(self.resolve_module(ModuleInput::Command(Cow::Borrowed(cmd)), None, None))
234 }
235
236 #[deprecated(since = "0.601.0", note = "Use `resolve_module` instead")]
240 fn load_module<'a>(&'a self, wasm: &'a [u8]) -> BoxFuture<'a, Result<Module, SpawnError>> {
241 self.resolve_module(ModuleInput::Bytes(Cow::Borrowed(wasm)), None, None)
242 }
243
244 #[deprecated(
246 since = "0.601.0",
247 note = "Use `load_command_module` or `load_hashed_module` instead - this method can have high overhead"
248 )]
249 fn load_module_sync(&self, wasm: &[u8]) -> Result<Module, SpawnError> {
250 block_on(self.resolve_module(ModuleInput::Bytes(Cow::Borrowed(wasm)), None, None))
251 }
252
253 fn load_hashed_module(
257 &self,
258 module: HashedModuleData,
259 engine: Option<&Engine>,
260 ) -> BoxFuture<'_, Result<Module, SpawnError>> {
261 self.resolve_module(ModuleInput::Hashed(Cow::Owned(module)), engine, None)
262 }
263
264 fn load_hashed_module_sync(
266 &self,
267 wasm: HashedModuleData,
268 engine: Option<&Engine>,
269 ) -> Result<Module, SpawnError> {
270 block_on(self.resolve_module(ModuleInput::Hashed(Cow::Owned(wasm)), engine, None))
271 }
272
273 fn on_taint(&self, _reason: TaintReason) {}
276
277 #[cfg(feature = "journal")]
280 fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
281 Box::new(std::iter::empty())
282 }
283
284 #[cfg(feature = "journal")]
286 fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
287 Box::new(std::iter::empty())
288 }
289
290 #[cfg(feature = "journal")]
293 fn active_journal(&self) -> Option<&'_ DynJournal> {
294 None
295 }
296}
297
298pub type DynRuntime = dyn Runtime + Send + Sync;
299
300#[tracing::instrument(level = "debug", skip_all)]
305pub async fn load_module(
306 engine: &Engine,
307 module_cache: &(dyn ModuleCache + Send + Sync),
308 input: ModuleInput<'_>,
309 on_progress: Option<ModuleLoadProgressReporter>,
310) -> Result<Module, crate::SpawnError> {
311 let wasm_hash = input.hash();
312
313 let result = if let Some(on_progress) = &on_progress {
314 module_cache
315 .load_with_progress(wasm_hash, engine, on_progress.clone())
316 .await
317 } else {
318 module_cache.load(wasm_hash, engine).await
319 };
320
321 match result {
322 Ok(module) => return Ok(module),
323 Err(CacheError::NotFound) => {}
324 Err(other) => {
325 tracing::warn!(
326 %wasm_hash,
327 error=&other as &dyn std::error::Error,
328 "Unable to load the cached module",
329 );
330 }
331 }
332
333 let res = if let Some(progress) = on_progress {
334 #[allow(unused_variables)]
335 let p = CompilationProgressCallback::new(move |p| {
336 progress.notify(ModuleLoadProgress::CompilingModule(p))
337 });
338 #[cfg(feature = "sys-default")]
339 {
340 if engine.is_sys() {
341 use wasmer::sys::NativeEngineExt;
342 engine.new_module_with_progress(input.wasm(), p)
343 } else {
344 Module::new(&engine, input.wasm())
345 }
346 }
347 #[cfg(not(feature = "sys-default"))]
348 {
349 Module::new(&engine, input.wasm())
350 }
351 } else {
352 Module::new(&engine, input.wasm())
353 };
354
355 let module = res.map_err(|err| crate::SpawnError::CompileError {
356 module_hash: wasm_hash,
357 error: err,
358 })?;
359
360 if let Err(e) = module_cache.save(wasm_hash, engine, &module).await {
362 tracing::warn!(
363 %wasm_hash,
364 error=&e as &dyn std::error::Error,
365 "Unable to cache the compiled module",
366 );
367 }
368
369 Ok(module)
370}
371
372#[derive(Debug, Default)]
373pub struct DefaultTty {
374 state: Mutex<WasiTtyState>,
375}
376
377impl TtyBridge for DefaultTty {
378 fn reset(&self) {
379 let mut state = self.state.lock().unwrap();
380 state.echo = false;
381 state.line_buffered = false;
382 state.line_feeds = false
383 }
384
385 fn tty_get(&self) -> WasiTtyState {
386 let state = self.state.lock().unwrap();
387 state.clone()
388 }
389
390 fn tty_set(&self, tty_state: WasiTtyState) {
391 let mut state = self.state.lock().unwrap();
392 *state = tty_state;
393 }
394}
395
396#[derive(Debug, Clone)]
397pub struct PluggableRuntime {
398 pub rt: Arc<dyn VirtualTaskManager>,
399 pub networking: DynVirtualNetworking,
400 pub http_client: Option<DynHttpClient>,
401 pub package_loader: Arc<dyn PackageLoader + Send + Sync>,
402 pub source: Arc<dyn Source + Send + Sync>,
403 pub engine: Engine,
404 pub module_cache: Arc<dyn ModuleCache + Send + Sync>,
405 pub tty: Option<Arc<dyn TtyBridge + Send + Sync>>,
406 #[cfg(feature = "journal")]
407 pub read_only_journals: Vec<Arc<DynReadableJournal>>,
408 #[cfg(feature = "journal")]
409 pub writable_journals: Vec<Arc<DynJournal>>,
410}
411
412impl PluggableRuntime {
413 pub fn new(rt: Arc<dyn VirtualTaskManager>) -> Self {
414 cfg_if::cfg_if! {
416 if #[cfg(feature = "host-vnet")] {
417 let networking = Arc::new(virtual_net::host::LocalNetworking::default());
418 } else {
419 let networking = Arc::new(virtual_net::UnsupportedVirtualNetworking::default());
420 }
421 }
422 let http_client =
423 crate::http::default_http_client().map(|client| Arc::new(client) as DynHttpClient);
424
425 let loader = UnsupportedPackageLoader;
426
427 let mut source = MultiSource::default();
428 if let Some(client) = &http_client {
429 source.add_source(BackendSource::new(
430 BackendSource::WASMER_PROD_ENDPOINT.parse().unwrap(),
431 client.clone(),
432 ));
433 }
434
435 Self {
436 rt,
437 networking,
438 http_client,
439 engine: Default::default(),
440 tty: None,
441 source: Arc::new(source),
442 package_loader: Arc::new(loader),
443 module_cache: Arc::new(module_cache::in_memory()),
444 #[cfg(feature = "journal")]
445 read_only_journals: Vec::new(),
446 #[cfg(feature = "journal")]
447 writable_journals: Vec::new(),
448 }
449 }
450
451 pub fn set_networking_implementation<I>(&mut self, net: I) -> &mut Self
452 where
453 I: VirtualNetworking + Sync,
454 {
455 self.networking = Arc::new(net);
456 self
457 }
458
459 pub fn set_engine(&mut self, engine: Engine) -> &mut Self {
460 self.engine = engine;
461 self
462 }
463
464 pub fn set_tty(&mut self, tty: Arc<dyn TtyBridge + Send + Sync>) -> &mut Self {
465 self.tty = Some(tty);
466 self
467 }
468
469 pub fn set_module_cache(
470 &mut self,
471 module_cache: impl ModuleCache + Send + Sync + 'static,
472 ) -> &mut Self {
473 self.module_cache = Arc::new(module_cache);
474 self
475 }
476
477 pub fn set_source(&mut self, source: impl Source + Send + 'static) -> &mut Self {
478 self.source = Arc::new(source);
479 self
480 }
481
482 pub fn set_package_loader(
483 &mut self,
484 package_loader: impl PackageLoader + 'static,
485 ) -> &mut Self {
486 self.package_loader = Arc::new(package_loader);
487 self
488 }
489
490 pub fn set_http_client(
491 &mut self,
492 client: impl HttpClient + Send + Sync + 'static,
493 ) -> &mut Self {
494 self.http_client = Some(Arc::new(client));
495 self
496 }
497
498 #[cfg(feature = "journal")]
499 pub fn add_read_only_journal(&mut self, journal: Arc<DynReadableJournal>) -> &mut Self {
500 self.read_only_journals.push(journal);
501 self
502 }
503
504 #[cfg(feature = "journal")]
505 pub fn add_writable_journal(&mut self, journal: Arc<DynJournal>) -> &mut Self {
506 self.writable_journals.push(journal);
507 self
508 }
509}
510
511impl Runtime for PluggableRuntime {
512 fn networking(&self) -> &DynVirtualNetworking {
513 &self.networking
514 }
515
516 fn http_client(&self) -> Option<&DynHttpClient> {
517 self.http_client.as_ref()
518 }
519
520 fn package_loader(&self) -> Arc<dyn PackageLoader + Send + Sync> {
521 Arc::clone(&self.package_loader)
522 }
523
524 fn source(&self) -> Arc<dyn Source + Send + Sync> {
525 Arc::clone(&self.source)
526 }
527
528 fn engine(&self) -> Engine {
529 self.engine.clone()
530 }
531
532 fn new_store(&self) -> wasmer::Store {
533 wasmer::Store::new(self.engine.clone())
534 }
535
536 fn task_manager(&self) -> &Arc<dyn VirtualTaskManager> {
537 &self.rt
538 }
539
540 fn tty(&self) -> Option<&(dyn TtyBridge + Send + Sync)> {
541 self.tty.as_deref()
542 }
543
544 fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync> {
545 self.module_cache.clone()
546 }
547
548 #[cfg(feature = "journal")]
549 fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
550 Box::new(self.read_only_journals.iter().cloned())
551 }
552
553 #[cfg(feature = "journal")]
554 fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
555 Box::new(self.writable_journals.iter().cloned())
556 }
557
558 #[cfg(feature = "journal")]
559 fn active_journal(&self) -> Option<&DynJournal> {
560 self.writable_journals.iter().last().map(|a| a.as_ref())
561 }
562}
563
564#[derive(Clone, Debug)]
567pub struct OverriddenRuntime {
568 inner: Arc<DynRuntime>,
569 task_manager: Option<Arc<dyn VirtualTaskManager>>,
570 networking: Option<DynVirtualNetworking>,
571 http_client: Option<DynHttpClient>,
572 package_loader: Option<Arc<dyn PackageLoader + Send + Sync>>,
573 source: Option<Arc<dyn Source + Send + Sync>>,
574 engine: Option<Engine>,
575 module_cache: Option<Arc<dyn ModuleCache + Send + Sync>>,
576 tty: Option<Arc<dyn TtyBridge + Send + Sync>>,
577 #[cfg(feature = "journal")]
578 pub read_only_journals: Option<Vec<Arc<DynReadableJournal>>>,
579 #[cfg(feature = "journal")]
580 pub writable_journals: Option<Vec<Arc<DynJournal>>>,
581}
582
583impl OverriddenRuntime {
584 pub fn new(inner: Arc<DynRuntime>) -> Self {
585 Self {
586 inner,
587 task_manager: None,
588 networking: None,
589 http_client: None,
590 package_loader: None,
591 source: None,
592 engine: None,
593 module_cache: None,
594 tty: None,
595 #[cfg(feature = "journal")]
596 read_only_journals: None,
597 #[cfg(feature = "journal")]
598 writable_journals: None,
599 }
600 }
601
602 pub fn with_task_manager(mut self, task_manager: Arc<dyn VirtualTaskManager>) -> Self {
603 self.task_manager.replace(task_manager);
604 self
605 }
606
607 pub fn with_networking(mut self, networking: DynVirtualNetworking) -> Self {
608 self.networking.replace(networking);
609 self
610 }
611
612 pub fn with_http_client(mut self, http_client: DynHttpClient) -> Self {
613 self.http_client.replace(http_client);
614 self
615 }
616
617 pub fn with_package_loader(
618 mut self,
619 package_loader: Arc<dyn PackageLoader + Send + Sync>,
620 ) -> Self {
621 self.package_loader.replace(package_loader);
622 self
623 }
624
625 pub fn with_source(mut self, source: Arc<dyn Source + Send + Sync>) -> Self {
626 self.source.replace(source);
627 self
628 }
629
630 pub fn with_engine(mut self, engine: Engine) -> Self {
631 self.engine.replace(engine);
632 self
633 }
634
635 pub fn with_module_cache(mut self, module_cache: Arc<dyn ModuleCache + Send + Sync>) -> Self {
636 self.module_cache.replace(module_cache);
637 self
638 }
639
640 pub fn with_tty(mut self, tty: Arc<dyn TtyBridge + Send + Sync>) -> Self {
641 self.tty.replace(tty);
642 self
643 }
644
645 #[cfg(feature = "journal")]
646 pub fn with_read_only_journals(mut self, journals: Vec<Arc<DynReadableJournal>>) -> Self {
647 self.read_only_journals.replace(journals);
648 self
649 }
650
651 #[cfg(feature = "journal")]
652 pub fn with_writable_journals(mut self, journals: Vec<Arc<DynJournal>>) -> Self {
653 self.writable_journals.replace(journals);
654 self
655 }
656}
657
658impl Runtime for OverriddenRuntime {
659 fn networking(&self) -> &DynVirtualNetworking {
660 if let Some(net) = self.networking.as_ref() {
661 net
662 } else {
663 self.inner.networking()
664 }
665 }
666
667 fn task_manager(&self) -> &Arc<dyn VirtualTaskManager> {
668 if let Some(rt) = self.task_manager.as_ref() {
669 rt
670 } else {
671 self.inner.task_manager()
672 }
673 }
674
675 fn source(&self) -> Arc<dyn Source + Send + Sync> {
676 if let Some(source) = self.source.clone() {
677 source
678 } else {
679 self.inner.source()
680 }
681 }
682
683 fn package_loader(&self) -> Arc<dyn PackageLoader + Send + Sync> {
684 if let Some(loader) = self.package_loader.clone() {
685 loader
686 } else {
687 self.inner.package_loader()
688 }
689 }
690
691 fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync> {
692 if let Some(cache) = self.module_cache.clone() {
693 cache
694 } else {
695 self.inner.module_cache()
696 }
697 }
698
699 fn engine(&self) -> Engine {
700 if let Some(engine) = self.engine.clone() {
701 engine
702 } else {
703 self.inner.engine()
704 }
705 }
706
707 fn new_store(&self) -> wasmer::Store {
708 if let Some(engine) = self.engine.clone() {
709 wasmer::Store::new(engine)
710 } else {
711 self.inner.new_store()
712 }
713 }
714
715 fn http_client(&self) -> Option<&DynHttpClient> {
716 if let Some(client) = self.http_client.as_ref() {
717 Some(client)
718 } else {
719 self.inner.http_client()
720 }
721 }
722
723 fn tty(&self) -> Option<&(dyn TtyBridge + Send + Sync)> {
724 if let Some(tty) = self.tty.as_ref() {
725 Some(tty.deref())
726 } else {
727 self.inner.tty()
728 }
729 }
730
731 #[cfg(feature = "journal")]
732 fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
733 if let Some(journals) = self.read_only_journals.as_ref() {
734 Box::new(journals.iter().cloned())
735 } else {
736 self.inner.read_only_journals()
737 }
738 }
739
740 #[cfg(feature = "journal")]
741 fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
742 if let Some(journals) = self.writable_journals.as_ref() {
743 Box::new(journals.iter().cloned())
744 } else {
745 self.inner.writable_journals()
746 }
747 }
748
749 #[cfg(feature = "journal")]
750 fn active_journal(&self) -> Option<&'_ DynJournal> {
751 if let Some(journals) = self.writable_journals.as_ref() {
752 journals.iter().last().map(|a| a.as_ref())
753 } else {
754 self.inner.active_journal()
755 }
756 }
757}