1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use std::sync::Arc;

use hyper_util::rt::TokioIo;
use tokio_stream::wrappers::BroadcastStream;

use super::socket_manager::SocketManager;

use super::*;

/// A Connector for the WASM processes behind a socket.
#[derive(Debug, Clone)]
pub struct HyperProxyConnector {
    pub(super) socket_manager: Arc<SocketManager>,
}

impl HyperProxyConnector {
    pub fn shutdown(&self) {
        self.socket_manager.shutdown();
    }
}

impl tower::Service<Uri> for HyperProxyConnector {
    type Response = HyperProxyStream;
    type Error = BoxError;

    #[allow(clippy::type_complexity)]
    type Future = Pin<Box<dyn Future<Output = Result<HyperProxyStream, BoxError>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _dst: Uri) -> Self::Future {
        let this = self.clone();
        Box::pin(async move {
            let terminate_rx = this.socket_manager.terminate_rx();
            let socket = this.socket_manager.acquire_http_socket().await?;
            let (tx, rx) = socket.split();
            Ok(HyperProxyStream {
                tx,
                rx: TokioIo::new(rx),
                terminate: BroadcastStream::new(terminate_rx),
                terminated: false,
            })
        })
    }
}