wasmer_wasix/runners/dproxy/
factory.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Mutex},
4    task::Context,
5    time::Instant,
6};
7
8use hyper_util::rt::TokioExecutor;
9use wasmer_journal::{DynJournal, RecombinedJournal};
10
11use crate::{
12    runners::Runner,
13    runtime::{DynRuntime, OverriddenRuntime},
14};
15
16use super::{
17    handler::Handler, hyper_proxy::HyperProxyConnectorBuilder, instance::DProxyInstance,
18    networking::LocalWithLoopbackNetworking, shard::Shard, socket_manager::SocketManager,
19};
20
21#[derive(Debug, Default)]
22struct State {
23    instance: HashMap<Shard, DProxyInstance>,
24}
25
26/// This factory will store and reuse instances between invocations thus
27/// allowing for the instances to be stateful.
28#[derive(Debug, Clone, Default)]
29pub struct DProxyInstanceFactory {
30    state: Arc<Mutex<State>>,
31}
32
33impl DProxyInstanceFactory {
34    pub fn new() -> Self {
35        Default::default()
36    }
37
38    pub async fn acquire(&self, handler: &Handler, shard: Shard) -> anyhow::Result<DProxyInstance> {
39        loop {
40            {
41                let state = self.state.lock().unwrap();
42                if let Some(instance) = state.instance.get(&shard).cloned() {
43                    return Ok(instance);
44                }
45            }
46
47            let instance = self.spin_up(handler, shard.clone()).await?;
48
49            let mut state = self.state.lock().unwrap();
50            state.instance.insert(shard.clone(), instance);
51        }
52    }
53
54    pub async fn spin_up(&self, handler: &Handler, shard: Shard) -> anyhow::Result<DProxyInstance> {
55        // Get the runtime with its already wired local networking
56        let runtime = handler.runtime.clone();
57
58        // DProxy is able to resume execution of the stateful workload using memory
59        // snapshots hence the journals it stores are complete journals
60        let journals = runtime
61            .writable_journals()
62            .map(|journal| {
63                let rx = journal.as_restarted()?;
64                let combined = RecombinedJournal::new(journal, rx);
65                anyhow::Result::Ok(Arc::new(combined) as Arc<DynJournal>)
66            })
67            .collect::<anyhow::Result<Vec<_>>>()?;
68        let mut runtime = OverriddenRuntime::new(runtime).with_writable_journals(journals);
69
70        // We attach a composite networking to the runtime which includes a loopback
71        // networking implementation connected to a socket manager
72        let composite_networking = LocalWithLoopbackNetworking::new();
73        let poll_listening = {
74            let networking = composite_networking.clone();
75            Arc::new(move |cx: &mut Context<'_>| networking.poll_listening(cx))
76        };
77        let socket_manager = Arc::new(SocketManager::new(
78            poll_listening,
79            composite_networking.loopback_networking(),
80            handler.config.proxy_connect_init_timeout,
81            handler.config.proxy_connect_nominal_timeout,
82        ));
83        runtime = runtime.with_networking(Arc::new(composite_networking));
84
85        // The connector uses the socket manager to open sockets to the instance
86        let connector = HyperProxyConnectorBuilder::new(socket_manager.clone())
87            .build()
88            .await;
89
90        // Now we run the actual instance under a WasiRunner
91        #[cfg(feature = "sys")]
92        let handle = tokio::runtime::Handle::current();
93        let this = self.clone();
94        let pkg = handler.config.pkg.clone();
95        let command_name = handler.command_name.clone();
96        let connector_inner = connector.clone();
97        let runtime = Arc::new(runtime) as Arc<DynRuntime>;
98        let mut runner = handler.config.inner.clone();
99        runtime
100            .task_manager()
101            .clone()
102            .task_dedicated(Box::new(move || {
103                #[cfg(feature = "sys")]
104                let _guard = handle.enter();
105                if let Err(err) = Runner::run_command(&mut runner, &command_name, &pkg, runtime) {
106                    tracing::error!("Instance Exited: {}", err);
107                } else {
108                    tracing::info!("Instance Exited: Nominal");
109                }
110                {
111                    let mut state = this.state.lock().unwrap();
112                    state.instance.remove(&shard);
113                }
114                connector_inner.shutdown();
115            }))?;
116
117        // Return an instance
118        Ok(DProxyInstance {
119            last_used: Arc::new(Mutex::new(Instant::now())),
120            socket_manager,
121            client: hyper_util::client::legacy::Client::builder(TokioExecutor::new())
122                .build(connector),
123        })
124    }
125}