wasmer_wasix/runners/wcgi/
runner.rs

1use std::{net::SocketAddr, sync::Arc};
2
3use super::super::Body;
4use anyhow::{Context, Error};
5use futures::{StreamExt, stream::FuturesUnordered};
6use http::{Request, Response};
7use tower::ServiceBuilder;
8use tower_http::{catch_panic::CatchPanicLayer, cors::CorsLayer, trace::TraceLayer};
9use wcgi_host::CgiDialect;
10use webc::metadata::{
11    Command,
12    annotations::{Wasi, Wcgi},
13};
14
15use crate::{
16    Runtime, WasiEnvBuilder,
17    bin_factory::BinaryPackage,
18    capabilities::Capabilities,
19    runners::{
20        MappedDirectory,
21        wasi_common::CommonWasiOptions,
22        wcgi::handler::{Handler, SharedState},
23    },
24    runtime::task_manager::VirtualTaskManagerExt,
25};
26
27use super::Callbacks;
28
29#[derive(Debug)]
30pub struct WcgiRunner {
31    config: Config,
32}
33
34impl WcgiRunner {
35    pub fn new<C>(callbacks: C) -> Self
36    where
37        C: Callbacks,
38    {
39        Self {
40            config: Config::new(callbacks),
41        }
42    }
43
44    pub fn config(&mut self) -> &mut Config {
45        &mut self.config
46    }
47
48    #[tracing::instrument(skip_all)]
49    pub(crate) fn prepare_handler(
50        &mut self,
51        command_name: &str,
52        pkg: &BinaryPackage,
53        propagate_stderr: bool,
54        default_dialect: CgiDialect,
55        runtime: Arc<dyn Runtime + Send + Sync>,
56    ) -> Result<Handler, Error> {
57        let cmd = pkg
58            .get_command(command_name)
59            .with_context(|| format!("The package doesn't contain a \"{command_name}\" command"))?;
60        let metadata = cmd.metadata();
61        let wasi = metadata
62            .annotation("wasi")?
63            .unwrap_or_else(|| Wasi::new(command_name));
64
65        let module = runtime.load_command_module_sync(cmd)?;
66
67        let Wcgi { dialect, .. } = metadata.annotation("wcgi")?.unwrap_or_default();
68        let dialect = match dialect {
69            Some(d) => d.parse().context("Unable to parse the CGI dialect")?,
70            None => default_dialect,
71        };
72
73        let container_fs = pkg.webc_fs.clone();
74
75        let wasi_common = self.config.wasi.clone();
76        let rt = Arc::clone(&runtime);
77        let setup_builder = move |builder: &mut WasiEnvBuilder| {
78            let container_fs = container_fs.as_ref().map(|x| x.duplicate());
79            wasi_common.prepare_webc_env(builder, container_fs, &wasi, None)?;
80            builder.set_runtime(Arc::clone(&rt));
81            Ok(())
82        };
83
84        let shared = SharedState {
85            module,
86            module_hash: pkg.hash(),
87            dialect,
88            propagate_stderr,
89            program_name: command_name.to_string(),
90            setup_builder: Arc::new(setup_builder),
91            callbacks: Arc::clone(&self.config.callbacks),
92            runtime,
93        };
94
95        Ok(Handler::new(Arc::new(shared)))
96    }
97
98    pub(crate) fn run_command_with_handler<S>(
99        &mut self,
100        handler: S,
101        runtime: Arc<dyn Runtime + Send + Sync>,
102    ) -> Result<(), Error>
103    where
104        S: tower::Service<
105                Request<hyper::body::Incoming>,
106                Response = http::Response<Body>,
107                Error = anyhow::Error,
108                Future = std::pin::Pin<
109                    Box<dyn futures::Future<Output = Result<Response<Body>, Error>> + Send>,
110                >,
111            >,
112        S: Clone + Send + Sync + 'static,
113    {
114        let service = ServiceBuilder::new()
115            .layer(
116                TraceLayer::new_for_http()
117                    .make_span_with(|request: &Request<hyper::body::Incoming>| {
118                        tracing::info_span!(
119                            "request",
120                            method = %request.method(),
121                            uri = %request.uri(),
122                            status_code = tracing::field::Empty,
123                        )
124                    })
125                    .on_response(super::super::response_tracing::OnResponseTracer),
126            )
127            .layer(CatchPanicLayer::new())
128            .layer(CorsLayer::permissive())
129            .service(handler);
130
131        let address = self.config.addr;
132        tracing::info!(%address, "Starting the server");
133
134        let callbacks = Arc::clone(&self.config.callbacks);
135        runtime.task_manager().spawn_and_block_on(async move {
136            let (mut shutdown, abort_handle) =
137                futures::future::abortable(futures::future::pending::<()>());
138
139            callbacks.started(abort_handle);
140
141            let listener = tokio::net::TcpListener::bind(&address).await?;
142            let graceful = hyper_util::server::graceful::GracefulShutdown::new();
143
144            let http = hyper::server::conn::http1::Builder::new();
145
146            let mut futs = FuturesUnordered::new();
147
148            loop {
149                tokio::select! {
150                    Ok((stream, _addr)) = listener.accept() => {
151                        let io = hyper_util::rt::tokio::TokioIo::new(stream);
152                        let service = hyper_util::service::TowerToHyperService::new(service.clone());
153                        let conn = http.serve_connection(io, service);
154                        // watch this connection
155                        let fut = graceful.watch(conn);
156                        futs.push(async move {
157                            if let Err(e) = fut.await {
158                                eprintln!("Error serving connection: {e:?}");
159                            }
160                        });
161                    },
162
163                    _ = futs.next() => {}
164
165                    _ = &mut shutdown => {
166                        eprintln!("graceful shutdown signal received");
167                        // stop the accept loop
168                        break;
169                    }
170                }
171            }
172
173            Ok::<_, anyhow::Error>(())
174        })??;
175
176        Ok(())
177    }
178}
179
180impl crate::runners::Runner for WcgiRunner {
181    fn can_run_command(command: &Command) -> Result<bool, Error> {
182        Ok(command
183            .runner
184            .starts_with(webc::metadata::annotations::WCGI_RUNNER_URI))
185    }
186
187    fn run_command(
188        &mut self,
189        command_name: &str,
190        pkg: &BinaryPackage,
191        runtime: Arc<dyn Runtime + Send + Sync>,
192    ) -> Result<(), Error> {
193        let handler = self.prepare_handler(
194            command_name,
195            pkg,
196            false,
197            CgiDialect::Rfc3875,
198            Arc::clone(&runtime),
199        )?;
200        self.run_command_with_handler(handler, runtime)
201    }
202}
203
204#[derive(Debug)]
205pub struct Config {
206    pub(crate) wasi: CommonWasiOptions,
207    pub(crate) addr: SocketAddr,
208    pub(crate) callbacks: Arc<dyn Callbacks>,
209}
210
211impl Config {
212    pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
213        self.addr = addr;
214        self
215    }
216
217    /// Add an argument to the WASI executable's command-line arguments.
218    pub fn arg(&mut self, arg: impl Into<String>) -> &mut Self {
219        self.wasi.args.push(arg.into());
220        self
221    }
222
223    /// Add multiple arguments to the WASI executable's command-line arguments.
224    pub fn args<A, S>(&mut self, args: A) -> &mut Self
225    where
226        A: IntoIterator<Item = S>,
227        S: Into<String>,
228    {
229        self.wasi.args.extend(args.into_iter().map(|s| s.into()));
230        self
231    }
232
233    /// Expose an environment variable to the guest.
234    pub fn env(&mut self, name: impl Into<String>, value: impl Into<String>) -> &mut Self {
235        self.wasi.env.insert(name.into(), value.into());
236        self
237    }
238
239    /// Expose multiple environment variables to the guest.
240    pub fn envs<I, K, V>(&mut self, variables: I) -> &mut Self
241    where
242        I: IntoIterator<Item = (K, V)>,
243        K: Into<String>,
244        V: Into<String>,
245    {
246        self.wasi
247            .env
248            .extend(variables.into_iter().map(|(k, v)| (k.into(), v.into())));
249        self
250    }
251
252    /// Forward all of the host's environment variables to the guest.
253    pub fn forward_host_env(&mut self) -> &mut Self {
254        self.wasi.forward_host_env = true;
255        self
256    }
257
258    pub fn map_directory(&mut self, dir: MappedDirectory) -> &mut Self {
259        self.wasi.mounts.push(dir.into());
260        self
261    }
262
263    pub fn map_directories(
264        &mut self,
265        mappings: impl IntoIterator<Item = MappedDirectory>,
266    ) -> &mut Self {
267        for mapping in mappings {
268            self.map_directory(mapping);
269        }
270        self
271    }
272
273    /// Set callbacks that will be triggered at various points in the runner's
274    /// lifecycle.
275    pub fn callbacks(&mut self, callbacks: impl Callbacks + 'static) -> &mut Self {
276        self.callbacks = Arc::new(callbacks);
277        self
278    }
279
280    /// Add a package that should be available to the instance at runtime.
281    pub fn inject_package(&mut self, pkg: BinaryPackage) -> &mut Self {
282        self.wasi.injected_packages.push(pkg);
283        self
284    }
285
286    /// Add packages that should be available to the instance at runtime.
287    pub fn inject_packages(
288        &mut self,
289        packages: impl IntoIterator<Item = BinaryPackage>,
290    ) -> &mut Self {
291        self.wasi.injected_packages.extend(packages);
292        self
293    }
294
295    pub fn capabilities(&mut self) -> &mut Capabilities {
296        &mut self.wasi.capabilities
297    }
298
299    #[cfg(feature = "journal")]
300    pub fn add_snapshot_trigger(&mut self, on: crate::journal::SnapshotTrigger) {
301        self.wasi.snapshot_on.push(on);
302    }
303
304    #[cfg(feature = "journal")]
305    pub fn add_default_snapshot_triggers(&mut self) -> &mut Self {
306        for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS {
307            if !self.has_snapshot_trigger(on) {
308                self.add_snapshot_trigger(on);
309            }
310        }
311        self
312    }
313
314    #[cfg(feature = "journal")]
315    pub fn has_snapshot_trigger(&self, on: crate::journal::SnapshotTrigger) -> bool {
316        self.wasi.snapshot_on.contains(&on)
317    }
318
319    #[cfg(feature = "journal")]
320    pub fn with_snapshot_interval(&mut self, period: std::time::Duration) -> &mut Self {
321        if !self.has_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval) {
322            self.add_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval);
323        }
324        self.wasi.snapshot_interval.replace(period);
325        self
326    }
327
328    #[cfg(feature = "journal")]
329    pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) {
330        self.wasi.stop_running_after_snapshot = stop_running;
331    }
332
333    #[cfg(feature = "journal")]
334    pub fn add_read_only_journal(
335        &mut self,
336        journal: Arc<crate::journal::DynReadableJournal>,
337    ) -> &mut Self {
338        self.wasi.read_only_journals.push(journal);
339        self
340    }
341
342    #[cfg(feature = "journal")]
343    pub fn add_writable_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
344        self.wasi.writable_journals.push(journal);
345        self
346    }
347}
348
349impl Config {
350    pub fn new<C>(callbacks: C) -> Self
351    where
352        C: Callbacks,
353    {
354        Self {
355            addr: ([127, 0, 0, 1], 8000).into(),
356            wasi: CommonWasiOptions::default(),
357            callbacks: Arc::new(callbacks),
358        }
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that `WcgiRunner` is both `Send` and `Sync`.
    #[test]
    fn send_and_sync() {
        fn require_send_and_sync<T: Send + Sync>() {}

        require_send_and_sync::<WcgiRunner>();
    }
}