wasmer_argus/argus/
packages.rs1use super::Argus;
2use crate::ArgusConfig;
3use futures::StreamExt;
4use indicatif::{ProgressBar, ProgressStyle};
5use reqwest::{Client, header};
6use std::{path::PathBuf, sync::Arc, time::Duration};
7use tokio::{
8 fs::File,
9 io::AsyncWriteExt,
10 sync::mpsc::{UnboundedReceiver, UnboundedSender},
11};
12use tracing::*;
13use url::Url;
14use wasmer_backend_api::{
15 query::get_package_versions_stream,
16 types::{AllPackageVersionsVars, PackageVersionSortBy, PackageVersionWithPackage},
17};
18
19impl Argus {
20 #[tracing::instrument(skip(self, s, p))]
22 pub async fn fetch_packages(
23 &self,
24 s: UnboundedSender<PackageVersionWithPackage>,
25 p: ProgressBar,
26 config: Arc<ArgusConfig>,
27 successes_rx: UnboundedReceiver<()>,
28 failures_rx: UnboundedReceiver<()>,
29 ) -> anyhow::Result<()> {
30 info!("starting to fetch packages..");
31 let vars = AllPackageVersionsVars {
32 sort_by: Some(PackageVersionSortBy::Oldest),
33 ..Default::default()
34 };
35
36 p.set_style(
37 ProgressStyle::with_template("{spinner:.blue} {msg}")
38 .unwrap()
39 .tick_strings(&["✶", "✸", "✹", "✺", "✹", "✷", "✶"]),
40 );
41 p.enable_steady_tick(Duration::from_millis(1000));
42
43 let mut count = 0;
44
45 let call = get_package_versions_stream(&self.client, vars.clone());
46 futures::pin_mut!(call);
47 p.set_message("starting to fetch packages..".to_string());
48
49 while let Some(pkgs) = call.next().await {
50 let pkgs = match pkgs {
51 Ok(pkgs) => pkgs,
52 Err(e) => {
53 error!("failed to fetch packages: {e}");
54 p.finish_and_clear();
55 anyhow::bail!("failed to fetch packages: {e}")
56 }
57 };
58 p.set_message(format!(
59 "fetched {} packages [ok: {}, err: {}]",
60 count,
61 successes_rx.len(),
62 failures_rx.len()
63 ));
64 count += pkgs.len();
65
66 for pkg in pkgs {
67 if self.to_test(&pkg).await {
68 if let Err(e) = s.send(pkg) {
69 error!("failed to send packages: {e}");
70 p.finish_and_clear();
71 anyhow::bail!("failed to send packages: {e}")
72 };
73 }
74 }
75 }
76
77 p.finish_with_message(format!("fetched {count} packages"));
78 info!("finished fetching packages: fetched {count} packages, closing channel");
79 drop(s);
80 Ok(())
81 }
82
83 #[tracing::instrument(skip(p))]
84 pub(crate) async fn download_webcs<'a>(
85 test_id: u64,
86 path: &'a PathBuf,
87 webc_v2_url: &'a Url,
88 webc_v3_url: &'a Url,
89 p: &'a ProgressBar,
90 ) -> anyhow::Result<()> {
91 Argus::download_package(test_id, &path.join("package_v2.webc"), webc_v2_url, p).await?;
92 Argus::download_package(test_id, &path.join("package_v3.webc"), webc_v3_url, p).await?;
93 Ok(())
94 }
95
96 async fn download_package<'a>(
97 test_id: u64,
98 path: &'a PathBuf,
99 url: &'a Url,
100 p: &'a ProgressBar,
101 ) -> anyhow::Result<()> {
102 info!("downloading package from {} to file {:?}", url, path);
103 static APP_USER_AGENT: &str =
104 concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
105
106 let mut dir_path = path.clone();
107 dir_path.pop();
108
109 if !dir_path.exists() {
110 tokio::fs::create_dir_all(dir_path).await?;
111 } else if dir_path.exists() && !dir_path.is_dir() {
112 anyhow::bail!("path {path:?} exists, but it is not a directory!")
113 }
114
115 let client = Client::builder().user_agent(APP_USER_AGENT).build()?;
116
117 let download_size = {
118 let resp = client.head(url.as_str()).send().await?;
119 if resp.status().is_success() {
120 resp.headers()
121 .get(header::CONTENT_LENGTH)
122 .and_then(|ct_len| ct_len.to_str().ok())
123 .and_then(|ct_len| ct_len.parse().ok())
124 .unwrap_or(0) } else {
126 let status = resp.status();
127 anyhow::bail!("Couldn't fetch head from URL {url}. Error: {status:?}")
128 }
129 };
130
131 let request = client.get(url.as_str());
132
133 p.set_length(download_size);
134
135 p.set_style(
136 ProgressStyle::default_bar()
137 .template(&format!(
138 "[{test_id}] [{{bar:40.cyan/blue}}] {{bytes}}/{{total_bytes}} - {{msg}}"
139 ))
140 .unwrap()
141 .progress_chars("#>-"),
142 );
143
144 p.set_message(format!("downloading from {url}"));
145
146 let mut outfile = match File::create(&path).await {
147 Ok(o) => o,
148 Err(e) => {
149 error!(
150 "[{test_id}] failed to create file at {:?}. Error: {e}",
151 path.display()
152 );
153
154 p.finish_and_clear();
155
156 let path_display = path.display();
157 anyhow::bail!("[{test_id}] failed to create file at {path_display}. Error: {e}");
158 }
159 };
160 let mut download = match request.send().await {
161 Ok(d) => d,
162 Err(e) => {
163 error!("[{test_id}] failed to download from URL {url}. Error: {e}");
164 p.finish_and_clear();
165 anyhow::bail!("[{test_id}] failed to download from URL {url}. Error: {e}");
166 }
167 };
168
169 loop {
170 match download.chunk().await {
171 Err(e) => {
172 error!(
173 "[{test_id}] failed to download chunk from {:?}. Error: {e}",
174 download
175 );
176 p.finish_and_clear();
177 anyhow::bail!(
178 "[{test_id}] failed to download chunk from {download:?}. Error: {e}"
179 );
180 }
181 Ok(chunk) => {
182 if let Some(chunk) = chunk {
183 p.inc(chunk.len() as u64);
184 if let Err(e) = outfile.write(&chunk).await {
185 error!(
186 "[{test_id}] failed to write chunk to file {:?}. Error: {e}",
187 outfile
188 );
189 p.finish_and_clear();
190 anyhow::bail!(
191 "[{test_id}] failed to write chunk to file {outfile:?}. Error: {e}"
192 );
193 };
194 } else {
195 break;
196 }
197 }
198 }
199 }
200
201 outfile.flush().await?;
202 drop(outfile);
203
204 Ok(())
205 }
206
207 pub async fn get_path(config: Arc<ArgusConfig>, pkg: &PackageVersionWithPackage) -> PathBuf {
210 let hash = match &pkg.distribution_v2.pirita_sha256_hash {
211 Some(hash) => hash,
212 None => {
213 unreachable!("no package without an hash should reach this function!")
214 }
215 };
216
217 let _namespace = match &pkg.package.namespace {
218 Some(ns) => ns.replace('/', "_"),
219 None => "unknown_namespace".to_owned(),
220 };
221
222 config.outdir.join(hash)
223 }
224
225 pub fn get_package_id(pkg: &PackageVersionWithPackage) -> String {
226 let namespace = match &pkg.package.namespace {
227 Some(namespace) => namespace.replace('/', "_"),
228 None => String::from("unknown_namespace"),
229 };
230 format!(
231 "{}/{}_v{}",
232 namespace,
233 pkg.package.package_name.replace('/', "_"),
234 pkg.version
235 )
236 }
237}