1#![allow(unused_imports)]
2#![allow(dead_code)]
3
4pub mod cconst;
5
6use std::{
7 borrow::Cow,
8 collections::HashMap,
9 io::Write,
10 ops::{Deref, DerefMut},
11 path::{Path, PathBuf},
12 sync::{Arc, Mutex, atomic::AtomicBool},
13};
14
15use futures::future::Either;
16use linked_hash_set::LinkedHashSet;
17use tokio::sync::{RwLock, mpsc};
18#[allow(unused_imports, dead_code)]
19use tracing::{debug, error, info, trace, warn};
20use virtual_fs::{
21 ArcBoxFile, ArcFile, AsyncWriteExt, CombineFile, DeviceFile, DuplexPipe, FileSystem, Pipe,
22 PipeRx, PipeTx, RootFileSystemBuilder, StaticFile, VirtualFile,
23};
24#[cfg(feature = "sys")]
25use wasmer::Engine;
26use wasmer_config::package::PackageSource;
27use wasmer_wasix_types::{types::__WASI_STDIN_FILENO, wasi::Errno};
28
29use super::{cconst::ConsoleConst, common::*, task::TaskJoinHandle};
30use crate::{
31 Runtime, SpawnError, WasiEnv, WasiEnvBuilder, WasiRuntimeError,
32 bin_factory::{BinFactory, BinaryPackage, spawn_exec},
33 capabilities::Capabilities,
34 os::task::{control_plane::WasiControlPlane, process::WasiProcess},
35 runners::wasi::{PackageOrHash, RuntimeOrEngine},
36 runtime::task_manager::InlineWaker,
37};
38
/// Configuration for launching a boot package as an interactive console.
///
/// A value is assembled with [`Console::new`] plus the `with_*` builder
/// methods and then started with [`Console::run`].
#[derive(Debug)]
pub struct Console {
    /// User agent string recorded by [`Console::with_user_agent`], if any.
    user_agent: Option<String>,
    /// Boot command: a package identifier optionally followed by
    /// space-separated arguments (it is split on `' '` in [`Console::run`]).
    boot_cmd: String,
    /// Names handed to `env.uses(..)` in [`Console::run`] before the program
    /// is spawned; insertion order is kept by the linked set.
    uses: LinkedHashSet<String>,
    /// Result of `is_mobile(user_agent)`; selects the medium welcome banner.
    is_mobile: bool,
    /// Result of `is_ssh(user_agent)`; selects the small welcome banner.
    is_ssh: bool,
    /// When `true` the welcome banner is not drawn.
    whitelabel: bool,
    /// Token stored by [`Console::with_token`]; it is not read by the code
    /// visible alongside this struct.
    token: Option<String>,
    /// When `true` the welcome banner is not drawn.
    no_welcome: bool,
    /// Prompt text (defaults to `"wasmer.sh"`); it is not read by the code
    /// visible alongside this struct.
    prompt: String,
    /// Environment variables passed to the spawned program.
    env: HashMap<String, String>,
    /// Runtime used to resolve the boot package and to spawn the program.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Standard input handed to the program (also the read half of its TTY).
    stdin: ArcBoxFile,
    /// Standard output handed to the program (also the write half of its TTY).
    stdout: ArcBoxFile,
    /// Standard error handed to the program; console diagnostics and the
    /// welcome banner are written here as well.
    stderr: ArcBoxFile,
    /// Capabilities granted to the spawned program.
    capabilities: Capabilities,
    /// Files (path -> contents) written into the program's root file system
    /// before it is spawned.
    ro_files: HashMap<String, Cow<'static, [u8]>>,
    /// Optional memory limiter installed on the root file system.
    memfs_memory_limiter: Option<virtual_fs::limiter::DynFsMemoryLimiter>,
}
59
60impl Console {
61 pub fn new(webc_boot_package: &str, runtime: Arc<dyn Runtime + Send + Sync + 'static>) -> Self {
62 Self {
63 boot_cmd: webc_boot_package.to_string(),
64 uses: LinkedHashSet::new(),
65 is_mobile: false,
66 is_ssh: false,
67 user_agent: None,
68 whitelabel: false,
69 token: None,
70 no_welcome: false,
71 env: HashMap::new(),
72 runtime,
73 prompt: "wasmer.sh".to_string(),
74 stdin: ArcBoxFile::new(Box::new(Pipe::channel().0)),
75 stdout: ArcBoxFile::new(Box::new(Pipe::channel().0)),
76 stderr: ArcBoxFile::new(Box::new(Pipe::channel().0)),
77 capabilities: Default::default(),
78 memfs_memory_limiter: None,
79 ro_files: Default::default(),
80 }
81 }
82
83 pub fn with_prompt(mut self, prompt: String) -> Self {
84 self.prompt = prompt;
85 self
86 }
87
88 pub fn with_boot_cmd(mut self, cmd: String) -> Self {
89 let prog = cmd.split_once(' ').map(|a| a.0).unwrap_or(cmd.as_str());
90 self.uses.insert(prog.to_string());
91 self.boot_cmd = cmd;
92 self
93 }
94
95 pub fn with_uses(mut self, uses: Vec<String>) -> Self {
96 self.uses = uses.into_iter().collect();
97 self
98 }
99
100 pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
101 self.env = env;
102 self
103 }
104
105 pub fn with_user_agent(mut self, user_agent: &str) -> Self {
106 self.is_mobile = is_mobile(user_agent);
107 self.is_ssh = is_ssh(user_agent);
108 self.user_agent = Some(user_agent.to_string());
109 self
110 }
111
112 pub fn with_no_welcome(mut self, no_welcome: bool) -> Self {
113 self.no_welcome = no_welcome;
114 self
115 }
116
117 pub fn with_token(mut self, token: String) -> Self {
118 self.token = Some(token);
119 self
120 }
121
122 pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
123 self.capabilities = caps;
124 self
125 }
126
127 pub fn with_stdin(mut self, stdin: Box<dyn VirtualFile + Send + Sync + 'static>) -> Self {
128 self.stdin = ArcBoxFile::new(stdin);
129 self
130 }
131
132 pub fn with_stdout(mut self, stdout: Box<dyn VirtualFile + Send + Sync + 'static>) -> Self {
133 self.stdout = ArcBoxFile::new(stdout);
134 self
135 }
136
137 pub fn with_stderr(mut self, stderr: Box<dyn VirtualFile + Send + Sync + 'static>) -> Self {
138 self.stderr = ArcBoxFile::new(stderr);
139 self
140 }
141
142 pub fn with_ro_files(mut self, ro_files: HashMap<String, Cow<'static, [u8]>>) -> Self {
143 self.ro_files = ro_files;
144 self
145 }
146
147 pub fn with_mem_fs_memory_limiter(
148 mut self,
149 limiter: virtual_fs::limiter::DynFsMemoryLimiter,
150 ) -> Self {
151 self.memfs_memory_limiter = Some(limiter);
152 self
153 }
154
155 pub fn run(&mut self) -> Result<(TaskJoinHandle, WasiProcess), SpawnError> {
156 let empty_args: Vec<&str> = Vec::new();
158 let (webc, prog, args) = match self.boot_cmd.split_once(' ') {
159 Some((webc, args)) => (
160 webc,
161 webc.split_once('/').map(|a| a.1).unwrap_or(webc),
162 args.split(' ').collect::<Vec<_>>(),
163 ),
164 None => (
165 self.boot_cmd.as_str(),
166 self.boot_cmd
167 .split_once('/')
168 .map(|a| a.1)
169 .unwrap_or(self.boot_cmd.as_str()),
170 empty_args,
171 ),
172 };
173
174 let webc_ident: PackageSource = match webc.parse() {
175 Ok(ident) => ident,
176 Err(e) => {
177 tracing::debug!(webc, error = ?e, "Unable to parse the WEBC identifier");
178 return Err(SpawnError::BadRequest);
179 }
180 };
181
182 let resolved_package = InlineWaker::block_on(BinaryPackage::from_registry(
183 &webc_ident,
184 self.runtime.as_ref(),
185 ));
186
187 let pkg = match resolved_package {
188 Ok(pkg) => pkg,
189 Err(e) => {
190 let mut stderr = self.stderr.clone();
191 InlineWaker::block_on(async {
192 let mut buffer = Vec::new();
193 writeln!(buffer, "Error: {e}").ok();
194 let mut source = e.source();
195 while let Some(s) = source {
196 writeln!(buffer, " Caused by: {s}").ok();
197 source = s.source();
198 }
199
200 virtual_fs::AsyncWriteExt::write_all(&mut stderr, &buffer)
201 .await
202 .ok();
203 });
204 tracing::debug!(error=?e, %webc, "failed to get webc dependency");
205 return Err(SpawnError::NotFound {
206 message: e.to_string(),
207 });
208 }
209 };
210
211 let wasi_opts = webc::metadata::annotations::Wasi::new(prog);
212
213 let root_fs = RootFileSystemBuilder::new()
214 .with_tty(Box::new(CombineFile::new(
215 Box::new(self.stdout.clone()),
216 Box::new(self.stdin.clone()),
217 )))
218 .build();
219
220 if let Some(limiter) = &self.memfs_memory_limiter {
221 root_fs.set_memory_limiter(limiter.clone());
222 }
223
224 let builder = crate::runners::wasi::WasiRunner::new()
225 .with_envs(self.env.clone().into_iter())
226 .with_args(args)
227 .with_capabilities(self.capabilities.clone())
228 .with_stdin(Box::new(self.stdin.clone()))
229 .with_stdout(Box::new(self.stdout.clone()))
230 .with_stderr(Box::new(self.stderr.clone()))
231 .prepare_webc_env(
232 prog,
233 &wasi_opts,
234 PackageOrHash::Package(&pkg),
235 RuntimeOrEngine::Runtime(self.runtime.clone()),
236 Some(root_fs),
237 )
238 .map_err(|err| SpawnError::Other(err.into()))?;
240
241 let env = builder.build()?;
242
243 if !self.whitelabel && !self.no_welcome {
245 InlineWaker::block_on(self.draw_welcome());
246 }
247
248 let wasi_process = env.process.clone();
249
250 if let Err(err) = env.uses(self.uses.clone()) {
251 let mut stderr = self.stderr.clone();
252 InlineWaker::block_on(async {
253 virtual_fs::AsyncWriteExt::write_all(&mut stderr, format!("{err}\r\n").as_bytes())
254 .await
255 .ok();
256 });
257 tracing::debug!("failed to load used dependency - {}", err);
258 return Err(SpawnError::BadRequest);
259 }
260
261 for (path, data) in self.ro_files.clone() {
264 let path = PathBuf::from(path);
265 env.fs_root().remove_file(&path).ok();
266 let mut file = env
267 .fs_root()
268 .new_open_options()
269 .create(true)
270 .truncate(true)
271 .write(true)
272 .open(&path)
273 .map_err(|err| SpawnError::Other(err.into()))?;
274 InlineWaker::block_on(file.copy_reference(Box::new(StaticFile::new(data))))
275 .map_err(|err| SpawnError::Other(err.into()))?;
276 }
277
278 let process = InlineWaker::block_on(spawn_exec(pkg, prog, env, &self.runtime))?;
281
282 Ok((process, wasi_process))
284 }
285
286 pub async fn draw_welcome(&self) {
287 let welcome = match (self.is_mobile, self.is_ssh) {
288 (true, _) => ConsoleConst::WELCOME_MEDIUM,
289 (_, true) => ConsoleConst::WELCOME_SMALL,
290 (_, _) => ConsoleConst::WELCOME_LARGE,
291 };
292 let mut data = welcome
293 .replace("\\x1B", "\x1B")
294 .replace("\\r", "\r")
295 .replace("\\n", "\n");
296 data.insert_str(0, ConsoleConst::TERM_NO_WRAPAROUND);
297
298 let mut stderr = self.stderr.clone();
299 virtual_fs::AsyncWriteExt::write_all(&mut stderr, data.as_bytes())
300 .await
301 .ok();
302 }
303}
304
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use virtual_fs::{AsyncSeekExt, BufferFile, Pipe};

    use super::*;

    use std::{io::Read, sync::Arc};

    use crate::{
        PluggableRuntime,
        runtime::{package_loader::BuiltinPackageLoader, task_manager::tokio::TokioTaskManager},
    };

    /// Runs `dash` through a [`Console`] with extra arguments and a custom
    /// environment variable, feeds it a script over stdin and checks what the
    /// script wrote to `/dev/tty` (which is wired to the console's stdout).
    #[test]
    #[cfg_attr(not(feature = "host-reqwest"), ignore = "Requires a HTTP client")]
    #[ignore = "Unconditionally aborts (CC #4284)"]
    fn test_console_dash_tty_with_args_and_env() {
        // Enter the tokio runtime before handing it over to the task manager.
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();
        let _entered = tokio_handle.enter();

        let task_manager = TokioTaskManager::new(tokio_runtime);
        let mut runtime = PluggableRuntime::new(Arc::new(task_manager));
        let http_client = runtime.http_client().unwrap().clone();
        runtime
            .set_package_loader(BuiltinPackageLoader::new().with_shared_http_client(http_client));

        let env = HashMap::from([("MYENV1".to_string(), "VAL1".to_string())]);

        // Boot command carrying arguments for the program.
        let boot_cmd = "sharrattj/dash -s stdin";

        let (mut stdin_tx, stdin_rx) = Pipe::channel();
        let (stdout_tx, mut stdout_rx) = Pipe::channel();

        let (mut task, _proc) = Console::new(boot_cmd, Arc::new(runtime))
            .with_env(env)
            .with_stdin(Box::new(stdin_rx))
            .with_stdout(Box::new(stdout_tx))
            .run()
            .unwrap();

        let exit_code = tokio_handle
            .block_on(async move {
                virtual_fs::AsyncWriteExt::write_all(
                    &mut stdin_tx,
                    b"echo hello $MYENV1 > /dev/tty; exit\n",
                )
                .await?;

                // Drop the writing end of the stdin pipe once the script has
                // been sent.
                drop(stdin_tx);

                let status = task.wait_finished().await?;
                Ok::<_, anyhow::Error>(status)
            })
            .unwrap();

        // NOTE(review): 78 is the raw exit code this test expects; the reason
        // for that particular value is not visible here - confirm.
        assert_eq!(exit_code.raw(), 78);

        let mut captured = String::new();
        stdout_rx.read_to_string(&mut captured).unwrap();

        assert_eq!(captured, "hello VAL1\n");
    }

    /// Runs the `python-env-dump` test package with `--help` through a
    /// [`Console`] using the default stdio and expects a zero exit code.
    #[test]
    #[ignore = "must be re-enabled after backend is deployed"]
    fn test_console_python_merge() {
        // Enter the tokio runtime before handing it over to the task manager.
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();
        let _entered = tokio_handle.enter();

        let task_manager = TokioTaskManager::new(tokio_runtime);
        let mut runtime = PluggableRuntime::new(Arc::new(task_manager));
        let http_client = runtime.http_client().unwrap().clone();
        runtime
            .set_package_loader(BuiltinPackageLoader::new().with_shared_http_client(http_client));

        let boot_cmd = "wasmer-tests/python-env-dump --help";

        let (mut task, _proc) = Console::new(boot_cmd, Arc::new(runtime)).run().unwrap();

        let exit_code = tokio_handle
            .block_on(async move {
                let status = task.wait_finished().await?;
                Ok::<_, anyhow::Error>(status)
            })
            .unwrap();

        assert_eq!(exit_code.raw(), 0);
    }
}