wasmer_config/app/
job.rs

1use std::{borrow::Cow, fmt::Display, str::FromStr};
2
3use anyhow::anyhow;
4use serde::{Deserialize, Serialize, de::Error};
5
6use indexmap::IndexMap;
7
8use crate::package::PackageSource;
9
10use super::{AppConfigCapabilityMemoryV1, AppVolume, HttpRequest, pretty_duration::PrettyDuration};
11
12/// Job configuration.
13#[derive(
14    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
15)]
16pub struct Job {
17    name: String,
18    trigger: JobTrigger,
19
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub timeout: Option<PrettyDuration>,
22
23    /// Don't start job if past the due time by this amount,
24    /// instead opting to wait for the next instance of it
25    /// to be triggered.
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub max_schedule_drift: Option<PrettyDuration>,
28
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub retries: Option<u32>,
31
32    /// Maximum percent of "jitter" to introduce between invocations.
33    ///
34    /// Value range: 0-100
35    ///
36    /// Jitter is used to spread out jobs over time.
37    /// The calculation works by multiplying the time between invocations
38    /// by a random amount, and taking the percentage of that random amount.
39    ///
40    /// See also [`Self::jitter_percent_min`] to set a minimum jitter.
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub jitter_percent_max: Option<u8>,
43
44    /// Minimum "jitter" to introduce between invocations.
45    ///
46    /// Value range: 0-100
47    ///
48    /// Jitter is used to spread out jobs over time.
49    /// The calculation works by multiplying the time between invocations
50    /// by a random amount, and taking the percentage of that random amount.
51    ///
52    /// If not specified while `jitter_percent_max` is, it will default to 10%.
53    ///
54    /// See also [`Self::jitter_percent_max`] to set a maximum jitter.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub jitter_percent_min: Option<u8>,
57
58    action: JobAction,
59
60    /// Additional unknown fields.
61    ///
62    /// Exists for forward compatibility for newly added fields.
63    #[serde(flatten)]
64    pub other: IndexMap<String, serde_json::Value>,
65}
66
67// We need this wrapper struct to enable this formatting:
68// job:
69//   action:
70//     execute: ...
71#[derive(
72    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
73)]
74pub struct JobAction {
75    #[serde(flatten)]
76    action: JobActionCase,
77}
78
79#[derive(
80    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
81)]
82#[serde(rename_all = "lowercase")]
83pub enum JobActionCase {
84    Fetch(HttpRequest),
85    Execute(ExecutableJob),
86}
87
88#[derive(Clone, Debug, PartialEq, Eq)]
89pub struct CronExpression {
90    pub cron: saffron::parse::CronExpr,
91    // Keep the original string form around for serialization purposes.
92    pub parsed_from: String,
93}
94
95#[derive(Clone, Debug, PartialEq, Eq)]
96pub enum JobTrigger {
97    PreDeployment,
98    PostDeployment,
99    Cron(CronExpression),
100    Duration(PrettyDuration),
101}
102
103#[derive(
104    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
105)]
106pub struct ExecutableJob {
107    /// The package that contains the command to run. Defaults to the app config's package.
108    #[serde(skip_serializing_if = "Option::is_none")]
109    package: Option<PackageSource>,
110
111    /// The command to run. Defaults to the package's entrypoint.
112    #[serde(skip_serializing_if = "Option::is_none")]
113    command: Option<String>,
114
115    /// CLI arguments passed to the runner.
116    /// Only applicable for runners that accept CLI arguments.
117    #[serde(skip_serializing_if = "Option::is_none")]
118    cli_args: Option<Vec<String>>,
119
120    /// Environment variables.
121    #[serde(default, skip_serializing_if = "Option::is_none")]
122    pub env: Option<IndexMap<String, String>>,
123
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub capabilities: Option<ExecutableJobCompatibilityMapV1>,
126
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub volumes: Option<Vec<AppVolume>>,
129}
130
131#[derive(
132    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
133)]
134pub struct ExecutableJobCompatibilityMapV1 {
135    /// Instance memory settings.
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub memory: Option<AppConfigCapabilityMemoryV1>,
138
139    /// Additional unknown capabilities.
140    ///
141    /// This provides a small bit of forwards compatibility for newly added
142    /// capabilities.
143    #[serde(flatten)]
144    pub other: IndexMap<String, serde_json::Value>,
145}
146
147impl Serialize for JobTrigger {
148    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
149    where
150        S: serde::Serializer,
151    {
152        self.to_string().serialize(serializer)
153    }
154}
155
156impl<'de> Deserialize<'de> for JobTrigger {
157    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
158    where
159        D: serde::Deserializer<'de>,
160    {
161        let repr: Cow<'de, str> = Cow::deserialize(deserializer)?;
162        repr.parse().map_err(D::Error::custom)
163    }
164}
165
166impl Display for JobTrigger {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            Self::PreDeployment => write!(f, "pre-deployment"),
170            Self::PostDeployment => write!(f, "post-deployment"),
171            Self::Cron(cron) => write!(f, "{}", cron.parsed_from),
172            Self::Duration(duration) => write!(f, "{duration}"),
173        }
174    }
175}
176
177impl FromStr for JobTrigger {
178    type Err = anyhow::Error;
179
180    fn from_str(s: &str) -> Result<Self, Self::Err> {
181        if s == "pre-deployment" {
182            Ok(Self::PreDeployment)
183        } else if s == "post-deployment" {
184            Ok(Self::PostDeployment)
185        } else {
186            match s.parse::<CronExpression>() {
187                Ok(expr) => Ok(Self::Cron(expr)),
188                _ => {
189                    if let Ok(duration) = s.parse::<PrettyDuration>() {
190                        Ok(Self::Duration(duration))
191                    } else {
192                        Err(anyhow!(
193                            "Invalid job trigger '{s}'. Must be 'pre-deployment', 'post-deployment', \
194                a valid cron expression such as '0 */5 * * *' or a duration such as '15m'.",
195                        ))
196                    }
197                }
198            }
199        }
200    }
201}
202
203impl FromStr for CronExpression {
204    type Err = Box<dyn std::error::Error + Send + Sync>;
205
206    fn from_str(s: &str) -> Result<Self, Self::Err> {
207        if let Some(predefined_sched) = s.strip_prefix('@') {
208            match predefined_sched {
209                "hourly" => Ok(Self {
210                    cron: "0 * * * *".parse().unwrap(),
211                    parsed_from: s.to_owned(),
212                }),
213                "daily" => Ok(Self {
214                    cron: "0 0 * * *".parse().unwrap(),
215                    parsed_from: s.to_owned(),
216                }),
217                "weekly" => Ok(Self {
218                    cron: "0 0 * * 1".parse().unwrap(),
219                    parsed_from: s.to_owned(),
220                }),
221                "monthly" => Ok(Self {
222                    cron: "0 0 1 * *".parse().unwrap(),
223                    parsed_from: s.to_owned(),
224                }),
225                "yearly" => Ok(Self {
226                    cron: "0 0 1 1 *".parse().unwrap(),
227                    parsed_from: s.to_owned(),
228                }),
229                _ => Err(format!("Invalid cron expression {s}").into()),
230            }
231        } else {
232            // Let's make sure the input string is valid...
233            match s.parse() {
234                Ok(expr) => Ok(Self {
235                    cron: expr,
236                    parsed_from: s.to_owned(),
237                }),
238                Err(_) => Err(format!("Invalid cron expression {s}").into()),
239            }
240        }
241    }
242}
243
244impl schemars::JsonSchema for JobTrigger {
245    fn schema_id() -> Cow<'static, str> {
246        Cow::Borrowed("JobTrigger")
247    }
248
249    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
250        String::json_schema(generator)
251    }
252
253    fn schema_name() -> Cow<'static, str> {
254        Self::schema_id()
255    }
256}
257
258#[cfg(test)]
259mod tests {
260    use std::time::Duration;
261
262    use super::*;
263
264    #[test]
265    pub fn job_trigger_serialization_roundtrip() {
266        fn assert_roundtrip(
267            serialized: &str,
268            description: Option<&str>,
269            duration: Option<Duration>,
270        ) {
271            let parsed = serialized.parse::<JobTrigger>().unwrap();
272            assert_eq!(&parsed.to_string(), serialized);
273
274            if let JobTrigger::Cron(expr) = &parsed {
275                assert_eq!(
276                    &expr
277                        .cron
278                        .describe(saffron::parse::English::default())
279                        .to_string(),
280                    description.unwrap()
281                );
282            } else {
283                assert!(description.is_none());
284            }
285
286            if let JobTrigger::Duration(d) = &parsed {
287                assert_eq!(d.as_duration(), duration.unwrap());
288            } else {
289                assert!(duration.is_none());
290            }
291        }
292
293        assert_roundtrip("pre-deployment", None, None);
294        assert_roundtrip("post-deployment", None, None);
295
296        assert_roundtrip("@hourly", Some("Every hour"), None);
297        assert_roundtrip("@daily", Some("At 12:00 AM"), None);
298        assert_roundtrip("@weekly", Some("At 12:00 AM on Sunday"), None);
299        assert_roundtrip(
300            "@monthly",
301            Some("At 12:00 AM on the 1st of every month"),
302            None,
303        );
304        assert_roundtrip("@yearly", Some("At 12:00 AM on the 1st of January"), None);
305
306        // Note: the parsing code should keep the formatting of the source string.
307        // This is tested in assert_roundtrip.
308        assert_roundtrip(
309            "0/2 12 * JAN-APR 2",
310            Some(
311                "At every 2nd minute from 0 through 59 minutes past the hour, \
312                between 12:00 PM and 12:59 PM on Monday of January to April",
313            ),
314            None,
315        );
316
317        assert_roundtrip("10s", None, Some(Duration::from_secs(10)));
318        assert_roundtrip("15m", None, Some(Duration::from_secs(15 * 60)));
319        assert_roundtrip("20h", None, Some(Duration::from_secs(20 * 60 * 60)));
320        assert_roundtrip("2d", None, Some(Duration::from_secs(2 * 60 * 60 * 24)));
321    }
322
323    #[test]
324    pub fn job_serialization_roundtrip() {
325        fn parse_cron(expr: &str) -> CronExpression {
326            CronExpression {
327                cron: expr.parse().unwrap(),
328                parsed_from: expr.to_owned(),
329            }
330        }
331
332        let job = Job {
333            name: "my-job".to_owned(),
334            trigger: JobTrigger::Cron(parse_cron("0/2 12 * JAN-APR 2")),
335            timeout: Some("1m".parse().unwrap()),
336            max_schedule_drift: Some("2h".parse().unwrap()),
337            jitter_percent_max: None,
338            jitter_percent_min: None,
339            retries: None,
340            action: JobAction {
341                action: JobActionCase::Execute(super::ExecutableJob {
342                    package: Some(crate::package::PackageSource::Ident(
343                        crate::package::PackageIdent::Named(crate::package::NamedPackageIdent {
344                            registry: None,
345                            namespace: Some("ns".to_owned()),
346                            name: "pkg".to_owned(),
347                            tag: None,
348                        }),
349                    )),
350                    command: Some("cmd".to_owned()),
351                    cli_args: Some(vec!["arg-1".to_owned(), "arg-2".to_owned()]),
352                    env: Some([("VAR1".to_owned(), "Value".to_owned())].into()),
353                    capabilities: Some(super::ExecutableJobCompatibilityMapV1 {
354                        memory: Some(crate::app::AppConfigCapabilityMemoryV1 {
355                            limit: Some(bytesize::ByteSize::gib(1)),
356                        }),
357                        other: Default::default(),
358                    }),
359                    volumes: Some(vec![crate::app::AppVolume {
360                        name: "vol".to_owned(),
361                        mount: "/path/to/volume".to_owned(),
362                    }]),
363                }),
364            },
365            other: Default::default(),
366        };
367
368        let serialized = r#"
369name: my-job
370trigger: 0/2 12 * JAN-APR 2
371timeout: 1m
372max_schedule_drift: 2h
373action:
374  execute:
375    package: ns/pkg
376    command: cmd
377    cli_args:
378    - arg-1
379    - arg-2
380    env:
381      VAR1: Value
382    capabilities:
383      memory:
384        limit: 1.0 GiB
385    volumes:
386    - name: vol
387      mount: /path/to/volume"#;
388
389        assert_eq!(
390            serialized.trim(),
391            serde_yaml::to_string(&job).unwrap().trim()
392        );
393        assert_eq!(job, serde_yaml::from_str(serialized).unwrap());
394    }
395}