wasmer_wasix/runners/wcgi/
runner.rs

1use std::{borrow::Cow, net::SocketAddr, sync::Arc};
2
3use super::super::Body;
4use anyhow::{Context, Error};
5use futures::{StreamExt, stream::FuturesUnordered};
6use http::{Request, Response};
7use tower::ServiceBuilder;
8use tower_http::{catch_panic::CatchPanicLayer, cors::CorsLayer, trace::TraceLayer};
9use wcgi_host::CgiDialect;
10use webc::metadata::{
11    Command,
12    annotations::{Wasi, Wcgi},
13};
14
15use crate::{
16    Runtime, WasiEnvBuilder,
17    bin_factory::BinaryPackage,
18    capabilities::Capabilities,
19    runners::{
20        MappedDirectory,
21        wasi_common::CommonWasiOptions,
22        wcgi::handler::{Handler, SharedState},
23    },
24    runtime::{ModuleInput, task_manager::VirtualTaskManagerExt},
25};
26
27use super::Callbacks;
28
29#[derive(Debug)]
30pub struct WcgiRunner {
31    config: Config,
32}
33
34impl WcgiRunner {
35    pub fn new<C>(callbacks: C) -> Self
36    where
37        C: Callbacks,
38    {
39        Self {
40            config: Config::new(callbacks),
41        }
42    }
43
44    pub fn config(&mut self) -> &mut Config {
45        &mut self.config
46    }
47
48    #[tracing::instrument(skip_all)]
49    pub(crate) fn prepare_handler(
50        &mut self,
51        command_name: &str,
52        pkg: &BinaryPackage,
53        propagate_stderr: bool,
54        default_dialect: CgiDialect,
55        runtime: Arc<dyn Runtime + Send + Sync>,
56    ) -> Result<Handler, Error> {
57        let cmd = pkg
58            .get_command(command_name)
59            .with_context(|| format!("The package doesn't contain a \"{command_name}\" command"))?;
60        let metadata = cmd.metadata();
61        let wasi = metadata
62            .annotation("wasi")?
63            .unwrap_or_else(|| Wasi::new(command_name));
64
65        let input = ModuleInput::Command(Cow::Borrowed(cmd));
66        let module = runtime.resolve_module_sync(input, None, None)?;
67
68        let Wcgi { dialect, .. } = metadata.annotation("wcgi")?.unwrap_or_default();
69        let dialect = match dialect {
70            Some(d) => d.parse().context("Unable to parse the CGI dialect")?,
71            None => default_dialect,
72        };
73
74        let container_fs = pkg.webc_fs.clone();
75
76        let wasi_common = self.config.wasi.clone();
77        let rt = Arc::clone(&runtime);
78        let setup_builder = move |builder: &mut WasiEnvBuilder| {
79            let container_fs = container_fs.as_ref().map(|x| x.duplicate());
80            wasi_common.prepare_webc_env(builder, container_fs, &wasi, None)?;
81            builder.set_runtime(Arc::clone(&rt));
82            Ok(())
83        };
84
85        let shared = SharedState {
86            module,
87            module_hash: pkg.hash(),
88            dialect,
89            propagate_stderr,
90            program_name: command_name.to_string(),
91            setup_builder: Arc::new(setup_builder),
92            callbacks: Arc::clone(&self.config.callbacks),
93            runtime,
94        };
95
96        Ok(Handler::new(Arc::new(shared)))
97    }
98
99    pub(crate) fn run_command_with_handler<S>(
100        &mut self,
101        handler: S,
102        runtime: Arc<dyn Runtime + Send + Sync>,
103    ) -> Result<(), Error>
104    where
105        S: tower::Service<
106                Request<hyper::body::Incoming>,
107                Response = http::Response<Body>,
108                Error = anyhow::Error,
109                Future = std::pin::Pin<
110                    Box<dyn futures::Future<Output = Result<Response<Body>, Error>> + Send>,
111                >,
112            >,
113        S: Clone + Send + Sync + 'static,
114    {
115        let service = ServiceBuilder::new()
116            .layer(
117                TraceLayer::new_for_http()
118                    .make_span_with(|request: &Request<hyper::body::Incoming>| {
119                        tracing::info_span!(
120                            "request",
121                            method = %request.method(),
122                            uri = %request.uri(),
123                            status_code = tracing::field::Empty,
124                        )
125                    })
126                    .on_response(super::super::response_tracing::OnResponseTracer),
127            )
128            .layer(CatchPanicLayer::new())
129            .layer(CorsLayer::permissive())
130            .service(handler);
131
132        let address = self.config.addr;
133        tracing::info!(%address, "Starting the server");
134
135        let callbacks = Arc::clone(&self.config.callbacks);
136        runtime.task_manager().spawn_and_block_on(async move {
137            let (mut shutdown, abort_handle) =
138                futures::future::abortable(futures::future::pending::<()>());
139
140            callbacks.started(abort_handle);
141
142            let listener = tokio::net::TcpListener::bind(&address).await?;
143            let graceful = hyper_util::server::graceful::GracefulShutdown::new();
144
145            let http = hyper::server::conn::http1::Builder::new();
146
147            let mut futs = FuturesUnordered::new();
148
149            loop {
150                tokio::select! {
151                    Ok((stream, _addr)) = listener.accept() => {
152                        let io = hyper_util::rt::tokio::TokioIo::new(stream);
153                        let service = hyper_util::service::TowerToHyperService::new(service.clone());
154                        let conn = http.serve_connection(io, service);
155                        // watch this connection
156                        let fut = graceful.watch(conn);
157                        futs.push(async move {
158                            if let Err(e) = fut.await {
159                                eprintln!("Error serving connection: {e:?}");
160                            }
161                        });
162                    },
163
164                    _ = futs.next() => {}
165
166                    _ = &mut shutdown => {
167                        eprintln!("graceful shutdown signal received");
168                        // stop the accept loop
169                        break;
170                    }
171                }
172            }
173
174            Ok::<_, anyhow::Error>(())
175        })??;
176
177        Ok(())
178    }
179}
180
181impl crate::runners::Runner for WcgiRunner {
182    fn can_run_command(command: &Command) -> Result<bool, Error> {
183        Ok(command
184            .runner
185            .starts_with(webc::metadata::annotations::WCGI_RUNNER_URI))
186    }
187
188    fn run_command(
189        &mut self,
190        command_name: &str,
191        pkg: &BinaryPackage,
192        runtime: Arc<dyn Runtime + Send + Sync>,
193    ) -> Result<(), Error> {
194        let handler = self.prepare_handler(
195            command_name,
196            pkg,
197            false,
198            CgiDialect::Rfc3875,
199            Arc::clone(&runtime),
200        )?;
201        self.run_command_with_handler(handler, runtime)
202    }
203}
204
205#[derive(Debug)]
206pub struct Config {
207    pub(crate) wasi: CommonWasiOptions,
208    pub(crate) addr: SocketAddr,
209    pub(crate) callbacks: Arc<dyn Callbacks>,
210}
211
212impl Config {
213    pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
214        self.addr = addr;
215        self
216    }
217
218    /// Add an argument to the WASI executable's command-line arguments.
219    pub fn arg(&mut self, arg: impl Into<String>) -> &mut Self {
220        self.wasi.args.push(arg.into());
221        self
222    }
223
224    /// Add multiple arguments to the WASI executable's command-line arguments.
225    pub fn args<A, S>(&mut self, args: A) -> &mut Self
226    where
227        A: IntoIterator<Item = S>,
228        S: Into<String>,
229    {
230        self.wasi.args.extend(args.into_iter().map(|s| s.into()));
231        self
232    }
233
234    /// Expose an environment variable to the guest.
235    pub fn env(&mut self, name: impl Into<String>, value: impl Into<String>) -> &mut Self {
236        self.wasi.env.insert(name.into(), value.into());
237        self
238    }
239
240    /// Expose multiple environment variables to the guest.
241    pub fn envs<I, K, V>(&mut self, variables: I) -> &mut Self
242    where
243        I: IntoIterator<Item = (K, V)>,
244        K: Into<String>,
245        V: Into<String>,
246    {
247        self.wasi
248            .env
249            .extend(variables.into_iter().map(|(k, v)| (k.into(), v.into())));
250        self
251    }
252
253    /// Forward all of the host's environment variables to the guest.
254    pub fn forward_host_env(&mut self) -> &mut Self {
255        self.wasi.forward_host_env = true;
256        self
257    }
258
259    pub fn map_directory(&mut self, dir: MappedDirectory) -> &mut Self {
260        self.wasi.mounts.push(dir.into());
261        self
262    }
263
264    pub fn map_directories(
265        &mut self,
266        mappings: impl IntoIterator<Item = MappedDirectory>,
267    ) -> &mut Self {
268        for mapping in mappings {
269            self.map_directory(mapping);
270        }
271        self
272    }
273
274    /// Set callbacks that will be triggered at various points in the runner's
275    /// lifecycle.
276    pub fn callbacks(&mut self, callbacks: impl Callbacks + 'static) -> &mut Self {
277        self.callbacks = Arc::new(callbacks);
278        self
279    }
280
281    /// Add a package that should be available to the instance at runtime.
282    pub fn inject_package(&mut self, pkg: BinaryPackage) -> &mut Self {
283        self.wasi.injected_packages.push(pkg);
284        self
285    }
286
287    /// Add packages that should be available to the instance at runtime.
288    pub fn inject_packages(
289        &mut self,
290        packages: impl IntoIterator<Item = BinaryPackage>,
291    ) -> &mut Self {
292        self.wasi.injected_packages.extend(packages);
293        self
294    }
295
296    pub fn capabilities(&mut self) -> &mut Capabilities {
297        &mut self.wasi.capabilities
298    }
299
300    #[cfg(feature = "journal")]
301    pub fn add_snapshot_trigger(&mut self, on: crate::journal::SnapshotTrigger) {
302        self.wasi.snapshot_on.push(on);
303    }
304
305    #[cfg(feature = "journal")]
306    pub fn add_default_snapshot_triggers(&mut self) -> &mut Self {
307        for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS {
308            if !self.has_snapshot_trigger(on) {
309                self.add_snapshot_trigger(on);
310            }
311        }
312        self
313    }
314
315    #[cfg(feature = "journal")]
316    pub fn has_snapshot_trigger(&self, on: crate::journal::SnapshotTrigger) -> bool {
317        self.wasi.snapshot_on.contains(&on)
318    }
319
320    #[cfg(feature = "journal")]
321    pub fn with_snapshot_interval(&mut self, period: std::time::Duration) -> &mut Self {
322        if !self.has_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval) {
323            self.add_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval);
324        }
325        self.wasi.snapshot_interval.replace(period);
326        self
327    }
328
329    #[cfg(feature = "journal")]
330    pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) {
331        self.wasi.stop_running_after_snapshot = stop_running;
332    }
333
334    #[cfg(feature = "journal")]
335    pub fn add_read_only_journal(
336        &mut self,
337        journal: Arc<crate::journal::DynReadableJournal>,
338    ) -> &mut Self {
339        self.wasi.read_only_journals.push(journal);
340        self
341    }
342
343    #[cfg(feature = "journal")]
344    pub fn add_writable_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
345        self.wasi.writable_journals.push(journal);
346        self
347    }
348}
349
350impl Config {
351    pub fn new<C>(callbacks: C) -> Self
352    where
353        C: Callbacks,
354    {
355        Self {
356            addr: ([127, 0, 0, 1], 8000).into(),
357            wasi: CommonWasiOptions::default(),
358            callbacks: Arc::new(callbacks),
359        }
360    }
361}
362
363#[cfg(test)]
364mod tests {
365    use super::*;
366
367    #[test]
368    fn send_and_sync() {
369        fn assert_send<T: Send>() {}
370        fn assert_sync<T: Sync>() {}
371
372        assert_send::<WcgiRunner>();
373        assert_sync::<WcgiRunner>();
374    }
375}