wasmer_wasix/runners/wcgi/
runner.rs1use std::{net::SocketAddr, sync::Arc};
2
3use super::super::Body;
4use anyhow::{Context, Error};
5use futures::{StreamExt, stream::FuturesUnordered};
6use http::{Request, Response};
7use tower::ServiceBuilder;
8use tower_http::{catch_panic::CatchPanicLayer, cors::CorsLayer, trace::TraceLayer};
9use wcgi_host::CgiDialect;
10use webc::metadata::{
11 Command,
12 annotations::{Wasi, Wcgi},
13};
14
15use crate::{
16 Runtime, WasiEnvBuilder,
17 bin_factory::BinaryPackage,
18 capabilities::Capabilities,
19 runners::{
20 MappedDirectory,
21 wasi_common::CommonWasiOptions,
22 wcgi::handler::{Handler, SharedState},
23 },
24 runtime::task_manager::VirtualTaskManagerExt,
25};
26
27use super::Callbacks;
28
29#[derive(Debug)]
30pub struct WcgiRunner {
31 config: Config,
32}
33
34impl WcgiRunner {
35 pub fn new<C>(callbacks: C) -> Self
36 where
37 C: Callbacks,
38 {
39 Self {
40 config: Config::new(callbacks),
41 }
42 }
43
44 pub fn config(&mut self) -> &mut Config {
45 &mut self.config
46 }
47
48 #[tracing::instrument(skip_all)]
49 pub(crate) fn prepare_handler(
50 &mut self,
51 command_name: &str,
52 pkg: &BinaryPackage,
53 propagate_stderr: bool,
54 default_dialect: CgiDialect,
55 runtime: Arc<dyn Runtime + Send + Sync>,
56 ) -> Result<Handler, Error> {
57 let cmd = pkg
58 .get_command(command_name)
59 .with_context(|| format!("The package doesn't contain a \"{command_name}\" command"))?;
60 let metadata = cmd.metadata();
61 let wasi = metadata
62 .annotation("wasi")?
63 .unwrap_or_else(|| Wasi::new(command_name));
64
65 let module = runtime.load_command_module_sync(cmd)?;
66
67 let Wcgi { dialect, .. } = metadata.annotation("wcgi")?.unwrap_or_default();
68 let dialect = match dialect {
69 Some(d) => d.parse().context("Unable to parse the CGI dialect")?,
70 None => default_dialect,
71 };
72
73 let container_fs = pkg.webc_fs.clone();
74
75 let wasi_common = self.config.wasi.clone();
76 let rt = Arc::clone(&runtime);
77 let setup_builder = move |builder: &mut WasiEnvBuilder| {
78 let container_fs = container_fs.as_ref().map(|x| x.duplicate());
79 wasi_common.prepare_webc_env(builder, container_fs, &wasi, None)?;
80 builder.set_runtime(Arc::clone(&rt));
81 Ok(())
82 };
83
84 let shared = SharedState {
85 module,
86 module_hash: pkg.hash(),
87 dialect,
88 propagate_stderr,
89 program_name: command_name.to_string(),
90 setup_builder: Arc::new(setup_builder),
91 callbacks: Arc::clone(&self.config.callbacks),
92 runtime,
93 };
94
95 Ok(Handler::new(Arc::new(shared)))
96 }
97
98 pub(crate) fn run_command_with_handler<S>(
99 &mut self,
100 handler: S,
101 runtime: Arc<dyn Runtime + Send + Sync>,
102 ) -> Result<(), Error>
103 where
104 S: tower::Service<
105 Request<hyper::body::Incoming>,
106 Response = http::Response<Body>,
107 Error = anyhow::Error,
108 Future = std::pin::Pin<
109 Box<dyn futures::Future<Output = Result<Response<Body>, Error>> + Send>,
110 >,
111 >,
112 S: Clone + Send + Sync + 'static,
113 {
114 let service = ServiceBuilder::new()
115 .layer(
116 TraceLayer::new_for_http()
117 .make_span_with(|request: &Request<hyper::body::Incoming>| {
118 tracing::info_span!(
119 "request",
120 method = %request.method(),
121 uri = %request.uri(),
122 status_code = tracing::field::Empty,
123 )
124 })
125 .on_response(super::super::response_tracing::OnResponseTracer),
126 )
127 .layer(CatchPanicLayer::new())
128 .layer(CorsLayer::permissive())
129 .service(handler);
130
131 let address = self.config.addr;
132 tracing::info!(%address, "Starting the server");
133
134 let callbacks = Arc::clone(&self.config.callbacks);
135 runtime.task_manager().spawn_and_block_on(async move {
136 let (mut shutdown, abort_handle) =
137 futures::future::abortable(futures::future::pending::<()>());
138
139 callbacks.started(abort_handle);
140
141 let listener = tokio::net::TcpListener::bind(&address).await?;
142 let graceful = hyper_util::server::graceful::GracefulShutdown::new();
143
144 let http = hyper::server::conn::http1::Builder::new();
145
146 let mut futs = FuturesUnordered::new();
147
148 loop {
149 tokio::select! {
150 Ok((stream, _addr)) = listener.accept() => {
151 let io = hyper_util::rt::tokio::TokioIo::new(stream);
152 let service = hyper_util::service::TowerToHyperService::new(service.clone());
153 let conn = http.serve_connection(io, service);
154 let fut = graceful.watch(conn);
156 futs.push(async move {
157 if let Err(e) = fut.await {
158 eprintln!("Error serving connection: {e:?}");
159 }
160 });
161 },
162
163 _ = futs.next() => {}
164
165 _ = &mut shutdown => {
166 eprintln!("graceful shutdown signal received");
167 break;
169 }
170 }
171 }
172
173 Ok::<_, anyhow::Error>(())
174 })??;
175
176 Ok(())
177 }
178}
179
180impl crate::runners::Runner for WcgiRunner {
181 fn can_run_command(command: &Command) -> Result<bool, Error> {
182 Ok(command
183 .runner
184 .starts_with(webc::metadata::annotations::WCGI_RUNNER_URI))
185 }
186
187 fn run_command(
188 &mut self,
189 command_name: &str,
190 pkg: &BinaryPackage,
191 runtime: Arc<dyn Runtime + Send + Sync>,
192 ) -> Result<(), Error> {
193 let handler = self.prepare_handler(
194 command_name,
195 pkg,
196 false,
197 CgiDialect::Rfc3875,
198 Arc::clone(&runtime),
199 )?;
200 self.run_command_with_handler(handler, runtime)
201 }
202}
203
204#[derive(Debug)]
205pub struct Config {
206 pub(crate) wasi: CommonWasiOptions,
207 pub(crate) addr: SocketAddr,
208 pub(crate) callbacks: Arc<dyn Callbacks>,
209}
210
211impl Config {
212 pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
213 self.addr = addr;
214 self
215 }
216
217 pub fn arg(&mut self, arg: impl Into<String>) -> &mut Self {
219 self.wasi.args.push(arg.into());
220 self
221 }
222
223 pub fn args<A, S>(&mut self, args: A) -> &mut Self
225 where
226 A: IntoIterator<Item = S>,
227 S: Into<String>,
228 {
229 self.wasi.args.extend(args.into_iter().map(|s| s.into()));
230 self
231 }
232
233 pub fn env(&mut self, name: impl Into<String>, value: impl Into<String>) -> &mut Self {
235 self.wasi.env.insert(name.into(), value.into());
236 self
237 }
238
239 pub fn envs<I, K, V>(&mut self, variables: I) -> &mut Self
241 where
242 I: IntoIterator<Item = (K, V)>,
243 K: Into<String>,
244 V: Into<String>,
245 {
246 self.wasi
247 .env
248 .extend(variables.into_iter().map(|(k, v)| (k.into(), v.into())));
249 self
250 }
251
252 pub fn forward_host_env(&mut self) -> &mut Self {
254 self.wasi.forward_host_env = true;
255 self
256 }
257
258 pub fn map_directory(&mut self, dir: MappedDirectory) -> &mut Self {
259 self.wasi.mounts.push(dir.into());
260 self
261 }
262
263 pub fn map_directories(
264 &mut self,
265 mappings: impl IntoIterator<Item = MappedDirectory>,
266 ) -> &mut Self {
267 for mapping in mappings {
268 self.map_directory(mapping);
269 }
270 self
271 }
272
273 pub fn callbacks(&mut self, callbacks: impl Callbacks + 'static) -> &mut Self {
276 self.callbacks = Arc::new(callbacks);
277 self
278 }
279
280 pub fn inject_package(&mut self, pkg: BinaryPackage) -> &mut Self {
282 self.wasi.injected_packages.push(pkg);
283 self
284 }
285
286 pub fn inject_packages(
288 &mut self,
289 packages: impl IntoIterator<Item = BinaryPackage>,
290 ) -> &mut Self {
291 self.wasi.injected_packages.extend(packages);
292 self
293 }
294
295 pub fn capabilities(&mut self) -> &mut Capabilities {
296 &mut self.wasi.capabilities
297 }
298
299 #[cfg(feature = "journal")]
300 pub fn add_snapshot_trigger(&mut self, on: crate::journal::SnapshotTrigger) {
301 self.wasi.snapshot_on.push(on);
302 }
303
304 #[cfg(feature = "journal")]
305 pub fn add_default_snapshot_triggers(&mut self) -> &mut Self {
306 for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS {
307 if !self.has_snapshot_trigger(on) {
308 self.add_snapshot_trigger(on);
309 }
310 }
311 self
312 }
313
314 #[cfg(feature = "journal")]
315 pub fn has_snapshot_trigger(&self, on: crate::journal::SnapshotTrigger) -> bool {
316 self.wasi.snapshot_on.contains(&on)
317 }
318
319 #[cfg(feature = "journal")]
320 pub fn with_snapshot_interval(&mut self, period: std::time::Duration) -> &mut Self {
321 if !self.has_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval) {
322 self.add_snapshot_trigger(crate::journal::SnapshotTrigger::PeriodicInterval);
323 }
324 self.wasi.snapshot_interval.replace(period);
325 self
326 }
327
328 #[cfg(feature = "journal")]
329 pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) {
330 self.wasi.stop_running_after_snapshot = stop_running;
331 }
332
333 #[cfg(feature = "journal")]
334 pub fn add_read_only_journal(
335 &mut self,
336 journal: Arc<crate::journal::DynReadableJournal>,
337 ) -> &mut Self {
338 self.wasi.read_only_journals.push(journal);
339 self
340 }
341
342 #[cfg(feature = "journal")]
343 pub fn add_writable_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
344 self.wasi.writable_journals.push(journal);
345 self
346 }
347}
348
349impl Config {
350 pub fn new<C>(callbacks: C) -> Self
351 where
352 C: Callbacks,
353 {
354 Self {
355 addr: ([127, 0, 0, 1], 8000).into(),
356 wasi: CommonWasiOptions::default(),
357 callbacks: Arc::new(callbacks),
358 }
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guard: the test only builds if `WcgiRunner` is both
    /// `Send` and `Sync`.
    #[test]
    fn send_and_sync() {
        fn assert_send_and_sync<T: Send + Sync>() {}

        assert_send_and_sync::<WcgiRunner>();
    }
}