wasmer_wasix/runners/dproxy/
runner.rs

1use std::{net::SocketAddr, sync::Arc, time::Duration};
2
3use anyhow::{Context, Error};
4use futures::{StreamExt, stream::FuturesUnordered};
5use http::Request;
6use tower::ServiceBuilder;
7use tower_http::{catch_panic::CatchPanicLayer, cors::CorsLayer, trace::TraceLayer};
8use webc::metadata::Command;
9
10use crate::{
11    bin_factory::BinaryPackage,
12    runners::wasi::WasiRunner,
13    runtime::{DynRuntime, task_manager::VirtualTaskManagerExt},
14};
15
16use super::factory::DProxyInstanceFactory;
17
18#[derive(Debug)]
19pub struct DProxyRunner {
20    config: Config,
21    factory: DProxyInstanceFactory,
22}
23
24impl DProxyRunner {
25    pub fn new(inner: WasiRunner, pkg: &BinaryPackage) -> Self {
26        Self {
27            config: Config::new(inner, pkg),
28            factory: DProxyInstanceFactory::new(),
29        }
30    }
31
32    pub fn config(&mut self) -> &mut Config {
33        &mut self.config
34    }
35}
36
/// The base URI used by a [`DProxyRunner`].
///
/// A command is considered runnable by the dproxy runner when its runner
/// URI starts with this prefix.
pub const DPROXY_RUNNER_URI: &str = "https://webc.org/runner/dproxy";
39
40impl crate::runners::Runner for DProxyRunner {
41    fn can_run_command(command: &Command) -> Result<bool, Error> {
42        Ok(command.runner.starts_with(DPROXY_RUNNER_URI))
43    }
44
45    fn run_command(
46        &mut self,
47        command_name: &str,
48        _pkg: &BinaryPackage,
49        runtime: Arc<DynRuntime>,
50    ) -> Result<(), Error> {
51        // Create the handler that will process the HTTP requests
52        let handler = super::handler::Handler::new(
53            self.config.clone(),
54            command_name.to_string(),
55            self.factory.clone(),
56            runtime.clone(),
57        );
58
59        // We create a HTTP server which will reverse proxy all the requests
60        // to the proxy workload
61        let service = ServiceBuilder::new()
62            .layer(
63                TraceLayer::new_for_http()
64                    .make_span_with(|request: &Request<hyper::body::Incoming>| {
65                        tracing::info_span!(
66                            "request",
67                            method = %request.method(),
68                            uri = %request.uri(),
69                            status_code = tracing::field::Empty,
70                        )
71                    })
72                    .on_response(super::super::response_tracing::OnResponseTracer),
73            )
74            .layer(CatchPanicLayer::new())
75            .layer(CorsLayer::permissive())
76            .service(handler);
77
78        let address = self.config.addr;
79        tracing::info!(%address, "Starting the DProxy server");
80
81        runtime
82            .task_manager()
83            .spawn_and_block_on(async move {
84                let (mut shutdown, _abort_handle) =
85                    futures::future::abortable(futures::future::pending::<()>());
86
87                let listener = tokio::net::TcpListener::bind(&address).await?;
88                let graceful = hyper_util::server::graceful::GracefulShutdown::new();
89
90                let http = hyper::server::conn::http1::Builder::new();
91
92                let mut futs = FuturesUnordered::new();
93
94                loop {
95                    tokio::select! {
96                        Ok((stream, _addr)) = listener.accept() => {
97                            let io = hyper_util::rt::tokio::TokioIo::new(stream);
98                            let service = hyper_util::service::TowerToHyperService::new(service.clone());
99                            let conn = http.serve_connection(io, service);
100                            // watch this connection
101                            let fut = graceful.watch(conn);
102                            futs.push(async move {
103                                if let Err(e) = fut.await {
104                                    eprintln!("Error serving connection: {e:?}");
105                                }
106                            });
107                        },
108
109                        _ = futs.next() => {}
110
111                        _ = &mut shutdown => {
112                            tracing::info!("Shutting down gracefully");
113                            // stop the accept loop
114                            break;
115                        }
116                    }
117                }
118
119                Ok::<_, anyhow::Error>(())
120            })
121            .context("Unable to start the server")??;
122
123        Ok(())
124    }
125}
126
127#[derive(Debug, Clone)]
128pub struct Config {
129    pub(crate) inner: WasiRunner,
130    pub(crate) addr: SocketAddr,
131    pub(crate) pkg: BinaryPackage,
132    pub(crate) proxy_connect_init_timeout: Duration,
133    pub(crate) proxy_connect_nominal_timeout: Duration,
134}
135
136impl Config {
137    pub fn new(inner: WasiRunner, pkg: &BinaryPackage) -> Self {
138        Self {
139            inner,
140            pkg: pkg.clone(),
141            addr: ([127, 0, 0, 1], 8000).into(),
142            proxy_connect_init_timeout: Duration::from_secs(30),
143            proxy_connect_nominal_timeout: Duration::from_secs(30),
144        }
145    }
146
147    pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
148        self.addr = addr;
149        self
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that the runner can be moved to and shared
    /// between threads.
    #[test]
    fn send_and_sync() {
        fn assert_send_and_sync<T: Send + Sync>() {}

        assert_send_and_sync::<DProxyRunner>();
    }
}