wasmer_wasix/bin_factory/
mod.rs

1#![allow(clippy::result_large_err)]
2use std::{
3    collections::HashMap,
4    future::Future,
5    ops::Deref,
6    path::Path,
7    pin::Pin,
8    sync::{Arc, RwLock},
9};
10
11use anyhow::Context;
12use shared_buffer::OwnedBuffer;
13use virtual_fs::{AsyncReadExt, FileSystem};
14use wasmer::FunctionEnvMut;
15use wasmer_package::utils::from_bytes;
16
17mod binary_package;
18mod exec;
19
20pub use self::{
21    binary_package::*,
22    exec::{
23        import_package_mounts, package_command_by_name, run_exec, spawn_exec, spawn_exec_module,
24        spawn_exec_wasm, spawn_load_module,
25    },
26};
27use crate::{
28    Runtime, SpawnError, WasiEnv,
29    os::{
30        command::{Commands, VirtualCommand},
31        task::TaskJoinHandle,
32    },
33    runtime::module_cache::HashedModuleData,
34};
35
36#[derive(Debug, Clone)]
37pub struct BinFactory {
38    pub(crate) commands: Commands,
39    runtime: Arc<dyn Runtime + Send + Sync + 'static>,
40    pub(crate) local: Arc<RwLock<HashMap<String, Option<Arc<BinaryPackage>>>>>,
41}
42
43impl BinFactory {
44    pub fn new(runtime: Arc<dyn Runtime + Send + Sync + 'static>) -> BinFactory {
45        BinFactory {
46            commands: Commands::new_with_builtins(runtime.clone()),
47            runtime,
48            local: Arc::new(RwLock::new(HashMap::new())),
49        }
50    }
51
52    pub fn runtime(&self) -> &(dyn Runtime + Send + Sync) {
53        self.runtime.deref()
54    }
55
56    /// Register a builtin command.
57    pub fn register_builtin_command<C>(&mut self, cmd: C)
58    where
59        C: VirtualCommand + Send + Sync + 'static,
60    {
61        self.commands.register_command(cmd);
62    }
63
64    /// Register a builtin command at a custom path.
65    pub fn register_builtin_command_with_path<C, P>(&mut self, cmd: C, path: P)
66    where
67        C: VirtualCommand + Send + Sync + 'static,
68        P: Into<String>,
69    {
70        self.commands.register_command_with_path(cmd, path.into());
71    }
72
73    /// Register a builtin command behind an [`Arc`] at a custom path.
74    pub(crate) fn register_builtin_command_with_path_shared<P>(
75        &mut self,
76        cmd: Arc<dyn VirtualCommand + Send + Sync + 'static>,
77        path: P,
78    ) where
79        P: Into<String>,
80    {
81        self.commands
82            .register_command_with_path_shared(cmd, path.into());
83    }
84
85    /// Remove all registered builtin commands.
86    pub fn clear_builtin_commands(&mut self) {
87        self.commands.clear();
88    }
89
90    pub fn set_binary(&self, name: &str, binary: &Arc<BinaryPackage>) {
91        let mut cache = self.local.write().unwrap();
92        cache.insert(name.to_string(), Some(binary.clone()));
93    }
94
95    #[allow(clippy::await_holding_lock)]
96    pub async fn get_binary(
97        &self,
98        name: &str,
99        fs: Option<&dyn FileSystem>,
100    ) -> Option<Arc<BinaryPackage>> {
101        self.get_executable(name, fs)
102            .await
103            .and_then(|executable| match executable {
104                Executable::Wasm(_) => None,
105                Executable::BinaryPackage(pkg) => Some(pkg),
106            })
107    }
108
109    pub fn spawn<'a>(
110        &'a self,
111        name: String,
112        env: WasiEnv,
113    ) -> Pin<Box<dyn Future<Output = Result<TaskJoinHandle, SpawnError>> + 'a>> {
114        Box::pin(async move {
115            // Find the binary (or die trying) and make the spawn type
116            let res = self
117                .get_executable(name.as_str(), Some(env.fs_root()))
118                .await
119                .ok_or_else(|| SpawnError::BinaryNotFound {
120                    binary: name.clone(),
121                });
122            let executable = res?;
123
124            // Execute
125            match executable {
126                Executable::Wasm(bytes) => {
127                    let data = HashedModuleData::new(bytes.clone());
128                    spawn_exec_wasm(data, name.as_str(), env, &self.runtime).await
129                }
130                Executable::BinaryPackage(pkg) => {
131                    {
132                        let cmd = package_command_by_name(&pkg, name.as_str())?;
133                        env.prepare_spawn(cmd);
134                    }
135
136                    spawn_exec(pkg.as_ref().clone(), name.as_str(), env, &self.runtime).await
137                }
138            }
139        })
140    }
141
142    pub fn try_built_in(
143        &self,
144        name: String,
145        parent_ctx: Option<&FunctionEnvMut<'_, WasiEnv>>,
146        builder: &mut Option<WasiEnv>,
147    ) -> Result<TaskJoinHandle, SpawnError> {
148        // We check for built in commands
149        if let Some(parent_ctx) = parent_ctx {
150            if self.commands.exists(name.as_str()) {
151                return self.commands.exec(parent_ctx, name.as_str(), builder);
152            }
153        } else if self.commands.exists(name.as_str()) {
154            tracing::warn!("builtin command without a parent ctx - {}", name);
155        }
156        Err(SpawnError::BinaryNotFound { binary: name })
157    }
158
159    // TODO: remove allow once BinFactory is refactored
160    // currently fine because a BinFactory is only used by a single process tree
161    #[allow(clippy::await_holding_lock)]
162    pub async fn get_executable(
163        &self,
164        name: &str,
165        fs: Option<&dyn FileSystem>,
166    ) -> Option<Executable> {
167        let name = name.to_string();
168
169        // Return early if the path is already cached
170        {
171            let cache = self.local.read().unwrap();
172            if let Some(data) = cache.get(&name) {
173                data.clone().map(Executable::BinaryPackage);
174            }
175        }
176
177        let mut cache = self.local.write().unwrap();
178
179        // Check the cache again to avoid a race condition where the cache was populated inbetween the fast path and here
180        if let Some(data) = cache.get(&name) {
181            return data.clone().map(Executable::BinaryPackage);
182        }
183
184        // Check the filesystem for the file
185        if name.starts_with('/')
186            && let Some(fs) = fs
187        {
188            match load_executable_from_filesystem(fs, name.as_ref(), self.runtime()).await {
189                Ok(executable) => {
190                    if let Executable::BinaryPackage(pkg) = &executable {
191                        cache.insert(name, Some(pkg.clone()));
192                    }
193
194                    return Some(executable);
195                }
196                Err(e) => {
197                    tracing::warn!(
198                        path = name,
199                        error = &*e,
200                        "Unable to load the package from disk"
201                    );
202                }
203            }
204        }
205
206        // NAK
207        cache.insert(name, None);
208        None
209    }
210}
211
212pub enum Executable {
213    Wasm(OwnedBuffer),
214    BinaryPackage(Arc<BinaryPackage>),
215}
216
217async fn load_executable_from_filesystem(
218    fs: &dyn FileSystem,
219    path: &Path,
220    rt: &(dyn Runtime + Send + Sync),
221) -> Result<Executable, anyhow::Error> {
222    let mut f = fs
223        .new_open_options()
224        .read(true)
225        .open(path)
226        .context("Unable to open the file")?;
227
228    // Fast path if the file is fully available in memory.
229    // Prevents redundant copying of the file data.
230    if let Some(buf) = f.as_owned_buffer() {
231        if wasmer_package::utils::is_container(buf.as_slice()) {
232            let bytes = buf.clone().into_bytes();
233            if let Ok(container) = from_bytes(bytes.clone()) {
234                let pkg = BinaryPackage::from_webc(&container, rt)
235                    .await
236                    .context("Unable to load the package")?;
237
238                return Ok(Executable::BinaryPackage(Arc::new(pkg)));
239            }
240        }
241
242        Ok(Executable::Wasm(buf))
243    } else {
244        let mut data = Vec::with_capacity(f.size() as usize);
245        f.read_to_end(&mut data).await.context("Read failed")?;
246
247        let bytes: bytes::Bytes = data.into();
248
249        if let Ok(container) = from_bytes(bytes.clone()) {
250            let pkg = BinaryPackage::from_webc(&container, rt)
251                .await
252                .context("Unable to load the package")?;
253
254            Ok(Executable::BinaryPackage(Arc::new(pkg)))
255        } else {
256            Ok(Executable::Wasm(OwnedBuffer::from_bytes(bytes)))
257        }
258    }
259}