1use std::{
2 collections::{BTreeSet, HashMap},
3 path::{Path, PathBuf},
4 str::FromStr,
5 sync::{Arc, mpsc::Sender},
6 time::Duration,
7};
8
9use anyhow::{Context, Result, bail};
10use bytes::Bytes;
11use clap::Parser;
12use itertools::Itertools;
13use tokio::runtime::Handle;
14use url::Url;
15use virtual_fs::{
16 ArcFileSystem, DeviceFile, FileSystem, MountFileSystem, OverlayFileSystem,
17 RootFileSystemBuilder,
18};
19use virtual_net::ruleset::Ruleset;
20use wasmer::{Engine, Function, Instance, Memory32, Memory64, Module, RuntimeError, Store, Value};
21use wasmer_config::package::PackageSource as PackageSpecifier;
22use wasmer_types::ModuleHash;
23#[cfg(feature = "journal")]
24use wasmer_wasix::journal::{LogFileJournal, SnapshotTrigger};
25use wasmer_wasix::{
26 PluggableRuntime, RewindState, Runtime, WasiEnv, WasiEnvBuilder, WasiError, WasiFunctionEnv,
27 WasiVersion,
28 bin_factory::BinaryPackage,
29 capabilities::Capabilities,
30 get_wasi_versions,
31 http::HttpClient,
32 journal::{CompactingLogFileJournal, DynJournal, DynReadableJournal},
33 os::{TtyBridge, tty_sys::SysTty},
34 rewind_ext,
35 runners::MAPPED_CURRENT_DIR_DEFAULT_PATH,
36 runners::{MappedCommand, MappedDirectory, MountedDirectory},
37 runtime::{
38 module_cache::{FileSystemCache, ModuleCache},
39 package_loader::{BuiltinPackageLoader, PackageLoader},
40 resolver::{
41 BackendSource, FileSystemSource, InMemorySource, MultiSource, Source, WebSource,
42 },
43 task_manager::{
44 VirtualTaskManagerExt,
45 tokio::{RuntimeOrHandle, TokioTaskManager},
46 },
47 },
48 types::__WASI_STDIN_FILENO,
49 wasmer_wasix_types::wasi::Errno,
50};
51
52use crate::{
53 config::{UserRegistry, WasmerEnv},
54 utils::{parse_envvar, parse_mapdir, parse_volume},
55};
56
57use super::{
58 CliPackageSource, ExecutableTarget,
59 capabilities::{self, PkgCapabilityCache},
60};
61
62const WAPM_SOURCE_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
63
64#[derive(Debug, Parser, Clone, Default)]
65pub struct Wasi {
67 #[clap(
69 long = "volume",
70 name = "[HOST_DIR:]GUEST_DIR",
71 value_parser = parse_volume,
72 )]
73 pub(crate) volumes: Vec<MappedDirectory>,
74
75 #[clap(long = "dir", group = "wasi", hide = true)]
77 pub(crate) pre_opened_directories: Vec<PathBuf>,
78
79 #[clap(
81 long = "mapdir",
82 value_parser = parse_mapdir,
83 hide = true
84 )]
85 pub(crate) mapped_dirs: Vec<MappedDirectory>,
86
87 #[clap(long = "cwd")]
90 pub(crate) cwd: Option<PathBuf>,
91
92 #[clap(
94 long = "env",
95 name = "KEY=VALUE",
96 value_parser=parse_envvar,
97 )]
98 pub(crate) env_vars: Vec<(String, String)>,
99
100 #[clap(long, env)]
102 pub(crate) forward_host_env: bool,
103
104 #[clap(long = "use", name = "USE")]
106 pub(crate) uses: Vec<String>,
107
108 #[clap(long = "include-webc", name = "WEBC")]
111 pub(super) include_webcs: Vec<PathBuf>,
112
113 #[clap(long = "map-command", name = "MAPCMD")]
115 pub(super) map_commands: Vec<String>,
116
117 #[clap(long = "net", require_equals = true)]
136 pub networking: Option<Option<String>>,
139
140 #[clap(long = "no-tty")]
142 pub no_tty: bool,
143
144 #[clap(
148 long = "enable-async-threads",
149 require_equals = true,
150 default_missing_value = "true",
151 num_args = 0..=1,
152 action = clap::ArgAction::Set
153 )]
154 pub enable_async_threads: Option<bool>,
155
156 #[clap(long = "enable-cpu-backoff")]
161 pub enable_cpu_backoff: Option<u64>,
162
163 #[cfg(feature = "journal")]
169 #[clap(long = "journal")]
170 pub read_only_journals: Vec<PathBuf>,
171
172 #[cfg(feature = "journal")]
182 #[clap(long = "journal-writable")]
183 pub writable_journals: Vec<PathBuf>,
184
185 #[cfg(feature = "journal")]
188 #[clap(long = "enable-compaction")]
189 pub enable_compaction: bool,
190
191 #[cfg(feature = "journal")]
193 #[clap(long = "without-compact-on-drop")]
194 pub without_compact_on_drop: bool,
195
196 #[cfg(feature = "journal")]
202 #[clap(long = "with-compact-on-growth", default_value = "0.15")]
203 pub with_compact_on_growth: f32,
204
205 #[cfg(feature = "journal")]
216 #[clap(long = "snapshot-on")]
217 pub snapshot_on: Vec<SnapshotTrigger>,
218
219 #[cfg(feature = "journal")]
223 #[clap(long = "snapshot-period")]
224 pub snapshot_interval: Option<u64>,
225
226 #[cfg(feature = "journal")]
229 #[clap(long = "stop-after-snapshot")]
230 pub stop_after_snapshot: bool,
231
232 #[cfg(feature = "journal")]
234 #[clap(long = "skip-journal-stdio")]
235 pub skip_stdio_during_bootstrap: bool,
236
237 #[clap(long)]
241 pub http_client: bool,
242
243 #[clap(long = "deny-multiple-wasi-versions")]
245 pub deny_multiple_wasi_versions: bool,
246
247 #[clap(long = "disable-cache")]
252 disable_cache: bool,
253}
254
255pub struct RunProperties {
256 pub ctx: WasiFunctionEnv,
257 pub path: PathBuf,
258 pub invoke: Option<String>,
259 pub args: Vec<String>,
260}
261
262fn endpoint_to_folder(url: &Url) -> String {
263 url.to_string()
264 .replace("registry.wasmer.io", "wasmer.io")
265 .replace("registry.wasmer.wtf", "wasmer.wtf")
266 .replace(|c| "/:?&=#%\\".contains(c), "_")
267}
268
269#[allow(dead_code)]
270impl Wasi {
271 pub fn map_dir(&mut self, alias: &str, target_on_disk: PathBuf) {
272 self.volumes.push(MappedDirectory {
273 guest: alias.to_string(),
274 host: target_on_disk,
275 });
276 }
277
278 pub fn set_env(&mut self, key: &str, value: &str) {
279 self.env_vars.push((key.to_string(), value.to_string()));
280 }
281
282 pub fn get_versions(module: &Module) -> Option<BTreeSet<WasiVersion>> {
284 get_wasi_versions(module, false)
289 }
290
291 pub fn has_wasi_imports(module: &Module) -> bool {
293 get_wasi_versions(module, false).is_some()
296 }
297
298 pub(crate) fn all_volumes(&self) -> Vec<MappedDirectory> {
299 self.volumes
300 .iter()
301 .cloned()
302 .chain(self.pre_opened_directories.iter().map(|d| MappedDirectory {
303 host: d.clone(),
304 guest: d.to_str().expect("must be a valid path string").to_string(),
305 }))
306 .chain(self.mapped_dirs.iter().cloned())
307 .collect_vec()
308 }
309
310 pub fn prepare(
311 &self,
312 module: &Module,
313 program_name: String,
314 args: Vec<String>,
315 rt: Arc<dyn Runtime + Send + Sync>,
316 ) -> Result<WasiEnvBuilder> {
317 let args = args.into_iter().map(|arg| arg.into_bytes());
318
319 let map_commands = self
320 .map_commands
321 .iter()
322 .map(|map| map.split_once('=').unwrap())
323 .map(|(a, b)| (a.to_string(), b.to_string()))
324 .collect::<HashMap<_, _>>();
325
326 let mut uses = Vec::new();
327 for name in &self.uses {
328 let specifier = PackageSpecifier::from_str(name)
329 .with_context(|| format!("Unable to parse \"{name}\" as a package specifier"))?;
330 let pkg = {
331 let inner_rt = rt.clone();
332 rt.task_manager()
333 .spawn_and_block_on(async move {
334 BinaryPackage::from_registry(&specifier, &*inner_rt).await
335 })
336 .with_context(|| format!("Unable to load \"{name}\""))??
337 };
338 uses.push(pkg);
339 }
340
341 let mut builder = WasiEnv::builder(program_name)
342 .runtime(Arc::clone(&rt))
343 .args(args)
344 .envs(self.env_vars.clone())
345 .uses(uses)
346 .map_commands(map_commands);
347
348 let mut builder = {
349 let mount_fs = RootFileSystemBuilder::new()
350 .with_tty(Box::new(DeviceFile::new(__WASI_STDIN_FILENO)))
351 .build();
352 let (have_current_dir, mapped_dirs) = self.build_mapped_directories(false)?;
353 let mut root_layers: Vec<Arc<dyn FileSystem + Send + Sync>> = Vec::new();
354
355 for mapped in mapped_dirs {
356 let MountedDirectory { guest, fs } = MountedDirectory::from(mapped);
357 if guest == "/" {
358 root_layers.push(fs);
359 } else {
360 mount_fs.mount(&guest, Arc::new(fs))?;
361 }
362 }
363
364 if !root_layers.is_empty() {
365 let existing_root = mount_fs
366 .filesystem_at(Path::new("/"))
367 .expect("root fs builder should always mount /");
368 mount_fs.set_mount(
369 Path::new("/"),
370 Arc::new(OverlayFileSystem::new(
371 ArcFileSystem::new(existing_root),
372 root_layers,
373 )),
374 )?;
375 };
376
377 if let Some(cwd) = self.cwd.as_ref() {
378 if !cwd.starts_with("/") {
379 bail!("The argument to --cwd must be an absolute path");
380 }
381 builder = builder.current_dir(cwd.clone());
382 }
383
384 builder = builder
386 .mount_fs(mount_fs)
387 .preopen_dir(Path::new("/"))
388 .unwrap();
389
390 let dot_path = if have_current_dir {
391 PathBuf::from(MAPPED_CURRENT_DIR_DEFAULT_PATH)
392 } else {
393 PathBuf::from("/")
394 };
395
396 builder.add_preopen_build(|p| {
397 p.directory(&dot_path)
398 .alias(".")
399 .read(true)
400 .write(true)
401 .create(true)
402 })?;
403
404 builder
405 };
406
407 *builder.capabilities_mut() = self.capabilities();
408
409 #[cfg(feature = "journal")]
410 {
411 for trigger in self.snapshot_on.iter().cloned() {
412 builder.add_snapshot_trigger(trigger);
413 }
414 if let Some(interval) = self.snapshot_interval {
415 builder.with_snapshot_interval(std::time::Duration::from_millis(interval));
416 }
417 if self.stop_after_snapshot {
418 builder.with_stop_running_after_snapshot(true);
419 }
420 let (r, w) = self.build_journals()?;
421 for journal in r {
422 builder.add_read_only_journal(journal);
423 }
424 for journal in w {
425 builder.add_writable_journal(journal);
426 }
427 builder.with_skip_stdio_during_bootstrap(self.skip_stdio_during_bootstrap);
428 }
429
430 Ok(builder)
431 }
432
433 #[cfg(feature = "journal")]
434 #[allow(clippy::type_complexity)]
435 pub fn build_journals(
436 &self,
437 ) -> anyhow::Result<(Vec<Arc<DynReadableJournal>>, Vec<Arc<DynJournal>>)> {
438 let mut readable = Vec::new();
439 for journal in self.read_only_journals.clone() {
440 if matches!(std::fs::metadata(&journal), Err(e) if e.kind() == std::io::ErrorKind::NotFound)
441 {
442 bail!("Read-only journal file does not exist: {journal:?}");
443 }
444
445 readable
446 .push(Arc::new(LogFileJournal::new_readonly(journal)?) as Arc<DynReadableJournal>);
447 }
448
449 let mut writable = Vec::new();
450 for journal in self.writable_journals.clone() {
451 if self.enable_compaction {
452 let mut journal = CompactingLogFileJournal::new(journal)?;
453 if !self.without_compact_on_drop {
454 journal = journal.with_compact_on_drop()
455 }
456 if self.with_compact_on_growth.is_normal() && self.with_compact_on_growth != 0f32 {
457 journal = journal.with_compact_on_factor_size(self.with_compact_on_growth);
458 }
459 writable.push(Arc::new(journal) as Arc<DynJournal>);
460 } else {
461 writable.push(Arc::new(LogFileJournal::new(journal)?));
462 }
463 }
464 Ok((readable, writable))
465 }
466
467 #[cfg(not(feature = "journal"))]
468 pub fn build_journals(&self) -> anyhow::Result<Vec<Arc<DynJournal>>> {
469 Ok(Vec::new())
470 }
471
472 pub fn build_mapped_directories(
473 &self,
474 is_wasix: bool,
475 ) -> Result<(bool, Vec<MappedDirectory>), anyhow::Error> {
476 let mut mapped_dirs = Vec::new();
477
478 let mut have_current_dir = false;
480 for MappedDirectory { host, guest } in &self.all_volumes() {
481 let resolved_host = host.canonicalize().with_context(|| {
482 format!(
483 "could not canonicalize path for argument '--volume {}:{}'",
484 host.display(),
485 guest,
486 )
487 })?;
488
489 if guest == "/" && is_wasix {
490 tracing::warn!(
493 "Mounting on the guest's virtual root with --volume <HOST_DIR>:/ breaks WASIX modules' filesystems"
494 );
495 }
496
497 let mapping = if guest == "." {
498 if have_current_dir {
499 bail!(
500 "Cannot pre-open the current directory twice: '--volume=.' must only be specified once"
501 );
502 }
503 have_current_dir = true;
504
505 let host = if host == Path::new(".") {
506 std::env::current_dir().context("could not determine current directory")?
507 } else {
508 host.clone()
509 };
510 MappedDirectory {
511 host: resolved_host,
512 guest: if is_wasix {
513 MAPPED_CURRENT_DIR_DEFAULT_PATH.to_string()
514 } else {
515 "/".to_string()
516 },
517 }
518 } else {
519 MappedDirectory {
520 host: resolved_host,
521 guest: guest.clone(),
522 }
523 };
524 mapped_dirs.push(mapping);
525 }
526
527 Ok((have_current_dir, mapped_dirs))
528 }
529
530 pub fn build_mapped_commands(&self) -> Result<Vec<MappedCommand>, anyhow::Error> {
531 self.map_commands
532 .iter()
533 .map(|item| {
534 let (a, b) = item.split_once('=').with_context(|| {
535 format!(
536 "Invalid --map-command flag: expected <ALIAS>=<HOST_PATH>, got '{item}'"
537 )
538 })?;
539
540 let a = a.trim();
541 let b = b.trim();
542
543 if a.is_empty() {
544 bail!("Invalid --map-command flag - alias cannot be empty: '{item}'");
545 }
546 if b.is_empty() {
548 bail!("Invalid --map-command flag - host path cannot be empty: '{item}'");
549 }
550
551 Ok(MappedCommand {
552 alias: a.to_string(),
553 target: b.to_string(),
554 })
555 })
556 .collect::<Result<Vec<_>, anyhow::Error>>()
557 }
558
559 pub fn capabilities(&self) -> Capabilities {
560 let mut caps = Capabilities::default();
561
562 if self.http_client {
563 caps.http_client = wasmer_wasix::http::HttpClientCapabilityV1::new_allow_all();
564 }
565
566 if let Some(enable_async_threads) = self.enable_async_threads {
567 caps.threading.enable_asynchronous_threading = enable_async_threads;
568 }
569 caps.threading.enable_exponential_cpu_backoff =
570 self.enable_cpu_backoff.map(Duration::from_millis);
571
572 caps
573 }
574
575 pub fn prepare_runtime<I>(
576 &self,
577 engine: Engine,
578 env: &WasmerEnv,
579 pkg_cache_path: &Path,
580 rt_or_handle: I,
581 preferred_webc_version: webc::Version,
582 compiler_debug_dir_used: bool,
583 ) -> Result<impl Runtime + Send + Sync + use<I>>
584 where
585 I: Into<RuntimeOrHandle>,
586 {
587 let tokio_task_manager = Arc::new(TokioTaskManager::new(rt_or_handle.into()));
588 let mut rt = PluggableRuntime::new(tokio_task_manager.clone());
589
590 let has_networking = self.networking.is_some()
591 || capabilities::get_cached_capability(pkg_cache_path)
592 .ok()
593 .is_some_and(|v| v.enable_networking);
594
595 let ruleset = self
596 .networking
597 .clone()
598 .flatten()
599 .map(|ruleset| Ruleset::from_str(&ruleset))
600 .transpose()?;
601
602 let network = if let Some(ruleset) = ruleset {
603 virtual_net::host::LocalNetworking::with_ruleset(ruleset)
604 } else {
605 virtual_net::host::LocalNetworking::default()
606 };
607
608 if has_networking {
609 rt.set_networking_implementation(network);
610 } else {
611 let net = super::capabilities::net::AskingNetworking::new(
612 pkg_cache_path.to_path_buf(),
613 Arc::new(network),
614 );
615
616 rt.set_networking_implementation(net);
617 }
618
619 #[cfg(feature = "journal")]
620 {
621 let (r, w) = self.build_journals()?;
622 for journal in r {
623 rt.add_read_only_journal(journal);
624 }
625 for journal in w {
626 rt.add_writable_journal(journal);
627 }
628 }
629
630 if !self.no_tty {
631 let tty = Arc::new(SysTty);
632 tty.reset();
633 rt.set_tty(tty);
634 }
635
636 let client =
637 wasmer_wasix::http::default_http_client().context("No HTTP client available")?;
638 let client = Arc::new(client);
639
640 let package_loader = self
641 .prepare_package_loader(env, client.clone())
642 .context("Unable to prepare the package loader")?;
643
644 let registry = self.prepare_source(env, client, preferred_webc_version)?;
645
646 if !self.disable_cache && !compiler_debug_dir_used {
647 let cache_dir = env.cache_dir().join("compiled");
648 let module_cache = wasmer_wasix::runtime::module_cache::in_memory()
649 .with_fallback(FileSystemCache::new(cache_dir, tokio_task_manager));
650 rt.set_module_cache(module_cache);
651 }
652
653 rt.set_package_loader(package_loader)
654 .set_source(registry)
655 .set_engine(engine);
656
657 Ok(rt)
658 }
659
660 pub fn instantiate(
662 &self,
663 module: &Module,
664 module_hash: ModuleHash,
665 program_name: String,
666 args: Vec<String>,
667 runtime: Arc<dyn Runtime + Send + Sync>,
668 store: &mut Store,
669 ) -> Result<(WasiFunctionEnv, Instance)> {
670 let builder = self.prepare(module, program_name, args, runtime)?;
671 let (instance, wasi_env) = builder.instantiate_ext(module.clone(), module_hash, store)?;
672
673 Ok((wasi_env, instance))
674 }
675
676 pub fn for_binfmt_interpreter() -> Result<Self> {
677 let dir = std::env::var_os("WASMER_BINFMT_MISC_PREOPEN")
678 .map(Into::into)
679 .unwrap_or_else(|| PathBuf::from("."));
680 Ok(Self {
681 deny_multiple_wasi_versions: true,
682 env_vars: std::env::vars().collect(),
683 volumes: vec![MappedDirectory {
684 host: dir.clone(),
685 guest: dir
686 .to_str()
687 .expect("dir must be a valid string")
688 .to_string(),
689 }],
690 ..Self::default()
691 })
692 }
693
694 fn prepare_package_loader(
695 &self,
696 env: &WasmerEnv,
697 client: Arc<dyn HttpClient + Send + Sync>,
698 ) -> Result<BuiltinPackageLoader> {
699 let checkout_dir = env.cache_dir().join("checkouts");
700 let tokens = tokens_by_authority(env)?;
701
702 let loader = BuiltinPackageLoader::new()
703 .with_cache_dir(checkout_dir)
704 .with_shared_http_client(client)
705 .with_tokens(tokens);
706
707 Ok(loader)
708 }
709
710 fn prepare_source(
711 &self,
712 env: &WasmerEnv,
713 client: Arc<dyn HttpClient + Send + Sync>,
714 preferred_webc_version: webc::Version,
715 ) -> Result<MultiSource> {
716 let mut source = MultiSource::default();
717
718 let mut preloaded = InMemorySource::new();
721 for path in &self.include_webcs {
722 preloaded
723 .add_webc(path)
724 .with_context(|| format!("Unable to load \"{}\"", path.display()))?;
725 }
726 source.add_source(preloaded);
727
728 let graphql_endpoint = self.graphql_endpoint(env)?;
729 let cache_dir = env
730 .cache_dir()
731 .join("queries")
732 .join(endpoint_to_folder(&graphql_endpoint));
733 let mut wapm_source = BackendSource::new(graphql_endpoint, Arc::clone(&client))
734 .with_local_cache(cache_dir, WAPM_SOURCE_CACHE_TIMEOUT)
735 .with_preferred_webc_version(preferred_webc_version);
736 if let Some(token) = env
737 .config()?
738 .registry
739 .get_login_token_for_registry(wapm_source.registry_endpoint().as_str())
740 {
741 wapm_source = wapm_source.with_auth_token(token);
742 }
743 source.add_source(wapm_source);
744
745 let cache_dir = env.cache_dir().join("downloads");
746 source.add_source(WebSource::new(cache_dir, client));
747
748 source.add_source(FileSystemSource::default());
749
750 Ok(source)
751 }
752
753 fn graphql_endpoint(&self, env: &WasmerEnv) -> Result<Url> {
754 if let Ok(endpoint) = env.registry_endpoint() {
755 return Ok(endpoint);
756 }
757
758 let config = env.config()?;
759 let graphql_endpoint = config.registry.get_graphql_url();
760 let graphql_endpoint = graphql_endpoint
761 .parse()
762 .with_context(|| format!("Unable to parse \"{graphql_endpoint}\" as a URL"))?;
763
764 Ok(graphql_endpoint)
765 }
766}
767
768fn parse_registry(r: &str) -> Result<Url> {
769 UserRegistry::from(r).graphql_endpoint()
770}
771
772fn tokens_by_authority(env: &WasmerEnv) -> Result<HashMap<String, String>> {
773 let mut tokens = HashMap::new();
774 let config = env.config()?;
775
776 for credentials in config.registry.tokens {
777 if let Ok(url) = Url::parse(&credentials.registry)
778 && url.has_authority()
779 {
780 tokens.insert(url.authority().to_string(), credentials.token);
781 }
782 }
783
784 if let (Ok(current_registry), Some(token)) = (env.registry_endpoint(), env.token())
785 && current_registry.has_authority()
786 {
787 tokens.insert(current_registry.authority().to_string(), token);
788 }
789
790 let mut frontend_tokens = HashMap::new();
803 for (hostname, token) in &tokens {
804 if let Some(frontend_url) = hostname.strip_prefix("registry.")
805 && !tokens.contains_key(frontend_url)
806 {
807 frontend_tokens.insert(frontend_url.to_string(), token.clone());
808 }
809 }
810 tokens.extend(frontend_tokens);
811
812 Ok(tokens)
813}