wasmer_argus/argus/
mod.rs

1mod config;
2mod packages;
3mod tester;
4
5use self::tester::{TestReport, Tester};
6pub use config::*;
7use indicatif::{MultiProgress, ProgressBar};
8use reqwest::header::CONTENT_TYPE;
9use std::{fs::OpenOptions, io::Write as _, ops::AddAssign, path::Path, sync::Arc, time::Duration};
10use tokio::{
11    sync::{
12        Mutex, Semaphore,
13        mpsc::{self, UnboundedSender},
14    },
15    task::JoinSet,
16};
17use tracing::*;
18use url::Url;
19use wasmer_backend_api::{WasmerClient, types::PackageVersionWithPackage};
20
/// Test-suite driver: bundles the run configuration with the client used to
/// talk to the registry.
///
/// Built from an [`ArgusConfig`] through the `TryFrom` implementation below.
#[derive(Debug, Clone)]
pub struct Argus {
    /// Settings for this run (the code in this file reads the registry URL, auth
    /// token, job count, output directory and optional webhook URL from it).
    pub config: ArgusConfig,
    /// Backend client created from `config.registry_url` and `config.auth_token`.
    pub client: WasmerClient,
}
26
27impl TryFrom<ArgusConfig> for Argus {
28    type Error = anyhow::Error;
29
30    fn try_from(config: ArgusConfig) -> Result<Self, Self::Error> {
31        let client = WasmerClient::new(Url::parse(&config.registry_url)?, "wasmer-argus")?;
32
33        let client = client.with_auth_token(config.auth_token.clone());
34        Ok(Argus { client, config })
35    }
36}
37
38impl Argus {
39    /// Start the testsuite using the configuration in [`Self::config`]
40    pub async fn run(self) -> anyhow::Result<()> {
41        info!("fetching packages from {}", self.config.registry_url);
42
43        let m = MultiProgress::new();
44        let (s, mut r) = mpsc::unbounded_channel();
45        let (successes_sx, successes_rx) = mpsc::unbounded_channel();
46        let (failures_sx, failures_rx) = mpsc::unbounded_channel();
47
48        let mut pool = JoinSet::new();
49        let c = Arc::new(self.config.clone());
50
51        {
52            let this = self.clone();
53            let bar = m.add(ProgressBar::new(0));
54            pool.spawn(async move {
55                this.fetch_packages(s, bar, c.clone(), successes_rx, failures_rx)
56                    .await
57            });
58        }
59
60        let mut count = 0;
61        let successes = Arc::new(Mutex::new(0));
62        let failures = Arc::new(Mutex::new(0));
63
64        let c = Arc::new(self.config.clone());
65        let sem = Arc::new(Semaphore::new(self.config.jobs));
66
67        while let Some(pkg) = r.recv().await {
68            let c = c.clone();
69            let bar = m.add(ProgressBar::new(0));
70            let permit = Arc::clone(&sem).acquire_owned().await;
71            let successes_sx = successes_sx.clone();
72            let failures_sx = failures_sx.clone();
73            let failures = failures.clone();
74            let successes = successes.clone();
75
76            pool.spawn(async move {
77                let _permit = permit;
78                match Argus::test(count, c, &pkg, bar, successes_sx, failures_sx).await {
79                    Err(e) => {
80                        failures.lock().await.add_assign(1);
81                        Err(e)
82                    }
83                    Ok(true) => {
84                        successes.lock().await.add_assign(1);
85                        Ok(())
86                    }
87                    Ok(false) => {
88                        failures.lock().await.add_assign(1);
89                        Ok(())
90                    }
91                }
92            });
93
94            count += 1;
95        }
96
97        while let Some(t) = pool.join_next().await {
98            if let Err(e) = t {
99                error!("{:?}", e)
100            }
101        }
102
103        if let Some(webhook_url) = self.config.webhook_url {
104            let url = url::Url::parse(&webhook_url)?;
105            reqwest::Client::new()
106                .post(url)
107                .header(CONTENT_TYPE, "application/json")
108                .body(format!(
109                    r#"{{"text":"Argus run report: {} tests succeeded, {} failed"}}"#,
110                    successes.lock().await,
111                    failures.lock().await
112                ))
113                .send()
114                .await?;
115        }
116
117        info!("done!");
118        Ok(())
119    }
120
121    /// Perform the test for a single package
122    async fn test(
123        test_id: u64,
124        config: Arc<ArgusConfig>,
125        package: &PackageVersionWithPackage,
126        p: ProgressBar,
127        successes_sx: UnboundedSender<()>,
128        failures_sx: UnboundedSender<()>,
129    ) -> anyhow::Result<bool> {
130        p.set_style(
131            indicatif::ProgressStyle::with_template(&format!(
132                "[{test_id}] {{spinner:.blue}} {{msg}}"
133            ))
134            .unwrap()
135            .tick_strings(&["✶", "✸", "✹", "✺", "✹", "✷", "✶"]),
136        );
137
138        p.enable_steady_tick(Duration::from_millis(100));
139
140        let package_name = Argus::get_package_id(package);
141        let webc_v2_url: Url = match &package.distribution_v2.pirita_download_url {
142            Some(url) => url.parse().unwrap(),
143            None => {
144                info!("package {} has no download url, skipping", package_name);
145                p.finish_and_clear();
146                return Ok(true);
147            }
148        };
149
150        let webc_v3_url: Url = match &package.distribution_v3.pirita_download_url {
151            Some(url) => url.parse().unwrap(),
152            None => {
153                info!("package {} has no download url, skipping", package_name);
154                p.finish_and_clear();
155                return Ok(true);
156            }
157        };
158
159        p.set_message(format!("[{test_id}] testing package {package_name}"));
160
161        let path = Argus::get_path(config.clone(), package).await;
162        p.set_message(format!(
163            "testing package {package_name} -- path to download to is: {path:?}",
164        ));
165
166        #[cfg(not(feature = "wasmer_lib"))]
167        let runner = Box::new(tester::cli_tester::CLIRunner::new(
168            test_id, config, &p, package,
169        )) as Box<dyn Tester>;
170
171        #[cfg(feature = "wasmer_lib")]
172        let runner = if config.use_lib {
173            Box::new(tester::lib_tester::LibRunner::new(
174                test_id, config, &p, package,
175            )) as Box<dyn Tester>
176        } else {
177            Box::new(tester::cli_tester::CLIRunner::new(
178                test_id, config, &p, package,
179            )) as Box<dyn Tester>
180        };
181
182        if !runner.is_to_test().await {
183            return Ok(true);
184        }
185
186        Argus::download_webcs(test_id, &path, &webc_v2_url, &webc_v3_url, &p).await?;
187
188        info!("package downloaded!");
189
190        p.reset();
191        p.set_style(
192            indicatif::ProgressStyle::with_template(&format!(
193                "[{test_id}/{package_name}] {{spinner:.blue}} {{msg}}"
194            ))
195            .unwrap()
196            .tick_strings(&["✶", "✸", "✹", "✺", "✹", "✷", "✶"]),
197        );
198
199        p.enable_steady_tick(Duration::from_millis(100));
200
201        p.set_message("package downloaded");
202
203        let report = runner.run_test().await?;
204
205        let outcome = report.outcome.is_ok();
206
207        if outcome {
208            successes_sx.send(())?;
209        } else {
210            failures_sx.send(())?;
211        };
212        Argus::write_report(&path, report).await?;
213
214        p.finish_with_message(format!("test for package {package_name} done!"));
215        p.finish_and_clear();
216
217        Ok(outcome)
218    }
219
220    /// Checks whether or not the package should be tested
221    async fn to_test(&self, pkg: &PackageVersionWithPackage) -> bool {
222        let name = Argus::get_package_id(pkg);
223
224        info!("checking if package {name} needs to be tested or not");
225
226        let dir_path = std::path::PathBuf::from(&self.config.outdir);
227
228        if !dir_path.exists() {
229            return true;
230        }
231
232        if pkg.distribution_v2.pirita_sha256_hash.is_none() {
233            info!("skipping test for {name} as it has no hash");
234            return false;
235        }
236
237        true
238    }
239
240    #[tracing::instrument]
241    async fn write_report(path: &Path, result: TestReport) -> anyhow::Result<()> {
242        let test_results_path = path.join(format!(
243            "result-{}-{}--{}-{}.json",
244            result.runner_id,
245            result.runner_version,
246            std::env::consts::ARCH,
247            std::env::consts::OS,
248        ));
249
250        let mut file = OpenOptions::new()
251            .write(true)
252            .create(true)
253            .truncate(true)
254            .open(test_results_path)?;
255
256        file.write_all(serde_json::to_string(&result).unwrap().as_bytes())?;
257        Ok(())
258    }
259}