1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    task::Context,
    time::Instant,
};

use hyper_util::rt::TokioExecutor;
use wasmer_journal::{DynJournal, RecombinedJournal};

use crate::{
    runners::Runner,
    runtime::{DynRuntime, OverriddenRuntime},
};

use super::{
    handler::Handler, hyper_proxy::HyperProxyConnectorBuilder, instance::DProxyInstance,
    networking::LocalWithLoopbackNetworking, shard::Shard, socket_manager::SocketManager,
};

/// Mutable bookkeeping behind a [`DProxyInstanceFactory`].
///
/// Entries are inserted by `DProxyInstanceFactory::acquire` and removed by the
/// dedicated task started in `DProxyInstanceFactory::spin_up` once the
/// instance's command has returned.
#[derive(Debug, Default)]
struct State {
    /// Instances currently registered, keyed by the shard they were spun up for.
    instance: HashMap<Shard, DProxyInstance>,
}

/// This factory will store and reuse instances between invocations thus
/// allowing for the instances to be stateful.
///
/// The factory is `Clone`; because the state lives behind an `Arc`, every
/// clone reads and updates the same shard-to-instance map.
#[derive(Debug, Clone, Default)]
pub struct DProxyInstanceFactory {
    /// Shard-to-instance map shared by all clones of this factory and guarded
    /// by a standard (blocking) mutex.
    state: Arc<Mutex<State>>,
}

impl DProxyInstanceFactory {
    pub fn new() -> Self {
        Default::default()
    }

    pub async fn acquire(&self, handler: &Handler, shard: Shard) -> anyhow::Result<DProxyInstance> {
        loop {
            {
                let state = self.state.lock().unwrap();
                if let Some(instance) = state.instance.get(&shard).cloned() {
                    return Ok(instance);
                }
            }

            let instance = self.spin_up(handler, shard.clone()).await?;

            let mut state = self.state.lock().unwrap();
            state.instance.insert(shard.clone(), instance);
        }
    }

    pub async fn spin_up(&self, handler: &Handler, shard: Shard) -> anyhow::Result<DProxyInstance> {
        // Get the runtime with its already wired local networking
        let runtime = handler.runtime.clone();

        // DProxy is able to resume execution of the stateful workload using memory
        // snapshots hence the journals it stores are complete journals
        let journals = runtime
            .journals()
            .clone()
            .into_iter()
            .map(|journal| {
                let tx = journal.clone();
                let rx = journal.as_restarted()?;
                let combined = RecombinedJournal::new(tx, rx);
                anyhow::Result::Ok(Arc::new(combined) as Arc<DynJournal>)
            })
            .collect::<anyhow::Result<Vec<_>>>()?;
        let mut runtime = OverriddenRuntime::new(runtime).with_journals(journals);

        // We attach a composite networking to the runtime which includes a loopback
        // networking implementation connected to a socket manager
        let composite_networking = LocalWithLoopbackNetworking::new();
        let poll_listening = {
            let networking = composite_networking.clone();
            Arc::new(move |cx: &mut Context<'_>| networking.poll_listening(cx))
        };
        let socket_manager = Arc::new(SocketManager::new(
            poll_listening,
            composite_networking.loopback_networking(),
            handler.config.proxy_connect_init_timeout,
            handler.config.proxy_connect_nominal_timeout,
        ));
        runtime = runtime.with_networking(Arc::new(composite_networking));

        // The connector uses the socket manager to open sockets to the instance
        let connector = HyperProxyConnectorBuilder::new(socket_manager.clone())
            .build()
            .await;

        // Now we run the actual instance under a WasiRunner
        #[cfg(feature = "sys")]
        let handle = tokio::runtime::Handle::current();
        let this = self.clone();
        let pkg = handler.config.pkg.clone();
        let command_name = handler.command_name.clone();
        let connector_inner = connector.clone();
        let runtime = Arc::new(runtime) as Arc<DynRuntime>;
        let mut runner = handler.config.inner.clone();
        runtime
            .task_manager()
            .clone()
            .task_dedicated(Box::new(move || {
                #[cfg(feature = "sys")]
                let _guard = handle.enter();
                if let Err(err) = runner.run_command(&command_name, &pkg, runtime) {
                    tracing::error!("Instance Exited: {}", err);
                } else {
                    tracing::info!("Instance Exited: Nominal");
                }
                {
                    let mut state = this.state.lock().unwrap();
                    state.instance.remove(&shard);
                }
                connector_inner.shutdown();
            }))?;

        // Return an instance
        Ok(DProxyInstance {
            last_used: Arc::new(Mutex::new(Instant::now())),
            socket_manager,
            client: hyper_util::client::legacy::Client::builder(TokioExecutor::new())
                .build(connector),
        })
    }
}