1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::{ops::Deref, pin::Pin, sync::Arc};

use anyhow::Error;
use futures::{Future, FutureExt};
use http::{Request, Response};

use crate::runners::wcgi;

use super::DcgiInstanceFactory;

use super::super::Body;

/// The shared object that manages the instantiation of WASI executables and
/// communicating with them via the CGI protocol.
///
/// This is a thin wrapper around a [`wcgi::Handler`]: requests are forwarded
/// to the wrapped handler, but only after taking the lock stored in
/// [`SharedState`] (see [`Handler::handle`]).
#[derive(Clone, Debug)]
pub(crate) struct Handler {
    /// DCGI-specific state, shared between every clone of this handler.
    state: Arc<SharedState>,
    /// The WCGI handler that actually processes each request.
    inner: wcgi::Handler,
}

impl Handler {
    pub(crate) fn new(handler: wcgi::Handler) -> Self {
        Handler {
            state: Arc::new(SharedState {
                inner: handler.deref().clone(),
                factory: DcgiInstanceFactory::new(),
                master_lock: Default::default(),
            }),
            inner: handler,
        }
    }

    #[tracing::instrument(level = "debug", skip_all, err)]
    pub(crate) async fn handle(
        &self,
        req: Request<hyper::body::Incoming>,
    ) -> Result<Response<Body>, Error> {
        // we acquire a guard token so that only one request at a time can be processed
        // which effectively means that DCGI is single-threaded. This is a limitation
        // of the MVP which should be rectified in future releases.
        let guard_token = self.state.master_lock.clone().lock_owned().await;

        // Process the request as a normal WCGI request
        self.inner.handle(req, guard_token).await
    }
}

/// Exposes the shared DCGI state directly on the handler.
impl Deref for Handler {
    type Target = Arc<SharedState>;

    /// Borrows the reference-counted [`SharedState`] held by this handler.
    fn deref(&self) -> &Self::Target {
        let Self { state, .. } = self;
        state
    }
}

/// State shared between all clones of a DCGI [`Handler`].
#[derive(Debug, Clone)]
// NOTE(review): `factory` is never read in this file, which is presumably why
// dead-code warnings are silenced for the whole struct - confirm.
#[allow(dead_code)]
pub(crate) struct SharedState {
    /// State of the wrapped WCGI handler, cloned from it in `Handler::new`.
    // NOTE(review): this field-level allow is redundant with the struct-level
    // one above.
    #[allow(dead_code)]
    pub(crate) inner: Arc<wcgi::SharedState>,
    /// Instance factory created alongside the handler; not used in this file.
    factory: DcgiInstanceFactory,
    /// Lock taken by `Handler::handle` before a request is forwarded, so that
    /// only one request is processed at a time.
    master_lock: Arc<tokio::sync::Mutex<()>>,
}

/// Lets the DCGI handler be used as a `tower` service over hyper requests.
impl tower::Service<Request<hyper::body::Incoming>> for Handler {
    type Response = Response<Body>;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Always reports readiness; waiting for the request lock happens inside
    /// the future returned by `call`.
    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        use std::task::Poll;

        Poll::Ready(Ok(()))
    }

    /// Returns a boxed future that runs [`Handler::handle`] on `request`.
    fn call(&mut self, request: Request<hyper::body::Incoming>) -> Self::Future {
        // The future must own its handler; every field is reference-counted,
        // so this clone is cheap.
        let handler = self.clone();
        async move { handler.handle(request).await }.boxed()
    }
}