wasmer_wasix/runners/dproxy/
factory.rs1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4 task::Context,
5 time::Instant,
6};
7
8use hyper_util::rt::TokioExecutor;
9use wasmer_journal::{DynJournal, RecombinedJournal};
10
11use crate::{
12 runners::Runner,
13 runtime::{DynRuntime, OverriddenRuntime},
14};
15
16use super::{
17 handler::Handler, hyper_proxy::HyperProxyConnectorBuilder, instance::DProxyInstance,
18 networking::LocalWithLoopbackNetworking, shard::Shard, socket_manager::SocketManager,
19};
20
21#[derive(Debug, Default)]
22struct State {
23 instance: HashMap<Shard, DProxyInstance>,
24}
25
26#[derive(Debug, Clone, Default)]
29pub struct DProxyInstanceFactory {
30 state: Arc<Mutex<State>>,
31}
32
33impl DProxyInstanceFactory {
34 pub fn new() -> Self {
35 Default::default()
36 }
37
38 pub async fn acquire(&self, handler: &Handler, shard: Shard) -> anyhow::Result<DProxyInstance> {
39 loop {
40 {
41 let state = self.state.lock().unwrap();
42 if let Some(instance) = state.instance.get(&shard).cloned() {
43 return Ok(instance);
44 }
45 }
46
47 let instance = self.spin_up(handler, shard.clone()).await?;
48
49 let mut state = self.state.lock().unwrap();
50 state.instance.insert(shard.clone(), instance);
51 }
52 }
53
54 pub async fn spin_up(&self, handler: &Handler, shard: Shard) -> anyhow::Result<DProxyInstance> {
55 let runtime = handler.runtime.clone();
57
58 let journals = runtime
61 .writable_journals()
62 .map(|journal| {
63 let rx = journal.as_restarted()?;
64 let combined = RecombinedJournal::new(journal, rx);
65 anyhow::Result::Ok(Arc::new(combined) as Arc<DynJournal>)
66 })
67 .collect::<anyhow::Result<Vec<_>>>()?;
68 let mut runtime = OverriddenRuntime::new(runtime).with_writable_journals(journals);
69
70 let composite_networking = LocalWithLoopbackNetworking::new();
73 let poll_listening = {
74 let networking = composite_networking.clone();
75 Arc::new(move |cx: &mut Context<'_>| networking.poll_listening(cx))
76 };
77 let socket_manager = Arc::new(SocketManager::new(
78 poll_listening,
79 composite_networking.loopback_networking(),
80 handler.config.proxy_connect_init_timeout,
81 handler.config.proxy_connect_nominal_timeout,
82 ));
83 runtime = runtime.with_networking(Arc::new(composite_networking));
84
85 let connector = HyperProxyConnectorBuilder::new(socket_manager.clone())
87 .build()
88 .await;
89
90 #[cfg(feature = "sys")]
92 let handle = tokio::runtime::Handle::current();
93 let this = self.clone();
94 let pkg = handler.config.pkg.clone();
95 let command_name = handler.command_name.clone();
96 let connector_inner = connector.clone();
97 let runtime = Arc::new(runtime) as Arc<DynRuntime>;
98 let mut runner = handler.config.inner.clone();
99 runtime
100 .task_manager()
101 .clone()
102 .task_dedicated(Box::new(move || {
103 #[cfg(feature = "sys")]
104 let _guard = handle.enter();
105 if let Err(err) = Runner::run_command(&mut runner, &command_name, &pkg, runtime) {
106 tracing::error!("Instance Exited: {}", err);
107 } else {
108 tracing::info!("Instance Exited: Nominal");
109 }
110 {
111 let mut state = this.state.lock().unwrap();
112 state.instance.remove(&shard);
113 }
114 connector_inner.shutdown();
115 }))?;
116
117 Ok(DProxyInstance {
119 last_used: Arc::new(Mutex::new(Instant::now())),
120 socket_manager,
121 client: hyper_util::client::legacy::Client::builder(TokioExecutor::new())
122 .build(connector),
123 })
124 }
125}