wasmer_wasix/runners/dproxy/
runner.rs1use std::{net::SocketAddr, sync::Arc, time::Duration};
2
3use anyhow::{Context, Error};
4use futures::{StreamExt, stream::FuturesUnordered};
5use http::Request;
6use tower::ServiceBuilder;
7use tower_http::{catch_panic::CatchPanicLayer, cors::CorsLayer, trace::TraceLayer};
8use webc::metadata::Command;
9
10use crate::{
11 bin_factory::BinaryPackage,
12 runners::wasi::WasiRunner,
13 runtime::{DynRuntime, task_manager::VirtualTaskManagerExt},
14};
15
16use super::factory::DProxyInstanceFactory;
17
18#[derive(Debug)]
19pub struct DProxyRunner {
20 config: Config,
21 factory: DProxyInstanceFactory,
22}
23
24impl DProxyRunner {
25 pub fn new(inner: WasiRunner, pkg: &BinaryPackage) -> Self {
26 Self {
27 config: Config::new(inner, pkg),
28 factory: DProxyInstanceFactory::new(),
29 }
30 }
31
32 pub fn config(&mut self) -> &mut Config {
33 &mut self.config
34 }
35}
36
37pub const DPROXY_RUNNER_URI: &str = "https://webc.org/runner/dproxy";
39
40impl crate::runners::Runner for DProxyRunner {
41 fn can_run_command(command: &Command) -> Result<bool, Error> {
42 Ok(command.runner.starts_with(DPROXY_RUNNER_URI))
43 }
44
45 fn run_command(
46 &mut self,
47 command_name: &str,
48 _pkg: &BinaryPackage,
49 runtime: Arc<DynRuntime>,
50 ) -> Result<(), Error> {
51 let handler = super::handler::Handler::new(
53 self.config.clone(),
54 command_name.to_string(),
55 self.factory.clone(),
56 runtime.clone(),
57 );
58
59 let service = ServiceBuilder::new()
62 .layer(
63 TraceLayer::new_for_http()
64 .make_span_with(|request: &Request<hyper::body::Incoming>| {
65 tracing::info_span!(
66 "request",
67 method = %request.method(),
68 uri = %request.uri(),
69 status_code = tracing::field::Empty,
70 )
71 })
72 .on_response(super::super::response_tracing::OnResponseTracer),
73 )
74 .layer(CatchPanicLayer::new())
75 .layer(CorsLayer::permissive())
76 .service(handler);
77
78 let address = self.config.addr;
79 tracing::info!(%address, "Starting the DProxy server");
80
81 runtime
82 .task_manager()
83 .spawn_and_block_on(async move {
84 let (mut shutdown, _abort_handle) =
85 futures::future::abortable(futures::future::pending::<()>());
86
87 let listener = tokio::net::TcpListener::bind(&address).await?;
88 let graceful = hyper_util::server::graceful::GracefulShutdown::new();
89
90 let http = hyper::server::conn::http1::Builder::new();
91
92 let mut futs = FuturesUnordered::new();
93
94 loop {
95 tokio::select! {
96 Ok((stream, _addr)) = listener.accept() => {
97 let io = hyper_util::rt::tokio::TokioIo::new(stream);
98 let service = hyper_util::service::TowerToHyperService::new(service.clone());
99 let conn = http.serve_connection(io, service);
100 let fut = graceful.watch(conn);
102 futs.push(async move {
103 if let Err(e) = fut.await {
104 eprintln!("Error serving connection: {e:?}");
105 }
106 });
107 },
108
109 _ = futs.next() => {}
110
111 _ = &mut shutdown => {
112 tracing::info!("Shutting down gracefully");
113 break;
115 }
116 }
117 }
118
119 Ok::<_, anyhow::Error>(())
120 })
121 .context("Unable to start the server")??;
122
123 Ok(())
124 }
125}
126
127#[derive(Debug, Clone)]
128pub struct Config {
129 pub(crate) inner: WasiRunner,
130 pub(crate) addr: SocketAddr,
131 pub(crate) pkg: BinaryPackage,
132 pub(crate) proxy_connect_init_timeout: Duration,
133 pub(crate) proxy_connect_nominal_timeout: Duration,
134}
135
136impl Config {
137 pub fn new(inner: WasiRunner, pkg: &BinaryPackage) -> Self {
138 Self {
139 inner,
140 pkg: pkg.clone(),
141 addr: ([127, 0, 0, 1], 8000).into(),
142 proxy_connect_init_timeout: Duration::from_secs(30),
143 proxy_connect_nominal_timeout: Duration::from_secs(30),
144 }
145 }
146
147 pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
148 self.addr = addr;
149 self
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 #[test]
158 fn send_and_sync() {
159 fn assert_send<T: Send>() {}
160 fn assert_sync<T: Sync>() {}
161
162 assert_send::<DProxyRunner>();
163 assert_sync::<DProxyRunner>();
164 }
165}