wasmer_wasix/runners/wcgi/
runner.rs1use std::{borrow::Cow, net::SocketAddr, sync::Arc};
2
3use super::super::Body;
4use anyhow::{Context, Error};
5use futures::{StreamExt, stream::FuturesUnordered};
6use http::{Request, Response};
7use tower::ServiceBuilder;
8use tower_http::{catch_panic::CatchPanicLayer, cors::CorsLayer, trace::TraceLayer};
9use wcgi_host::CgiDialect;
10use webc::metadata::{
11 Command,
12 annotations::{Wasi, Wcgi},
13};
14
15use crate::{
16 Runtime, WasiEnvBuilder,
17 bin_factory::BinaryPackage,
18 capabilities::Capabilities,
19 runners::{
20 MappedDirectory,
21 wasi_common::CommonWasiOptions,
22 wcgi::handler::{Handler, SharedState},
23 },
24 runtime::{ModuleInput, task_manager::VirtualTaskManagerExt},
25};
26
27use super::Callbacks;
28
29#[derive(Debug)]
30pub struct WcgiRunner {
31 config: Config,
32}
33
34impl WcgiRunner {
35 pub fn new<C>(callbacks: C) -> Self
36 where
37 C: Callbacks,
38 {
39 Self {
40 config: Config::new(callbacks),
41 }
42 }
43
44 pub fn config(&mut self) -> &mut Config {
45 &mut self.config
46 }
47
48 #[tracing::instrument(skip_all)]
49 pub(crate) fn prepare_handler(
50 &mut self,
51 command_name: &str,
52 pkg: &BinaryPackage,
53 propagate_stderr: bool,
54 default_dialect: CgiDialect,
55 runtime: Arc<dyn Runtime + Send + Sync>,
56 ) -> Result<Handler, Error> {
57 let cmd = pkg
58 .get_command(command_name)
59 .with_context(|| format!("The package doesn't contain a \"{command_name}\" command"))?;
60 let metadata = cmd.metadata();
61 let wasi = metadata
62 .annotation("wasi")?
63 .unwrap_or_else(|| Wasi::new(command_name));
64
65 let input = ModuleInput::Command(Cow::Borrowed(cmd));
66 let module = runtime.resolve_module_sync(input, None, None)?;
67
68 let Wcgi { dialect, .. } = metadata.annotation("wcgi")?.unwrap_or_default();
69 let dialect = match dialect {
70 Some(d) => d.parse().context("Unable to parse the CGI dialect")?,
71 None => default_dialect,
72 };
73
74 let container_fs = pkg.webc_fs.clone();
75
76 let wasi_common = self.config.wasi.clone();
77 let rt = Arc::clone(&runtime);
78 let setup_builder = move |builder: &mut WasiEnvBuilder| {
79 let container_fs = container_fs.as_ref().map(|x| x.duplicate());
80 wasi_common.prepare_webc_env(builder, container_fs, &wasi, None)?;
81 builder.set_runtime(Arc::clone(&rt));
82 Ok(())
83 };
84
85 let shared = SharedState {
86 module,
87 module_hash: pkg.hash(),
88 dialect,
89 propagate_stderr,
90 program_name: command_name.to_string(),
91 setup_builder: Arc::new(setup_builder),
92 callbacks: Arc::clone(&self.config.callbacks),
93 runtime,
94 };
95
96 Ok(Handler::new(Arc::new(shared)))
97 }
98
99 pub(crate) fn run_command_with_handler<S>(
100 &mut self,
101 handler: S,
102 runtime: Arc<dyn Runtime + Send + Sync>,
103 ) -> Result<(), Error>
104 where
105 S: tower::Service<
106 Request<hyper::body::Incoming>,
107 Response = http::Response<Body>,
108 Error = anyhow::Error,
109 Future = std::pin::Pin<
110 Box<dyn futures::Future<Output = Result<Response<Body>, Error>> + Send>,
111 >,
112 >,
113 S: Clone + Send + Sync + 'static,
114 {
115 let service = ServiceBuilder::new()
116 .layer(
117 TraceLayer::new_for_http()
118 .make_span_with(|request: &Request<hyper::body::Incoming>| {
119 tracing::info_span!(
120 "request",
121 method = %request.method(),
122 uri = %request.uri(),
123 status_code = tracing::field::Empty,
124 )
125 })
126 .on_response(super::super::response_tracing::OnResponseTracer),
127 )
128 .layer(CatchPanicLayer::new())
129 .layer(CorsLayer::permissive())
130 .service(handler);
131
132 let address = self.config.addr;
133 tracing::info!(%address, "Starting the server");
134
135 let callbacks = Arc::clone(&self.config.callbacks);
136 runtime.task_manager().spawn_and_block_on(async move {
137 let (mut shutdown, abort_handle) =
138 futures::future::abortable(futures::future::pending::<()>());
139
140 callbacks.started(abort_handle);
141
142 let listener = tokio::net::TcpListener::bind(&address).await?;
143 let graceful = hyper_util::server::graceful::GracefulShutdown::new();
144
145 let http = hyper::server::conn::http1::Builder::new();
146
147 let mut futs = FuturesUnordered::new();
148
149 loop {
150 tokio::select! {
151 Ok((stream, _addr)) = listener.accept() => {
152 let io = hyper_util::rt::tokio::TokioIo::new(stream);
153 let service = hyper_util::service::TowerToHyperService::new(service.clone());
154 let conn = http.serve_connection(io, service);
155 let fut = graceful.watch(conn);
157 futs.push(async move {
158 if let Err(e) = fut.await {
159 eprintln!("Error serving connection: {e:?}");
160 }
161 });
162 },
163
164 _ = futs.next() => {}
165
166 _ = &mut shutdown => {
167 eprintln!("graceful shutdown signal received");
168 break;
170 }
171 }
172 }
173
174 Ok::<_, anyhow::Error>(())
175 })??;
176
177 Ok(())
178 }
179}
180
181impl crate::runners::Runner for WcgiRunner {
182 fn can_run_command(command: &Command) -> Result<bool, Error> {
183 Ok(command
184 .runner
185 .starts_with(webc::metadata::annotations::WCGI_RUNNER_URI))
186 }
187
188 fn run_command(
189 &mut self,
190 command_name: &str,
191 pkg: &BinaryPackage,
192 runtime: Arc<dyn Runtime + Send + Sync>,
193 ) -> Result<(), Error> {
194 let handler = self.prepare_handler(
195 command_name,
196 pkg,
197 false,
198 CgiDialect::Rfc3875,
199 Arc::clone(&runtime),
200 )?;
201 self.run_command_with_handler(handler, runtime)
202 }
203}
204
205#[derive(Debug)]
206pub struct Config {
207 pub(crate) wasi: CommonWasiOptions,
208 pub(crate) addr: SocketAddr,
209 pub(crate) callbacks: Arc<dyn Callbacks>,
210}
211
212impl Config {
213 pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
214 self.addr = addr;
215 self
216 }
217
218 pub fn arg(&mut self, arg: impl Into<String>) -> &mut Self {
220 self.wasi.args.push(arg.into());
221 self
222 }
223
224 pub fn args<A, S>(&mut self, args: A) -> &mut Self
226 where
227 A: IntoIterator<Item = S>,
228 S: Into<String>,
229 {
230 self.wasi.args.extend(args.into_iter().map(|s| s.into()));
231 self
232 }
233
234 pub fn env(&mut self, name: impl Into<String>, value: impl Into<String>) -> &mut Self {
236 self.wasi.env.insert(name.into(), value.into());
237 self
238 }
239
240 pub fn envs<I, K, V>(&mut self, variables: I) -> &mut Self
242 where
243 I: IntoIterator<Item = (K, V)>,
244 K: Into<String>,
245 V: Into<String>,
246 {
247 self.wasi
248 .env
249 .extend(variables.into_iter().map(|(k, v)| (k.into(), v.into())));
250 self
251 }
252
253 pub fn forward_host_env(&mut self) -> &mut Self {
255 self.wasi.forward_host_env = true;
256 self
257 }
258
259 pub fn map_directory(&mut self, dir: MappedDirectory) -> &mut Self {
260 self.wasi.mounts.push(dir.into());
261 self
262 }
263
264 pub fn map_directories(
265 &mut self,
266 mappings: impl IntoIterator<Item = MappedDirectory>,
267 ) -> &mut Self {
268 for mapping in mappings {
269 self.map_directory(mapping);
270 }
271 self
272 }
273
274 pub fn callbacks(&mut self, callbacks: impl Callbacks + 'static) -> &mut Self {
277 self.callbacks = Arc::new(callbacks);
278 self
279 }
280
281 pub fn inject_package(&mut self, pkg: BinaryPackage) -> &mut Self {
283 self.wasi.injected_packages.push(pkg);
284 self
285 }
286
287 pub fn inject_packages(
289 &mut self,
290 packages: impl IntoIterator<Item = BinaryPackage>,
291 ) -> &mut Self {
292 self.wasi.injected_packages.extend(packages);
293 self
294 }
295
296 pub fn capabilities(&mut self) -> &mut Capabilities {
297 &mut self.wasi.capabilities
298 }
299
300 #[cfg(feature = "journal")]
301 pub fn add_snapshot_trigger(&mut self, on: crate::journal::SnapshotTrigger) {
302 self.wasi.snapshot_on.push(on);
303 }
304
305 #[cfg(feature = "journal")]
306 pub fn add_default_snapshot_triggers(&mut self) -> &mut Self {
307 for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS {
308 if !self.has_snapshot_trigger(on) {
309 self.add_snapshot_trigger(on);
310 }
311 }
312 self
313 }
314
315 #[cfg(feature = "journal")]
316 pub fn has_snapshot_trigger(&self, on: crate::journal::SnapshotTrigger) -> bool {
317 self.wasi.snapshot_on.contains(&on)
318 }
319
320 #[cfg(feature = "journal")]
321 pub fn with_snapshot_interval(&mut self, period: std::time::Duration) -> &mut Self {
322 if !self.has_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval) {
323 self.add_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval);
324 }
325 self.wasi.snapshot_interval.replace(period);
326 self
327 }
328
329 #[cfg(feature = "journal")]
330 pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) {
331 self.wasi.stop_running_after_snapshot = stop_running;
332 }
333
334 #[cfg(feature = "journal")]
335 pub fn add_read_only_journal(
336 &mut self,
337 journal: Arc<crate::journal::DynReadableJournal>,
338 ) -> &mut Self {
339 self.wasi.read_only_journals.push(journal);
340 self
341 }
342
343 #[cfg(feature = "journal")]
344 pub fn add_writable_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
345 self.wasi.writable_journals.push(journal);
346 self
347 }
348}
349
350impl Config {
351 pub fn new<C>(callbacks: C) -> Self
352 where
353 C: Callbacks,
354 {
355 Self {
356 addr: ([127, 0, 0, 1], 8000).into(),
357 wasi: CommonWasiOptions::default(),
358 callbacks: Arc::new(callbacks),
359 }
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guarantee that the runner can be moved to and shared
    /// between threads.
    #[test]
    fn send_and_sync() {
        fn assert_send_and_sync<T: Send + Sync>() {}

        assert_send_and_sync::<WcgiRunner>();
    }
}