1use std::{
2 collections::{BTreeSet, HashMap},
3 path::{Path, PathBuf},
4 str::FromStr,
5 sync::{Arc, mpsc::Sender},
6 time::Duration,
7};
8
9use anyhow::{Context, Result, bail};
10use bytes::Bytes;
11use clap::Parser;
12use itertools::Itertools;
13use tokio::runtime::Handle;
14use url::Url;
15use virtual_fs::{DeviceFile, FileSystem, PassthruFileSystem, RootFileSystemBuilder};
16use virtual_net::ruleset::Ruleset;
17use wasmer::{Engine, Function, Instance, Memory32, Memory64, Module, RuntimeError, Store, Value};
18use wasmer_config::package::PackageSource as PackageSpecifier;
19use wasmer_types::ModuleHash;
20#[cfg(feature = "journal")]
21use wasmer_wasix::journal::{LogFileJournal, SnapshotTrigger};
22use wasmer_wasix::{
23 PluggableRuntime, RewindState, Runtime, WasiEnv, WasiEnvBuilder, WasiError, WasiFunctionEnv,
24 WasiVersion,
25 bin_factory::BinaryPackage,
26 capabilities::Capabilities,
27 default_fs_backing, get_wasi_versions,
28 http::HttpClient,
29 journal::{CompactingLogFileJournal, DynJournal, DynReadableJournal},
30 os::{TtyBridge, tty_sys::SysTty},
31 rewind_ext,
32 runners::MAPPED_CURRENT_DIR_DEFAULT_PATH,
33 runners::{MappedCommand, MappedDirectory},
34 runtime::{
35 module_cache::{FileSystemCache, ModuleCache},
36 package_loader::{BuiltinPackageLoader, PackageLoader},
37 resolver::{
38 BackendSource, FileSystemSource, InMemorySource, MultiSource, Source, WebSource,
39 },
40 task_manager::{
41 VirtualTaskManagerExt,
42 tokio::{RuntimeOrHandle, TokioTaskManager},
43 },
44 },
45 types::__WASI_STDIN_FILENO,
46 wasmer_wasix_types::wasi::Errno,
47};
48
49use crate::{
50 config::{UserRegistry, WasmerEnv},
51 utils::{parse_envvar, parse_mapdir, parse_volume},
52};
53
54use super::{
55 CliPackageSource, ExecutableTarget,
56 capabilities::{self, PkgCapabilityCache},
57};
58
/// Ten minutes. Passed as the timeout of the registry source's local query cache
/// (`BackendSource::with_local_cache`) in `Wasi::prepare_source`.
const WAPM_SOURCE_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
60
// WASI-related command line options.
//
// NOTE: the comments in this struct are deliberately plain `//` comments. clap's derive
// macro turns `///` doc comments into `--help` text, so converting them would change the
// CLI output.
#[derive(Debug, Parser, Clone, Default)]
pub struct Wasi {
    // `--volume [HOST_DIR:]GUEST_DIR`: host directories exposed to the guest, parsed by
    // `parse_volume`. These come first in `all_volumes()`.
    #[clap(
        long = "volume",
        name = "[HOST_DIR:]GUEST_DIR",
        value_parser = parse_volume,
    )]
    volumes: Vec<MappedDirectory>,

    // `--dir` (hidden from help): host directories that `all_volumes()` maps to a guest
    // path identical to the host path.
    #[clap(long = "dir", group = "wasi", hide = true)]
    pub(crate) pre_opened_directories: Vec<PathBuf>,

    // `--mapdir` (hidden from help): host/guest directory pairs parsed by `parse_mapdir`.
    #[clap(
        long = "mapdir",
        value_parser = parse_mapdir,
        hide = true
    )]
    pub(crate) mapped_dirs: Vec<MappedDirectory>,

    // `--cwd`: initial working directory of the guest. `prepare()` rejects values that do
    // not start with `/`.
    #[clap(long = "cwd")]
    pub(crate) cwd: Option<PathBuf>,

    // `--env KEY=VALUE`: environment variables handed to the guest, parsed by `parse_envvar`.
    #[clap(
        long = "env",
        name = "KEY=VALUE",
        value_parser=parse_envvar,
    )]
    pub(crate) env_vars: Vec<(String, String)>,

    // `--forward-host-env` (also settable through an environment variable via clap's `env`).
    // Not read anywhere in this file; presumably consumed by the caller — TODO confirm.
    #[clap(long, env)]
    pub(crate) forward_host_env: bool,

    // `--use`: package specifiers that `prepare()` parses and loads from the registry.
    #[clap(long = "use", name = "USE")]
    pub(crate) uses: Vec<String>,

    // `--include-webc`: local `.webc` files preloaded into the in-memory package source
    // by `prepare_source()`.
    #[clap(long = "include-webc", name = "WEBC")]
    pub(super) include_webcs: Vec<PathBuf>,

    // `--map-command`: `ALIAS=TARGET` entries; see `build_mapped_commands()` and `prepare()`.
    #[clap(long = "map-command", name = "MAPCMD")]
    pub(super) map_commands: Vec<String>,

    // `--net[=RULESET]`: the outer `Option` records whether the flag was given, the inner
    // one carries the optional ruleset string (parsed with `Ruleset::from_str` in
    // `prepare_runtime()`). `require_equals` means the value must be attached with `=`.
    #[clap(long = "net", require_equals = true)]
    pub networking: Option<Option<String>>,

    // `--no-tty`: when set, `prepare_runtime()` does not install the system TTY.
    #[clap(long = "no-tty")]
    pub no_tty: bool,

    // `--enable-async-threads`: copied into `Capabilities::threading.enable_asynchronous_threading`.
    #[clap(long = "enable-async-threads")]
    pub enable_async_threads: bool,

    // `--enable-cpu-backoff`: value in milliseconds, converted to a `Duration` for
    // `Capabilities::threading.enable_exponential_cpu_backoff`.
    #[clap(long = "enable-cpu-backoff")]
    pub enable_cpu_backoff: Option<u64>,

    // `--journal`: journal files opened read-only; `build_journals()` fails if one is missing.
    #[cfg(feature = "journal")]
    #[clap(long = "journal")]
    pub read_only_journals: Vec<PathBuf>,

    // `--journal-writable`: journal files opened for writing by `build_journals()`.
    #[cfg(feature = "journal")]
    #[clap(long = "journal-writable")]
    pub writable_journals: Vec<PathBuf>,

    // `--enable-compaction`: open writable journals as `CompactingLogFileJournal`.
    #[cfg(feature = "journal")]
    #[clap(long = "enable-compaction")]
    pub enable_compaction: bool,

    // `--without-compact-on-drop`: when NOT set, compacting journals are configured with
    // `with_compact_on_drop()`.
    #[cfg(feature = "journal")]
    #[clap(long = "without-compact-on-drop")]
    pub without_compact_on_drop: bool,

    // `--with-compact-on-growth` (default 0.15): passed to `with_compact_on_factor_size()`
    // when it is a normal, non-zero float; otherwise the setting is skipped.
    #[cfg(feature = "journal")]
    #[clap(long = "with-compact-on-growth", default_value = "0.15")]
    pub with_compact_on_growth: f32,

    // `--snapshot-on`: snapshot triggers registered on the environment builder.
    #[cfg(feature = "journal")]
    #[clap(long = "snapshot-on")]
    pub snapshot_on: Vec<SnapshotTrigger>,

    // `--snapshot-period`: snapshot interval in milliseconds.
    #[cfg(feature = "journal")]
    #[clap(long = "snapshot-period")]
    pub snapshot_interval: Option<u64>,

    // `--stop-after-snapshot`: forwarded to `with_stop_running_after_snapshot(true)`.
    #[cfg(feature = "journal")]
    #[clap(long = "stop-after-snapshot")]
    pub stop_after_snapshot: bool,

    // `--skip-journal-stdio`: forwarded to `with_skip_stdio_during_bootstrap()`.
    #[cfg(feature = "journal")]
    #[clap(long = "skip-journal-stdio")]
    pub skip_stdio_during_bootstrap: bool,

    // `--http-client`: grants the allow-all HTTP client capability in `capabilities()`.
    #[clap(long)]
    pub http_client: bool,

    // `--deny-multiple-wasi-versions`: not read in this file (set to `true` by
    // `for_binfmt_interpreter()`); presumably enforced by the caller — TODO confirm.
    #[clap(long = "deny-multiple-wasi-versions")]
    pub deny_multiple_wasi_versions: bool,

    // `--disable-cache`: when set, `prepare_runtime()` installs no module cache.
    #[clap(long = "disable-cache")]
    disable_cache: bool,
}
243
/// Bundle of values describing a single run.
///
/// NOTE(review): nothing in this file constructs or reads this struct, so the field notes
/// below are inferred from names and types only — confirm against the callers.
pub struct RunProperties {
    /// WASI function environment for the run.
    pub ctx: WasiFunctionEnv,
    /// Presumably the path of the module being run — TODO confirm.
    pub path: PathBuf,
    /// Presumably the name of an exported function to invoke, if any — TODO confirm.
    pub invoke: Option<String>,
    /// Presumably the arguments passed to the program — TODO confirm.
    pub args: Vec<String>,
}
250
251fn endpoint_to_folder(url: &Url) -> String {
252 url.to_string()
253 .replace("registry.wasmer.io", "wasmer.io")
254 .replace("registry.wasmer.wtf", "wasmer.wtf")
255 .replace(|c| "/:?&=#%\\".contains(c), "_")
256}
257
258#[allow(dead_code)]
259impl Wasi {
260 pub fn map_dir(&mut self, alias: &str, target_on_disk: PathBuf) {
261 self.volumes.push(MappedDirectory {
262 guest: alias.to_string(),
263 host: target_on_disk,
264 });
265 }
266
267 pub fn set_env(&mut self, key: &str, value: &str) {
268 self.env_vars.push((key.to_string(), value.to_string()));
269 }
270
271 pub fn get_versions(module: &Module) -> Option<BTreeSet<WasiVersion>> {
273 get_wasi_versions(module, false)
278 }
279
280 pub fn has_wasi_imports(module: &Module) -> bool {
282 get_wasi_versions(module, false).is_some()
285 }
286
287 pub(crate) fn all_volumes(&self) -> Vec<MappedDirectory> {
288 self.volumes
289 .iter()
290 .cloned()
291 .chain(self.pre_opened_directories.iter().map(|d| MappedDirectory {
292 host: d.clone(),
293 guest: d.to_str().expect("must be a valid path string").to_string(),
294 }))
295 .chain(self.mapped_dirs.iter().cloned())
296 .collect_vec()
297 }
298
299 pub fn prepare(
300 &self,
301 module: &Module,
302 program_name: String,
303 args: Vec<String>,
304 rt: Arc<dyn Runtime + Send + Sync>,
305 ) -> Result<WasiEnvBuilder> {
306 let args = args.into_iter().map(|arg| arg.into_bytes());
307
308 let map_commands = self
309 .map_commands
310 .iter()
311 .map(|map| map.split_once('=').unwrap())
312 .map(|(a, b)| (a.to_string(), b.to_string()))
313 .collect::<HashMap<_, _>>();
314
315 let mut uses = Vec::new();
316 for name in &self.uses {
317 let specifier = PackageSpecifier::from_str(name)
318 .with_context(|| format!("Unable to parse \"{name}\" as a package specifier"))?;
319 let pkg = {
320 let inner_rt = rt.clone();
321 rt.task_manager()
322 .spawn_and_block_on(async move {
323 BinaryPackage::from_registry(&specifier, &*inner_rt).await
324 })
325 .with_context(|| format!("Unable to load \"{name}\""))??
326 };
327 uses.push(pkg);
328 }
329
330 let mut builder = WasiEnv::builder(program_name)
331 .runtime(Arc::clone(&rt))
332 .args(args)
333 .envs(self.env_vars.clone())
334 .uses(uses)
335 .map_commands(map_commands);
336
337 let mut builder = {
338 let root_fs = RootFileSystemBuilder::new()
340 .with_tty(Box::new(DeviceFile::new(__WASI_STDIN_FILENO)))
341 .build();
342
343 let (have_current_dir, mut mapped_dirs) = self.build_mapped_directories(false)?;
344 if !mapped_dirs.is_empty() {
345 let fs_backing: Arc<dyn FileSystem + Send + Sync> =
347 Arc::new(PassthruFileSystem::new_arc(default_fs_backing()));
348 for MappedDirectory { host, guest } in self.all_volumes() {
349 let host = if !host.is_absolute() {
350 Path::new("/").join(host)
351 } else {
352 host
353 };
354 root_fs.mount(guest.into(), &fs_backing, host)?;
355 }
356 }
357
358 if let Some(cwd) = self.cwd.as_ref() {
359 if !cwd.starts_with("/") {
360 bail!("The argument to --cwd must be an absolute path");
361 }
362 builder = builder.current_dir(cwd.clone());
363 }
364
365 builder = builder
367 .sandbox_fs(root_fs)
368 .preopen_dir(Path::new("/"))
369 .unwrap();
370
371 if have_current_dir {
372 builder.map_dir(".", MAPPED_CURRENT_DIR_DEFAULT_PATH)?
373 } else {
374 builder.map_dir(".", "/")?
375 }
376 };
377
378 *builder.capabilities_mut() = self.capabilities();
379
380 #[cfg(feature = "journal")]
381 {
382 for trigger in self.snapshot_on.iter().cloned() {
383 builder.add_snapshot_trigger(trigger);
384 }
385 if let Some(interval) = self.snapshot_interval {
386 builder.with_snapshot_interval(std::time::Duration::from_millis(interval));
387 }
388 if self.stop_after_snapshot {
389 builder.with_stop_running_after_snapshot(true);
390 }
391 let (r, w) = self.build_journals()?;
392 for journal in r {
393 builder.add_read_only_journal(journal);
394 }
395 for journal in w {
396 builder.add_writable_journal(journal);
397 }
398 builder.with_skip_stdio_during_bootstrap(self.skip_stdio_during_bootstrap);
399 }
400
401 Ok(builder)
402 }
403
/// Opens the journals named on the command line.
///
/// Returns `(read_only, writable)`. Read-only journals must already exist. Writable
/// journals are opened as compacting journals when `--enable-compaction` is set, with
/// compact-on-drop unless `--without-compact-on-drop` was given, and with a growth factor
/// when `--with-compact-on-growth` is a normal, non-zero float.
#[cfg(feature = "journal")]
#[allow(clippy::type_complexity)]
pub fn build_journals(
    &self,
) -> anyhow::Result<(Vec<Arc<DynReadableJournal>>, Vec<Arc<DynJournal>>)> {
    let mut read_only: Vec<Arc<DynReadableJournal>> = Vec::new();
    for journal in self.read_only_journals.iter().cloned() {
        // Only a definite "not found" is reported here; any other metadata error is left
        // for the open call below to surface.
        let is_missing = match std::fs::metadata(&journal) {
            Err(e) => e.kind() == std::io::ErrorKind::NotFound,
            Ok(_) => false,
        };
        if is_missing {
            bail!("Read-only journal file does not exist: {journal:?}");
        }

        let opened = LogFileJournal::new_readonly(journal)?;
        read_only.push(Arc::new(opened) as Arc<DynReadableJournal>);
    }

    let mut read_write: Vec<Arc<DynJournal>> = Vec::new();
    for path in self.writable_journals.iter().cloned() {
        let opened: Arc<DynJournal> = if self.enable_compaction {
            let mut compacting = CompactingLogFileJournal::new(path)?;
            if !self.without_compact_on_drop {
                compacting = compacting.with_compact_on_drop();
            }
            let growth = self.with_compact_on_growth;
            if growth.is_normal() && growth != 0f32 {
                compacting = compacting.with_compact_on_factor_size(growth);
            }
            Arc::new(compacting)
        } else {
            Arc::new(LogFileJournal::new(path)?)
        };
        read_write.push(opened);
    }

    Ok((read_only, read_write))
}
437
438 #[cfg(not(feature = "journal"))]
439 pub fn build_journals(&self) -> anyhow::Result<Vec<Arc<DynJournal>>> {
440 Ok(Vec::new())
441 }
442
443 pub fn build_mapped_directories(
444 &self,
445 is_wasix: bool,
446 ) -> Result<(bool, Vec<MappedDirectory>), anyhow::Error> {
447 let mut mapped_dirs = Vec::new();
448
449 let mut have_current_dir = false;
451 for MappedDirectory { host, guest } in &self.all_volumes() {
452 let resolved_host = host.canonicalize().with_context(|| {
453 format!(
454 "could not canonicalize path for argument '--volume {}:{}'",
455 host.display(),
456 guest,
457 )
458 })?;
459
460 if guest == "/" && is_wasix {
461 tracing::warn!(
464 "Mounting on the guest's virtual root with --volume <HOST_DIR>:/ breaks WASIX modules' filesystems"
465 );
466 }
467
468 let mapping = if guest == "." {
469 if have_current_dir {
470 bail!(
471 "Cannot pre-open the current directory twice: '--volume=.' must only be specified once"
472 );
473 }
474 have_current_dir = true;
475
476 let host = if host == Path::new(".") {
477 std::env::current_dir().context("could not determine current directory")?
478 } else {
479 host.clone()
480 };
481 MappedDirectory {
482 host: resolved_host,
483 guest: if is_wasix {
484 MAPPED_CURRENT_DIR_DEFAULT_PATH.to_string()
485 } else {
486 "/".to_string()
487 },
488 }
489 } else {
490 MappedDirectory {
491 host: resolved_host,
492 guest: guest.clone(),
493 }
494 };
495 mapped_dirs.push(mapping);
496 }
497
498 Ok((have_current_dir, mapped_dirs))
499 }
500
501 pub fn build_mapped_commands(&self) -> Result<Vec<MappedCommand>, anyhow::Error> {
502 self.map_commands
503 .iter()
504 .map(|item| {
505 let (a, b) = item.split_once('=').with_context(|| {
506 format!(
507 "Invalid --map-command flag: expected <ALIAS>=<HOST_PATH>, got '{item}'"
508 )
509 })?;
510
511 let a = a.trim();
512 let b = b.trim();
513
514 if a.is_empty() {
515 bail!("Invalid --map-command flag - alias cannot be empty: '{item}'");
516 }
517 if b.is_empty() {
519 bail!("Invalid --map-command flag - host path cannot be empty: '{item}'");
520 }
521
522 Ok(MappedCommand {
523 alias: a.to_string(),
524 target: b.to_string(),
525 })
526 })
527 .collect::<Result<Vec<_>, anyhow::Error>>()
528 }
529
530 pub fn capabilities(&self) -> Capabilities {
531 let mut caps = Capabilities::default();
532
533 if self.http_client {
534 caps.http_client = wasmer_wasix::http::HttpClientCapabilityV1::new_allow_all();
535 }
536
537 caps.threading.enable_asynchronous_threading = self.enable_async_threads;
538 caps.threading.enable_exponential_cpu_backoff =
539 self.enable_cpu_backoff.map(Duration::from_millis);
540
541 caps
542 }
543
544 pub fn prepare_runtime<I>(
545 &self,
546 engine: Engine,
547 env: &WasmerEnv,
548 pkg_cache_path: &Path,
549 rt_or_handle: I,
550 preferred_webc_version: webc::Version,
551 ) -> Result<impl Runtime + Send + Sync + use<I>>
552 where
553 I: Into<RuntimeOrHandle>,
554 {
555 let tokio_task_manager = Arc::new(TokioTaskManager::new(rt_or_handle.into()));
556 let mut rt = PluggableRuntime::new(tokio_task_manager.clone());
557
558 let has_networking = self.networking.is_some()
559 || capabilities::get_cached_capability(pkg_cache_path)
560 .ok()
561 .is_some_and(|v| v.enable_networking);
562
563 let ruleset = self
564 .networking
565 .clone()
566 .flatten()
567 .map(|ruleset| Ruleset::from_str(&ruleset))
568 .transpose()?;
569
570 let network = if let Some(ruleset) = ruleset {
571 virtual_net::host::LocalNetworking::with_ruleset(ruleset)
572 } else {
573 virtual_net::host::LocalNetworking::default()
574 };
575
576 if has_networking {
577 rt.set_networking_implementation(network);
578 } else {
579 let net = super::capabilities::net::AskingNetworking::new(
580 pkg_cache_path.to_path_buf(),
581 Arc::new(network),
582 );
583
584 rt.set_networking_implementation(net);
585 }
586
587 #[cfg(feature = "journal")]
588 {
589 let (r, w) = self.build_journals()?;
590 for journal in r {
591 rt.add_read_only_journal(journal);
592 }
593 for journal in w {
594 rt.add_writable_journal(journal);
595 }
596 }
597
598 if !self.no_tty {
599 let tty = Arc::new(SysTty);
600 tty.reset();
601 rt.set_tty(tty);
602 }
603
604 let client =
605 wasmer_wasix::http::default_http_client().context("No HTTP client available")?;
606 let client = Arc::new(client);
607
608 let package_loader = self
609 .prepare_package_loader(env, client.clone())
610 .context("Unable to prepare the package loader")?;
611
612 let registry = self.prepare_source(env, client, preferred_webc_version)?;
613
614 if !self.disable_cache {
615 let cache_dir = env.cache_dir().join("compiled");
616 let module_cache = wasmer_wasix::runtime::module_cache::in_memory()
617 .with_fallback(FileSystemCache::new(cache_dir, tokio_task_manager));
618 rt.set_module_cache(module_cache);
619 }
620
621 rt.set_package_loader(package_loader)
622 .set_source(registry)
623 .set_engine(engine);
624
625 Ok(rt)
626 }
627
628 pub fn instantiate(
630 &self,
631 module: &Module,
632 module_hash: ModuleHash,
633 program_name: String,
634 args: Vec<String>,
635 runtime: Arc<dyn Runtime + Send + Sync>,
636 store: &mut Store,
637 ) -> Result<(WasiFunctionEnv, Instance)> {
638 let builder = self.prepare(module, program_name, args, runtime)?;
639 let (instance, wasi_env) = builder.instantiate_ext(module.clone(), module_hash, store)?;
640
641 Ok((wasi_env, instance))
642 }
643
644 pub fn for_binfmt_interpreter() -> Result<Self> {
645 let dir = std::env::var_os("WASMER_BINFMT_MISC_PREOPEN")
646 .map(Into::into)
647 .unwrap_or_else(|| PathBuf::from("."));
648 Ok(Self {
649 deny_multiple_wasi_versions: true,
650 env_vars: std::env::vars().collect(),
651 volumes: vec![MappedDirectory {
652 host: dir.clone(),
653 guest: dir
654 .to_str()
655 .expect("dir must be a valid string")
656 .to_string(),
657 }],
658 ..Self::default()
659 })
660 }
661
662 fn prepare_package_loader(
663 &self,
664 env: &WasmerEnv,
665 client: Arc<dyn HttpClient + Send + Sync>,
666 ) -> Result<BuiltinPackageLoader> {
667 let checkout_dir = env.cache_dir().join("checkouts");
668 let tokens = tokens_by_authority(env)?;
669
670 let loader = BuiltinPackageLoader::new()
671 .with_cache_dir(checkout_dir)
672 .with_shared_http_client(client)
673 .with_tokens(tokens);
674
675 Ok(loader)
676 }
677
678 fn prepare_source(
679 &self,
680 env: &WasmerEnv,
681 client: Arc<dyn HttpClient + Send + Sync>,
682 preferred_webc_version: webc::Version,
683 ) -> Result<MultiSource> {
684 let mut source = MultiSource::default();
685
686 let mut preloaded = InMemorySource::new();
689 for path in &self.include_webcs {
690 preloaded
691 .add_webc(path)
692 .with_context(|| format!("Unable to load \"{}\"", path.display()))?;
693 }
694 source.add_source(preloaded);
695
696 let graphql_endpoint = self.graphql_endpoint(env)?;
697 let cache_dir = env
698 .cache_dir()
699 .join("queries")
700 .join(endpoint_to_folder(&graphql_endpoint));
701 let mut wapm_source = BackendSource::new(graphql_endpoint, Arc::clone(&client))
702 .with_local_cache(cache_dir, WAPM_SOURCE_CACHE_TIMEOUT)
703 .with_preferred_webc_version(preferred_webc_version);
704 if let Some(token) = env
705 .config()?
706 .registry
707 .get_login_token_for_registry(wapm_source.registry_endpoint().as_str())
708 {
709 wapm_source = wapm_source.with_auth_token(token);
710 }
711 source.add_source(wapm_source);
712
713 let cache_dir = env.cache_dir().join("downloads");
714 source.add_source(WebSource::new(cache_dir, client));
715
716 source.add_source(FileSystemSource::default());
717
718 Ok(source)
719 }
720
721 fn graphql_endpoint(&self, env: &WasmerEnv) -> Result<Url> {
722 if let Ok(endpoint) = env.registry_endpoint() {
723 return Ok(endpoint);
724 }
725
726 let config = env.config()?;
727 let graphql_endpoint = config.registry.get_graphql_url();
728 let graphql_endpoint = graphql_endpoint
729 .parse()
730 .with_context(|| format!("Unable to parse \"{graphql_endpoint}\" as a URL"))?;
731
732 Ok(graphql_endpoint)
733 }
734}
735
736fn parse_registry(r: &str) -> Result<Url> {
737 UserRegistry::from(r).graphql_endpoint()
738}
739
740fn tokens_by_authority(env: &WasmerEnv) -> Result<HashMap<String, String>> {
741 let mut tokens = HashMap::new();
742 let config = env.config()?;
743
744 for credentials in config.registry.tokens {
745 if let Ok(url) = Url::parse(&credentials.registry)
746 && url.has_authority()
747 {
748 tokens.insert(url.authority().to_string(), credentials.token);
749 }
750 }
751
752 if let (Ok(current_registry), Some(token)) = (env.registry_endpoint(), env.token())
753 && current_registry.has_authority()
754 {
755 tokens.insert(current_registry.authority().to_string(), token);
756 }
757
758 let mut frontend_tokens = HashMap::new();
771 for (hostname, token) in &tokens {
772 if let Some(frontend_url) = hostname.strip_prefix("registry.")
773 && !tokens.contains_key(frontend_url)
774 {
775 frontend_tokens.insert(frontend_url.to_string(), token.clone());
776 }
777 }
778 tokens.extend(frontend_tokens);
779
780 Ok(tokens)
781}