wasmer_argus/argus/
packages.rs

1use super::Argus;
2use crate::ArgusConfig;
3use futures::StreamExt;
4use indicatif::{ProgressBar, ProgressStyle};
5use reqwest::{Client, header};
6use std::{path::PathBuf, sync::Arc, time::Duration};
7use tokio::{
8    fs::File,
9    io::AsyncWriteExt,
10    sync::mpsc::{UnboundedReceiver, UnboundedSender},
11};
12use tracing::*;
13use url::Url;
14use wasmer_backend_api::{
15    query::get_package_versions_stream,
16    types::{AllPackageVersionsVars, PackageVersionSortBy, PackageVersionWithPackage},
17};
18
19impl Argus {
20    /// Fetch all packages from the registry
21    #[tracing::instrument(skip(self, s, p))]
22    pub async fn fetch_packages(
23        &self,
24        s: UnboundedSender<PackageVersionWithPackage>,
25        p: ProgressBar,
26        config: Arc<ArgusConfig>,
27        successes_rx: UnboundedReceiver<()>,
28        failures_rx: UnboundedReceiver<()>,
29    ) -> anyhow::Result<()> {
30        info!("starting to fetch packages..");
31        let vars = AllPackageVersionsVars {
32            sort_by: Some(PackageVersionSortBy::Oldest),
33            ..Default::default()
34        };
35
36        p.set_style(
37            ProgressStyle::with_template("{spinner:.blue} {msg}")
38                .unwrap()
39                .tick_strings(&["✶", "✸", "✹", "✺", "✹", "✷", "✶"]),
40        );
41        p.enable_steady_tick(Duration::from_millis(1000));
42
43        let mut count = 0;
44
45        let call = get_package_versions_stream(&self.client, vars.clone());
46        futures::pin_mut!(call);
47        p.set_message("starting to fetch packages..".to_string());
48
49        while let Some(pkgs) = call.next().await {
50            let pkgs = match pkgs {
51                Ok(pkgs) => pkgs,
52                Err(e) => {
53                    error!("failed to fetch packages: {e}");
54                    p.finish_and_clear();
55                    anyhow::bail!("failed to fetch packages: {e}")
56                }
57            };
58            p.set_message(format!(
59                "fetched {} packages [ok: {}, err: {}]",
60                count,
61                successes_rx.len(),
62                failures_rx.len()
63            ));
64            count += pkgs.len();
65
66            for pkg in pkgs {
67                if self.to_test(&pkg).await {
68                    if let Err(e) = s.send(pkg) {
69                        error!("failed to send packages: {e}");
70                        p.finish_and_clear();
71                        anyhow::bail!("failed to send packages: {e}")
72                    };
73                }
74            }
75        }
76
77        p.finish_with_message(format!("fetched {count} packages"));
78        info!("finished fetching packages: fetched {count} packages, closing channel");
79        drop(s);
80        Ok(())
81    }
82
83    #[tracing::instrument(skip(p))]
84    pub(crate) async fn download_webcs<'a>(
85        test_id: u64,
86        path: &'a PathBuf,
87        webc_v2_url: &'a Url,
88        webc_v3_url: &'a Url,
89        p: &'a ProgressBar,
90    ) -> anyhow::Result<()> {
91        Argus::download_package(test_id, &path.join("package_v2.webc"), webc_v2_url, p).await?;
92        Argus::download_package(test_id, &path.join("package_v3.webc"), webc_v3_url, p).await?;
93        Ok(())
94    }
95
96    async fn download_package<'a>(
97        test_id: u64,
98        path: &'a PathBuf,
99        url: &'a Url,
100        p: &'a ProgressBar,
101    ) -> anyhow::Result<()> {
102        info!("downloading package from {} to file {:?}", url, path);
103        static APP_USER_AGENT: &str =
104            concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
105
106        let mut dir_path = path.clone();
107        dir_path.pop();
108
109        if !dir_path.exists() {
110            tokio::fs::create_dir_all(dir_path).await?;
111        } else if dir_path.exists() && !dir_path.is_dir() {
112            anyhow::bail!("path {path:?} exists, but it is not a directory!")
113        }
114
115        let client = Client::builder().user_agent(APP_USER_AGENT).build()?;
116
117        let download_size = {
118            let resp = client.head(url.as_str()).send().await?;
119            if resp.status().is_success() {
120                resp.headers()
121                    .get(header::CONTENT_LENGTH)
122                    .and_then(|ct_len| ct_len.to_str().ok())
123                    .and_then(|ct_len| ct_len.parse().ok())
124                    .unwrap_or(0) // Fallback to 0
125            } else {
126                let status = resp.status();
127                anyhow::bail!("Couldn't fetch head from URL {url}. Error: {status:?}")
128            }
129        };
130
131        let request = client.get(url.as_str());
132
133        p.set_length(download_size);
134
135        p.set_style(
136            ProgressStyle::default_bar()
137                .template(&format!(
138                    "[{test_id}] [{{bar:40.cyan/blue}}] {{bytes}}/{{total_bytes}} - {{msg}}"
139                ))
140                .unwrap()
141                .progress_chars("#>-"),
142        );
143
144        p.set_message(format!("downloading from {url}"));
145
146        let mut outfile = match File::create(&path).await {
147            Ok(o) => o,
148            Err(e) => {
149                error!(
150                    "[{test_id}] failed to create file at {:?}. Error: {e}",
151                    path.display()
152                );
153
154                p.finish_and_clear();
155
156                let path_display = path.display();
157                anyhow::bail!("[{test_id}] failed to create file at {path_display}. Error: {e}");
158            }
159        };
160        let mut download = match request.send().await {
161            Ok(d) => d,
162            Err(e) => {
163                error!("[{test_id}] failed to download from URL {url}. Error: {e}");
164                p.finish_and_clear();
165                anyhow::bail!("[{test_id}] failed to download from URL {url}. Error: {e}");
166            }
167        };
168
169        loop {
170            match download.chunk().await {
171                Err(e) => {
172                    error!(
173                        "[{test_id}] failed to download chunk from {:?}. Error: {e}",
174                        download
175                    );
176                    p.finish_and_clear();
177                    anyhow::bail!(
178                        "[{test_id}] failed to download chunk from {download:?}. Error: {e}"
179                    );
180                }
181                Ok(chunk) => {
182                    if let Some(chunk) = chunk {
183                        p.inc(chunk.len() as u64);
184                        if let Err(e) = outfile.write(&chunk).await {
185                            error!(
186                                "[{test_id}] failed to write chunk to file {:?}. Error: {e}",
187                                outfile
188                            );
189                            p.finish_and_clear();
190                            anyhow::bail!(
191                                "[{test_id}] failed to write chunk to file {outfile:?}. Error: {e}"
192                            );
193                        };
194                    } else {
195                        break;
196                    }
197                }
198            }
199        }
200
201        outfile.flush().await?;
202        drop(outfile);
203
204        Ok(())
205    }
206
207    /// Return the complete path to the folder of the test for the package, from the outdir to the
208    /// hash
209    pub async fn get_path(config: Arc<ArgusConfig>, pkg: &PackageVersionWithPackage) -> PathBuf {
210        let hash = match &pkg.distribution_v2.pirita_sha256_hash {
211            Some(hash) => hash,
212            None => {
213                unreachable!("no package without an hash should reach this function!")
214            }
215        };
216
217        let _namespace = match &pkg.package.namespace {
218            Some(ns) => ns.replace('/', "_"),
219            None => "unknown_namespace".to_owned(),
220        };
221
222        config.outdir.join(hash)
223    }
224
225    pub fn get_package_id(pkg: &PackageVersionWithPackage) -> String {
226        let namespace = match &pkg.package.namespace {
227            Some(namespace) => namespace.replace('/', "_"),
228            None => String::from("unknown_namespace"),
229        };
230        format!(
231            "{}/{}_v{}",
232            namespace,
233            pkg.package.package_name.replace('/', "_"),
234            pkg.version
235        )
236    }
237}