wasmer_wasix/runners/dproxy/hyper_proxy/
connector.rs

1use std::sync::Arc;
2
3use hyper_util::rt::TokioIo;
4use tokio_stream::wrappers::BroadcastStream;
5
6use super::socket_manager::SocketManager;
7
8use super::*;
9
10/// A Connector for the WASM processes behind a socket.
11#[derive(Debug, Clone)]
12pub struct HyperProxyConnector {
13    pub(super) socket_manager: Arc<SocketManager>,
14}
15
16impl HyperProxyConnector {
17    pub fn shutdown(&self) {
18        self.socket_manager.shutdown();
19    }
20}
21
22impl tower::Service<Uri> for HyperProxyConnector {
23    type Response = HyperProxyStream;
24    type Error = BoxError;
25
26    #[allow(clippy::type_complexity)]
27    type Future = Pin<Box<dyn Future<Output = Result<HyperProxyStream, BoxError>> + Send>>;
28
29    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
30        Poll::Ready(Ok(()))
31    }
32
33    fn call(&mut self, _dst: Uri) -> Self::Future {
34        let this = self.clone();
35        Box::pin(async move {
36            let terminate_rx = this.socket_manager.terminate_rx();
37            let socket = this.socket_manager.acquire_http_socket().await?;
38            let (tx, rx) = socket.split();
39            Ok(HyperProxyStream {
40                tx,
41                rx: TokioIo::new(rx),
42                terminate: BroadcastStream::new(terminate_rx),
43                terminated: false,
44            })
45        })
46    }
47}