1mod config;
2mod packages;
3mod tester;
4
5use self::tester::{TestReport, Tester};
6pub use config::*;
7use indicatif::{MultiProgress, ProgressBar};
8use reqwest::header::CONTENT_TYPE;
9use std::{fs::OpenOptions, io::Write as _, ops::AddAssign, path::Path, sync::Arc, time::Duration};
10use tokio::{
11 sync::{
12 Mutex, Semaphore,
13 mpsc::{self, UnboundedSender},
14 },
15 task::JoinSet,
16};
17use tracing::*;
18use url::Url;
19use wasmer_backend_api::{WasmerClient, types::PackageVersionWithPackage};
20
21#[derive(Debug, Clone)]
22pub struct Argus {
23 pub config: ArgusConfig,
24 pub client: WasmerClient,
25}
26
27impl TryFrom<ArgusConfig> for Argus {
28 type Error = anyhow::Error;
29
30 fn try_from(config: ArgusConfig) -> Result<Self, Self::Error> {
31 let client = WasmerClient::new(Url::parse(&config.registry_url)?, "wasmer-argus")?;
32
33 let client = client.with_auth_token(config.auth_token.clone());
34 Ok(Argus { client, config })
35 }
36}
37
38impl Argus {
39 pub async fn run(self) -> anyhow::Result<()> {
41 info!("fetching packages from {}", self.config.registry_url);
42
43 let m = MultiProgress::new();
44 let (s, mut r) = mpsc::unbounded_channel();
45 let (successes_sx, successes_rx) = mpsc::unbounded_channel();
46 let (failures_sx, failures_rx) = mpsc::unbounded_channel();
47
48 let mut pool = JoinSet::new();
49 let c = Arc::new(self.config.clone());
50
51 {
52 let this = self.clone();
53 let bar = m.add(ProgressBar::new(0));
54 pool.spawn(async move {
55 this.fetch_packages(s, bar, c.clone(), successes_rx, failures_rx)
56 .await
57 });
58 }
59
60 let mut count = 0;
61 let successes = Arc::new(Mutex::new(0));
62 let failures = Arc::new(Mutex::new(0));
63
64 let c = Arc::new(self.config.clone());
65 let sem = Arc::new(Semaphore::new(self.config.jobs));
66
67 while let Some(pkg) = r.recv().await {
68 let c = c.clone();
69 let bar = m.add(ProgressBar::new(0));
70 let permit = Arc::clone(&sem).acquire_owned().await;
71 let successes_sx = successes_sx.clone();
72 let failures_sx = failures_sx.clone();
73 let failures = failures.clone();
74 let successes = successes.clone();
75
76 pool.spawn(async move {
77 let _permit = permit;
78 match Argus::test(count, c, &pkg, bar, successes_sx, failures_sx).await {
79 Err(e) => {
80 failures.lock().await.add_assign(1);
81 Err(e)
82 }
83 Ok(true) => {
84 successes.lock().await.add_assign(1);
85 Ok(())
86 }
87 Ok(false) => {
88 failures.lock().await.add_assign(1);
89 Ok(())
90 }
91 }
92 });
93
94 count += 1;
95 }
96
97 while let Some(t) = pool.join_next().await {
98 if let Err(e) = t {
99 error!("{:?}", e)
100 }
101 }
102
103 if let Some(webhook_url) = self.config.webhook_url {
104 let url = url::Url::parse(&webhook_url)?;
105 reqwest::Client::new()
106 .post(url)
107 .header(CONTENT_TYPE, "application/json")
108 .body(format!(
109 r#"{{"text":"Argus run report: {} tests succeeded, {} failed"}}"#,
110 successes.lock().await,
111 failures.lock().await
112 ))
113 .send()
114 .await?;
115 }
116
117 info!("done!");
118 Ok(())
119 }
120
121 async fn test(
123 test_id: u64,
124 config: Arc<ArgusConfig>,
125 package: &PackageVersionWithPackage,
126 p: ProgressBar,
127 successes_sx: UnboundedSender<()>,
128 failures_sx: UnboundedSender<()>,
129 ) -> anyhow::Result<bool> {
130 p.set_style(
131 indicatif::ProgressStyle::with_template(&format!(
132 "[{test_id}] {{spinner:.blue}} {{msg}}"
133 ))
134 .unwrap()
135 .tick_strings(&["✶", "✸", "✹", "✺", "✹", "✷", "✶"]),
136 );
137
138 p.enable_steady_tick(Duration::from_millis(100));
139
140 let package_name = Argus::get_package_id(package);
141 let webc_v2_url: Url = match &package.distribution_v2.pirita_download_url {
142 Some(url) => url.parse().unwrap(),
143 None => {
144 info!("package {} has no download url, skipping", package_name);
145 p.finish_and_clear();
146 return Ok(true);
147 }
148 };
149
150 let webc_v3_url: Url = match &package.distribution_v3.pirita_download_url {
151 Some(url) => url.parse().unwrap(),
152 None => {
153 info!("package {} has no download url, skipping", package_name);
154 p.finish_and_clear();
155 return Ok(true);
156 }
157 };
158
159 p.set_message(format!("[{test_id}] testing package {package_name}"));
160
161 let path = Argus::get_path(config.clone(), package).await;
162 p.set_message(format!(
163 "testing package {package_name} -- path to download to is: {path:?}",
164 ));
165
166 #[cfg(not(feature = "wasmer_lib"))]
167 let runner = Box::new(tester::cli_tester::CLIRunner::new(
168 test_id, config, &p, package,
169 )) as Box<dyn Tester>;
170
171 #[cfg(feature = "wasmer_lib")]
172 let runner = if config.use_lib {
173 Box::new(tester::lib_tester::LibRunner::new(
174 test_id, config, &p, package,
175 )) as Box<dyn Tester>
176 } else {
177 Box::new(tester::cli_tester::CLIRunner::new(
178 test_id, config, &p, package,
179 )) as Box<dyn Tester>
180 };
181
182 if !runner.is_to_test().await {
183 return Ok(true);
184 }
185
186 Argus::download_webcs(test_id, &path, &webc_v2_url, &webc_v3_url, &p).await?;
187
188 info!("package downloaded!");
189
190 p.reset();
191 p.set_style(
192 indicatif::ProgressStyle::with_template(&format!(
193 "[{test_id}/{package_name}] {{spinner:.blue}} {{msg}}"
194 ))
195 .unwrap()
196 .tick_strings(&["✶", "✸", "✹", "✺", "✹", "✷", "✶"]),
197 );
198
199 p.enable_steady_tick(Duration::from_millis(100));
200
201 p.set_message("package downloaded");
202
203 let report = runner.run_test().await?;
204
205 let outcome = report.outcome.is_ok();
206
207 if outcome {
208 successes_sx.send(())?;
209 } else {
210 failures_sx.send(())?;
211 };
212 Argus::write_report(&path, report).await?;
213
214 p.finish_with_message(format!("test for package {package_name} done!"));
215 p.finish_and_clear();
216
217 Ok(outcome)
218 }
219
220 async fn to_test(&self, pkg: &PackageVersionWithPackage) -> bool {
222 let name = Argus::get_package_id(pkg);
223
224 info!("checking if package {name} needs to be tested or not");
225
226 let dir_path = std::path::PathBuf::from(&self.config.outdir);
227
228 if !dir_path.exists() {
229 return true;
230 }
231
232 if pkg.distribution_v2.pirita_sha256_hash.is_none() {
233 info!("skipping test for {name} as it has no hash");
234 return false;
235 }
236
237 true
238 }
239
240 #[tracing::instrument]
241 async fn write_report(path: &Path, result: TestReport) -> anyhow::Result<()> {
242 let test_results_path = path.join(format!(
243 "result-{}-{}--{}-{}.json",
244 result.runner_id,
245 result.runner_version,
246 std::env::consts::ARCH,
247 std::env::consts::OS,
248 ));
249
250 let mut file = OpenOptions::new()
251 .write(true)
252 .create(true)
253 .truncate(true)
254 .open(test_results_path)?;
255
256 file.write_all(serde_json::to_string(&result).unwrap().as_bytes())?;
257 Ok(())
258 }
259}