1use std::{
2 collections::{BTreeSet, HashMap},
3 path::{Path, PathBuf},
4 str::FromStr,
5 sync::{Arc, mpsc::Sender},
6 time::Duration,
7};
8
9use anyhow::{Context, Result, bail};
10use bytes::Bytes;
11use clap::Parser;
12use itertools::Itertools;
13use tokio::runtime::Handle;
14use url::Url;
15use virtual_fs::{
16 ArcFileSystem, DeviceFile, FileSystem, MountFileSystem, OverlayFileSystem,
17 RootFileSystemBuilder,
18};
19use virtual_net::ruleset::Ruleset;
20use wasmer::{Engine, Function, Instance, Memory32, Memory64, Module, RuntimeError, Store, Value};
21use wasmer_config::package::PackageSource as PackageSpecifier;
22use wasmer_types::ModuleHash;
23#[cfg(feature = "journal")]
24use wasmer_wasix::journal::{LogFileJournal, SnapshotTrigger};
25use wasmer_wasix::{
26 PluggableRuntime, RewindState, Runtime, WasiEnv, WasiEnvBuilder, WasiError, WasiFunctionEnv,
27 WasiVersion,
28 bin_factory::BinaryPackage,
29 capabilities::Capabilities,
30 get_wasi_versions,
31 http::HttpClient,
32 journal::{CompactingLogFileJournal, DynJournal, DynReadableJournal},
33 os::{TtyBridge, tty_sys::SysTty},
34 rewind_ext,
35 runners::MAPPED_CURRENT_DIR_DEFAULT_PATH,
36 runners::{MappedCommand, MappedDirectory, MountedDirectory},
37 runtime::{
38 module_cache::{FileSystemCache, ModuleCache},
39 package_loader::{BuiltinPackageLoader, PackageLoader},
40 resolver::{
41 BackendSource, FileSystemSource, InMemorySource, MultiSource, Source, WebSource,
42 },
43 task_manager::{
44 VirtualTaskManagerExt,
45 tokio::{RuntimeOrHandle, TokioTaskManager},
46 },
47 },
48 types::__WASI_STDIN_FILENO,
49 wasmer_wasix_types::wasi::Errno,
50};
51
52use crate::{
53 config::{UserRegistry, WasmerEnv},
54 utils::{
55 WAPM_SOURCE_CACHE_TIMEOUT, parse_envvar, parse_mapdir, parse_volume,
56 registry_query_cache_dir,
57 },
58};
59
60use super::{
61 CliPackageSource, ExecutableTarget,
62 capabilities::{self, PkgCapabilityCache},
63};
64
65#[derive(Debug, Parser, Clone, Default)]
66pub struct Wasi {
68 #[clap(
70 long = "volume",
71 name = "[HOST_DIR:]GUEST_DIR",
72 value_parser = parse_volume,
73 )]
74 pub(crate) volumes: Vec<MappedDirectory>,
75
76 #[clap(long = "dir", group = "wasi", hide = true)]
78 pub(crate) pre_opened_directories: Vec<PathBuf>,
79
80 #[clap(
82 long = "mapdir",
83 value_parser = parse_mapdir,
84 hide = true
85 )]
86 pub(crate) mapped_dirs: Vec<MappedDirectory>,
87
88 #[clap(long = "cwd")]
91 pub(crate) cwd: Option<PathBuf>,
92
93 #[clap(
95 long = "env",
96 name = "KEY=VALUE",
97 value_parser=parse_envvar,
98 )]
99 pub(crate) env_vars: Vec<(String, String)>,
100
101 #[clap(long, env)]
103 pub(crate) forward_host_env: bool,
104
105 #[clap(long = "use", name = "USE")]
107 pub(crate) uses: Vec<String>,
108
109 #[clap(long = "include-webc", name = "WEBC")]
112 pub(super) include_webcs: Vec<PathBuf>,
113
114 #[clap(long = "map-command", name = "MAPCMD")]
116 pub(super) map_commands: Vec<String>,
117
118 #[clap(long = "net", require_equals = true)]
137 pub networking: Option<Option<String>>,
140
141 #[clap(long = "no-tty")]
143 pub no_tty: bool,
144
145 #[clap(
149 long = "enable-async-threads",
150 require_equals = true,
151 default_missing_value = "true",
152 num_args = 0..=1,
153 action = clap::ArgAction::Set
154 )]
155 pub enable_async_threads: Option<bool>,
156
157 #[clap(long = "enable-cpu-backoff")]
162 pub enable_cpu_backoff: Option<u64>,
163
164 #[cfg(feature = "journal")]
170 #[clap(long = "journal")]
171 pub read_only_journals: Vec<PathBuf>,
172
173 #[cfg(feature = "journal")]
183 #[clap(long = "journal-writable")]
184 pub writable_journals: Vec<PathBuf>,
185
186 #[cfg(feature = "journal")]
189 #[clap(long = "enable-compaction")]
190 pub enable_compaction: bool,
191
192 #[cfg(feature = "journal")]
194 #[clap(long = "without-compact-on-drop")]
195 pub without_compact_on_drop: bool,
196
197 #[cfg(feature = "journal")]
203 #[clap(long = "with-compact-on-growth", default_value = "0.15")]
204 pub with_compact_on_growth: f32,
205
206 #[cfg(feature = "journal")]
217 #[clap(long = "snapshot-on")]
218 pub snapshot_on: Vec<SnapshotTrigger>,
219
220 #[cfg(feature = "journal")]
224 #[clap(long = "snapshot-period")]
225 pub snapshot_interval: Option<u64>,
226
227 #[cfg(feature = "journal")]
230 #[clap(long = "stop-after-snapshot")]
231 pub stop_after_snapshot: bool,
232
233 #[cfg(feature = "journal")]
235 #[clap(long = "skip-journal-stdio")]
236 pub skip_stdio_during_bootstrap: bool,
237
238 #[clap(long)]
242 pub http_client: bool,
243
244 #[clap(long = "deny-multiple-wasi-versions")]
246 pub deny_multiple_wasi_versions: bool,
247
248 #[clap(long = "disable-cache")]
253 disable_cache: bool,
254}
255
256pub struct RunProperties {
257 pub ctx: WasiFunctionEnv,
258 pub path: PathBuf,
259 pub invoke: Option<String>,
260 pub args: Vec<String>,
261}
262
263#[allow(dead_code)]
264impl Wasi {
265 pub fn map_dir(&mut self, alias: &str, target_on_disk: PathBuf) {
266 self.volumes.push(MappedDirectory {
267 guest: alias.to_string(),
268 host: target_on_disk,
269 });
270 }
271
272 pub fn set_env(&mut self, key: &str, value: &str) {
273 self.env_vars.push((key.to_string(), value.to_string()));
274 }
275
276 pub fn get_versions(module: &Module) -> Option<BTreeSet<WasiVersion>> {
278 get_wasi_versions(module, false)
283 }
284
285 pub fn has_wasi_imports(module: &Module) -> bool {
287 get_wasi_versions(module, false).is_some()
290 }
291
292 pub(crate) fn all_volumes(&self) -> Vec<MappedDirectory> {
293 self.volumes
294 .iter()
295 .cloned()
296 .chain(self.pre_opened_directories.iter().map(|d| MappedDirectory {
297 host: d.clone(),
298 guest: d.to_str().expect("must be a valid path string").to_string(),
299 }))
300 .chain(self.mapped_dirs.iter().cloned())
301 .collect_vec()
302 }
303
304 pub fn prepare(
305 &self,
306 module: &Module,
307 program_name: String,
308 args: Vec<String>,
309 rt: Arc<dyn Runtime + Send + Sync>,
310 ) -> Result<WasiEnvBuilder> {
311 let args = args.into_iter().map(|arg| arg.into_bytes());
312
313 let map_commands = self
314 .map_commands
315 .iter()
316 .map(|map| map.split_once('=').unwrap())
317 .map(|(a, b)| (a.to_string(), b.to_string()))
318 .collect::<HashMap<_, _>>();
319
320 let mut uses = Vec::new();
321 for name in &self.uses {
322 let specifier = PackageSpecifier::from_str(name)
323 .with_context(|| format!("Unable to parse \"{name}\" as a package specifier"))?;
324 let pkg = {
325 let inner_rt = rt.clone();
326 rt.task_manager()
327 .spawn_and_block_on(async move {
328 BinaryPackage::from_registry(&specifier, &*inner_rt).await
329 })
330 .with_context(|| format!("Unable to load \"{name}\""))??
331 };
332 uses.push(pkg);
333 }
334
335 let mut builder = WasiEnv::builder(program_name)
336 .runtime(Arc::clone(&rt))
337 .args(args)
338 .envs(self.env_vars.clone())
339 .uses(uses)
340 .map_commands(map_commands);
341
342 let mut builder = {
343 let mount_fs = RootFileSystemBuilder::new()
344 .with_tty(Box::new(DeviceFile::new(__WASI_STDIN_FILENO)))
345 .build();
346 let (have_current_dir, mapped_dirs) = self.build_mapped_directories(false)?;
347 let mut root_layers: Vec<Arc<dyn FileSystem + Send + Sync>> = Vec::new();
348
349 for mapped in mapped_dirs {
350 let MountedDirectory { guest, fs } = MountedDirectory::from(mapped);
351 if guest == "/" {
352 root_layers.push(fs);
353 } else {
354 mount_fs.mount(&guest, Arc::new(fs))?;
355 }
356 }
357
358 if !root_layers.is_empty() {
359 let existing_root = mount_fs
360 .filesystem_at(Path::new("/"))
361 .expect("root fs builder should always mount /");
362 mount_fs.set_mount(
363 Path::new("/"),
364 Arc::new(OverlayFileSystem::new(
365 ArcFileSystem::new(existing_root),
366 root_layers,
367 )),
368 )?;
369 };
370
371 if let Some(cwd) = self.cwd.as_ref() {
372 if !cwd.starts_with("/") {
373 bail!("The argument to --cwd must be an absolute path");
374 }
375 builder = builder.current_dir(cwd.clone());
376 }
377
378 builder = builder
380 .mount_fs(mount_fs)
381 .preopen_dir(Path::new("/"))
382 .unwrap();
383
384 let dot_path = if have_current_dir {
385 PathBuf::from(MAPPED_CURRENT_DIR_DEFAULT_PATH)
386 } else {
387 PathBuf::from("/")
388 };
389
390 builder.add_preopen_build(|p| {
391 p.directory(&dot_path)
392 .alias(".")
393 .read(true)
394 .write(true)
395 .create(true)
396 })?;
397
398 builder
399 };
400
401 *builder.capabilities_mut() = self.capabilities();
402
403 #[cfg(feature = "journal")]
404 {
405 for trigger in self.snapshot_on.iter().cloned() {
406 builder.add_snapshot_trigger(trigger);
407 }
408 if let Some(interval) = self.snapshot_interval {
409 builder.with_snapshot_interval(std::time::Duration::from_millis(interval));
410 }
411 if self.stop_after_snapshot {
412 builder.with_stop_running_after_snapshot(true);
413 }
414 let (r, w) = self.build_journals()?;
415 for journal in r {
416 builder.add_read_only_journal(journal);
417 }
418 for journal in w {
419 builder.add_writable_journal(journal);
420 }
421 builder.with_skip_stdio_during_bootstrap(self.skip_stdio_during_bootstrap);
422 }
423
424 Ok(builder)
425 }
426
/// Opens the journals named on the command line.
///
/// Returns the read-only journals followed by the writable ones. A read-only
/// journal whose file does not exist is rejected up front. Writable journals
/// are opened as compacting journals when `--enable-compaction` is set, and
/// as plain log-file journals otherwise.
#[cfg(feature = "journal")]
#[allow(clippy::type_complexity)]
pub fn build_journals(
    &self,
) -> anyhow::Result<(Vec<Arc<DynReadableJournal>>, Vec<Arc<DynJournal>>)> {
    let mut readable: Vec<Arc<DynReadableJournal>> = Vec::new();
    for journal in self.read_only_journals.iter().cloned() {
        // Only a "not found" error is rejected here; any other metadata
        // failure is left for the journal constructor to report.
        let missing = match std::fs::metadata(&journal) {
            Err(err) => err.kind() == std::io::ErrorKind::NotFound,
            Ok(_) => false,
        };
        if missing {
            bail!("Read-only journal file does not exist: {journal:?}");
        }

        let opened = LogFileJournal::new_readonly(journal)?;
        readable.push(Arc::new(opened));
    }

    let mut writable: Vec<Arc<DynJournal>> = Vec::new();
    for journal in self.writable_journals.iter().cloned() {
        if !self.enable_compaction {
            writable.push(Arc::new(LogFileJournal::new(journal)?));
            continue;
        }

        let mut compacting = CompactingLogFileJournal::new(journal)?;
        if !self.without_compact_on_drop {
            compacting = compacting.with_compact_on_drop();
        }
        // Zero, subnormal, infinite and NaN factors leave the trigger unset.
        let growth = self.with_compact_on_growth;
        if growth.is_normal() && growth != 0f32 {
            compacting = compacting.with_compact_on_factor_size(growth);
        }
        writable.push(Arc::new(compacting));
    }

    Ok((readable, writable))
}
460
461 #[cfg(not(feature = "journal"))]
462 pub fn build_journals(&self) -> anyhow::Result<Vec<Arc<DynJournal>>> {
463 Ok(Vec::new())
464 }
465
466 pub fn build_mapped_directories(
467 &self,
468 is_wasix: bool,
469 ) -> Result<(bool, Vec<MappedDirectory>), anyhow::Error> {
470 let mut mapped_dirs = Vec::new();
471
472 let mut have_current_dir = false;
474 for MappedDirectory { host, guest } in &self.all_volumes() {
475 let resolved_host = host.canonicalize().with_context(|| {
476 format!(
477 "could not canonicalize path for argument '--volume {}:{}'",
478 host.display(),
479 guest,
480 )
481 })?;
482
483 if guest == "/" && is_wasix {
484 tracing::warn!(
487 "Mounting on the guest's virtual root with --volume <HOST_DIR>:/ breaks WASIX modules' filesystems"
488 );
489 }
490
491 let mapping = if guest == "." {
492 if have_current_dir {
493 bail!(
494 "Cannot pre-open the current directory twice: '--volume=.' must only be specified once"
495 );
496 }
497 have_current_dir = true;
498
499 let host = if host == Path::new(".") {
500 std::env::current_dir().context("could not determine current directory")?
501 } else {
502 host.clone()
503 };
504 MappedDirectory {
505 host: resolved_host,
506 guest: if is_wasix {
507 MAPPED_CURRENT_DIR_DEFAULT_PATH.to_string()
508 } else {
509 "/".to_string()
510 },
511 }
512 } else {
513 MappedDirectory {
514 host: resolved_host,
515 guest: guest.clone(),
516 }
517 };
518 mapped_dirs.push(mapping);
519 }
520
521 Ok((have_current_dir, mapped_dirs))
522 }
523
524 pub fn build_mapped_commands(&self) -> Result<Vec<MappedCommand>, anyhow::Error> {
525 self.map_commands
526 .iter()
527 .map(|item| {
528 let (a, b) = item.split_once('=').with_context(|| {
529 format!(
530 "Invalid --map-command flag: expected <ALIAS>=<HOST_PATH>, got '{item}'"
531 )
532 })?;
533
534 let a = a.trim();
535 let b = b.trim();
536
537 if a.is_empty() {
538 bail!("Invalid --map-command flag - alias cannot be empty: '{item}'");
539 }
540 if b.is_empty() {
542 bail!("Invalid --map-command flag - host path cannot be empty: '{item}'");
543 }
544
545 Ok(MappedCommand {
546 alias: a.to_string(),
547 target: b.to_string(),
548 })
549 })
550 .collect::<Result<Vec<_>, anyhow::Error>>()
551 }
552
553 pub fn capabilities(&self) -> Capabilities {
554 let mut caps = Capabilities::default();
555
556 if self.http_client {
557 caps.http_client = wasmer_wasix::http::HttpClientCapabilityV1::new_allow_all();
558 }
559
560 if let Some(enable_async_threads) = self.enable_async_threads {
561 caps.threading.enable_asynchronous_threading = enable_async_threads;
562 }
563 caps.threading.enable_exponential_cpu_backoff =
564 self.enable_cpu_backoff.map(Duration::from_millis);
565
566 caps
567 }
568
569 pub fn prepare_runtime<I>(
570 &self,
571 engine: Engine,
572 env: &WasmerEnv,
573 pkg_cache_path: &Path,
574 rt_or_handle: I,
575 preferred_webc_version: webc::Version,
576 compiler_debug_dir_used: bool,
577 ) -> Result<impl Runtime + Send + Sync + use<I>>
578 where
579 I: Into<RuntimeOrHandle>,
580 {
581 let tokio_task_manager = Arc::new(TokioTaskManager::new(rt_or_handle.into()));
582 let mut rt = PluggableRuntime::new(tokio_task_manager.clone());
583
584 let has_networking = self.networking.is_some()
585 || capabilities::get_cached_capability(pkg_cache_path)
586 .ok()
587 .is_some_and(|v| v.enable_networking);
588
589 let ruleset = self
590 .networking
591 .clone()
592 .flatten()
593 .map(|ruleset| Ruleset::from_str(&ruleset))
594 .transpose()?;
595
596 let network = if let Some(ruleset) = ruleset {
597 virtual_net::host::LocalNetworking::with_ruleset(ruleset)
598 } else {
599 virtual_net::host::LocalNetworking::default()
600 };
601
602 if has_networking {
603 rt.set_networking_implementation(network);
604 } else {
605 let net = super::capabilities::net::AskingNetworking::new(
606 pkg_cache_path.to_path_buf(),
607 Arc::new(network),
608 );
609
610 rt.set_networking_implementation(net);
611 }
612
613 #[cfg(feature = "journal")]
614 {
615 let (r, w) = self.build_journals()?;
616 for journal in r {
617 rt.add_read_only_journal(journal);
618 }
619 for journal in w {
620 rt.add_writable_journal(journal);
621 }
622 }
623
624 if !self.no_tty {
625 let tty = Arc::new(SysTty);
626 tty.reset();
627 rt.set_tty(tty);
628 }
629
630 let client =
631 wasmer_wasix::http::default_http_client().context("No HTTP client available")?;
632 let client = Arc::new(client);
633
634 let package_loader = self
635 .prepare_package_loader(env, client.clone())
636 .context("Unable to prepare the package loader")?;
637
638 let registry = self.prepare_source(env, client, preferred_webc_version)?;
639
640 if !self.disable_cache && !compiler_debug_dir_used {
641 let cache_dir = env.cache_dir().join("compiled");
642 let module_cache = wasmer_wasix::runtime::module_cache::in_memory()
643 .with_fallback(FileSystemCache::new(cache_dir, tokio_task_manager));
644 rt.set_module_cache(module_cache);
645 }
646
647 rt.set_package_loader(package_loader)
648 .set_source(registry)
649 .set_engine(engine);
650
651 Ok(rt)
652 }
653
654 pub fn instantiate(
656 &self,
657 module: &Module,
658 module_hash: ModuleHash,
659 program_name: String,
660 args: Vec<String>,
661 runtime: Arc<dyn Runtime + Send + Sync>,
662 store: &mut Store,
663 ) -> Result<(WasiFunctionEnv, Instance)> {
664 let builder = self.prepare(module, program_name, args, runtime)?;
665 let (instance, wasi_env) = builder.instantiate_ext(module.clone(), module_hash, store)?;
666
667 Ok((wasi_env, instance))
668 }
669
670 pub fn for_binfmt_interpreter() -> Result<Self> {
671 let dir = std::env::var_os("WASMER_BINFMT_MISC_PREOPEN")
672 .map(Into::into)
673 .unwrap_or_else(|| PathBuf::from("."));
674 Ok(Self {
675 deny_multiple_wasi_versions: true,
676 env_vars: std::env::vars().collect(),
677 volumes: vec![MappedDirectory {
678 host: dir.clone(),
679 guest: dir
680 .to_str()
681 .expect("dir must be a valid string")
682 .to_string(),
683 }],
684 ..Self::default()
685 })
686 }
687
688 fn prepare_package_loader(
689 &self,
690 env: &WasmerEnv,
691 client: Arc<dyn HttpClient + Send + Sync>,
692 ) -> Result<BuiltinPackageLoader> {
693 let checkout_dir = env.cache_dir().join("checkouts");
694 let tokens = tokens_by_authority(env)?;
695
696 let loader = BuiltinPackageLoader::new()
697 .with_cache_dir(checkout_dir)
698 .with_shared_http_client(client)
699 .with_tokens(tokens);
700
701 Ok(loader)
702 }
703
704 fn prepare_source(
705 &self,
706 env: &WasmerEnv,
707 client: Arc<dyn HttpClient + Send + Sync>,
708 preferred_webc_version: webc::Version,
709 ) -> Result<MultiSource> {
710 let mut source = MultiSource::default();
711
712 let mut preloaded = InMemorySource::new();
715 for path in &self.include_webcs {
716 preloaded
717 .add_webc(path)
718 .with_context(|| format!("Unable to load \"{}\"", path.display()))?;
719 }
720 source.add_source(preloaded);
721
722 let graphql_endpoint = self.graphql_endpoint(env)?;
723 let cache_dir = registry_query_cache_dir(env.cache_dir(), &graphql_endpoint);
724 let mut wapm_source = BackendSource::new(graphql_endpoint, Arc::clone(&client))
725 .with_local_cache(cache_dir, WAPM_SOURCE_CACHE_TIMEOUT)
726 .with_preferred_webc_version(preferred_webc_version);
727 if let Some(token) = env
728 .config()?
729 .registry
730 .get_login_token_for_registry(wapm_source.registry_endpoint().as_str())
731 {
732 wapm_source = wapm_source.with_auth_token(token);
733 }
734 source.add_source(wapm_source);
735
736 let cache_dir = env.cache_dir().join("downloads");
737 source.add_source(WebSource::new(cache_dir, client));
738
739 source.add_source(FileSystemSource::default());
740
741 Ok(source)
742 }
743
744 fn graphql_endpoint(&self, env: &WasmerEnv) -> Result<Url> {
745 if let Ok(endpoint) = env.registry_endpoint() {
746 return Ok(endpoint);
747 }
748
749 let config = env.config()?;
750 let graphql_endpoint = config.registry.get_graphql_url();
751 let graphql_endpoint = graphql_endpoint
752 .parse()
753 .with_context(|| format!("Unable to parse \"{graphql_endpoint}\" as a URL"))?;
754
755 Ok(graphql_endpoint)
756 }
757}
758
759fn parse_registry(r: &str) -> Result<Url> {
760 UserRegistry::from(r).graphql_endpoint()
761}
762
763fn tokens_by_authority(env: &WasmerEnv) -> Result<HashMap<String, String>> {
764 let mut tokens = HashMap::new();
765 let config = env.config()?;
766
767 for credentials in config.registry.tokens {
768 if let Ok(url) = Url::parse(&credentials.registry)
769 && url.has_authority()
770 {
771 tokens.insert(url.authority().to_string(), credentials.token);
772 }
773 }
774
775 if let (Ok(current_registry), Some(token)) = (env.registry_endpoint(), env.token())
776 && current_registry.has_authority()
777 {
778 tokens.insert(current_registry.authority().to_string(), token);
779 }
780
781 let mut frontend_tokens = HashMap::new();
794 for (hostname, token) in &tokens {
795 if let Some(frontend_url) = hostname.strip_prefix("registry.")
796 && !tokens.contains_key(frontend_url)
797 {
798 frontend_tokens.insert(frontend_url.to_string(), token.clone());
799 }
800 }
801 tokens.extend(frontend_tokens);
802
803 Ok(tokens)
804}