wasmer_config/app/
job.rs

1use std::{borrow::Cow, fmt::Display, str::FromStr};
2
3use anyhow::anyhow;
4use serde::{Deserialize, Serialize, de::Error};
5
6use indexmap::IndexMap;
7
8use crate::package::PackageSource;
9
10use super::{AppConfigCapabilityMemoryV1, AppVolume, HttpRequest, pretty_duration::PrettyDuration};
11
12/// Job configuration.
13#[derive(
14    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
15)]
16pub struct Job {
17    name: String,
18    trigger: JobTrigger,
19
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub timeout: Option<PrettyDuration>,
22
23    /// Don't start job if past the due time by this amount,
24    /// instead opting to wait for the next instance of it
25    /// to be triggered.
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub max_schedule_drift: Option<PrettyDuration>,
28
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub retries: Option<u32>,
31
32    /// Maximum percent of "jitter" to introduce between invocations.
33    ///
34    /// Value range: 0-100
35    ///
36    /// Jitter is used to spread out jobs over time.
37    /// The calculation works by multiplying the time between invocations
38    /// by a random amount, and taking the percentage of that random amount.
39    ///
40    /// See also [`Self::jitter_percent_min`] to set a minimum jitter.
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub jitter_percent_max: Option<u8>,
43
44    /// Minimum "jitter" to introduce between invocations.
45    ///
46    /// Value range: 0-100
47    ///
48    /// Jitter is used to spread out jobs over time.
49    /// The calculation works by multiplying the time between invocations
50    /// by a random amount, and taking the percentage of that random amount.
51    ///
52    /// If not specified while `jitter_percent_max` is, it will default to 10%.
53    ///
54    /// See also [`Self::jitter_percent_max`] to set a maximum jitter.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub jitter_percent_min: Option<u8>,
57
58    action: JobAction,
59
60    /// Additional unknown fields.
61    ///
62    /// Exists for forward compatibility for newly added fields.
63    #[serde(flatten)]
64    pub other: IndexMap<String, serde_json::Value>,
65}
66
67// We need this wrapper struct to enable this formatting:
68// job:
69//   action:
70//     execute: ...
71#[derive(
72    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
73)]
74pub struct JobAction {
75    #[serde(flatten)]
76    action: JobActionCase,
77}
78
79#[derive(
80    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
81)]
82#[serde(rename_all = "lowercase")]
83pub enum JobActionCase {
84    Fetch(HttpRequest),
85    Execute(ExecutableJob),
86}
87
88#[derive(Clone, Debug, PartialEq, Eq)]
89pub struct CronExpression {
90    pub cron: saffron::parse::CronExpr,
91    // Keep the original string form around for serialization purposes.
92    pub parsed_from: String,
93}
94
95#[derive(Clone, Debug, PartialEq, Eq)]
96pub enum JobTrigger {
97    PreDeployment,
98    PostDeployment,
99    Cron(CronExpression),
100    Duration(PrettyDuration),
101}
102
103#[derive(
104    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
105)]
106pub struct ExecutableJob {
107    /// The package that contains the command to run. Defaults to the app config's package.
108    #[serde(skip_serializing_if = "Option::is_none")]
109    package: Option<PackageSource>,
110
111    /// The command to run. Defaults to the package's entrypoint.
112    #[serde(skip_serializing_if = "Option::is_none")]
113    command: Option<String>,
114
115    /// CLI arguments passed to the runner.
116    /// Only applicable for runners that accept CLI arguments.
117    #[serde(skip_serializing_if = "Option::is_none")]
118    cli_args: Option<Vec<String>>,
119
120    /// Environment variables.
121    #[serde(default, skip_serializing_if = "Option::is_none")]
122    pub env: Option<IndexMap<String, String>>,
123
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub capabilities: Option<ExecutableJobCompatibilityMapV1>,
126
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub volumes: Option<Vec<AppVolume>>,
129}
130
131#[derive(
132    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
133)]
134pub struct ExecutableJobCompatibilityMapV1 {
135    /// Instance memory settings.
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub memory: Option<AppConfigCapabilityMemoryV1>,
138
139    /// Additional unknown capabilities.
140    ///
141    /// This provides a small bit of forwards compatibility for newly added
142    /// capabilities.
143    #[serde(flatten)]
144    pub other: IndexMap<String, serde_json::Value>,
145}
146
147impl Serialize for JobTrigger {
148    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
149    where
150        S: serde::Serializer,
151    {
152        self.to_string().serialize(serializer)
153    }
154}
155
156impl<'de> Deserialize<'de> for JobTrigger {
157    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
158    where
159        D: serde::Deserializer<'de>,
160    {
161        let repr: Cow<'de, str> = Cow::deserialize(deserializer)?;
162        repr.parse().map_err(D::Error::custom)
163    }
164}
165
166impl Display for JobTrigger {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            Self::PreDeployment => write!(f, "pre-deployment"),
170            Self::PostDeployment => write!(f, "post-deployment"),
171            Self::Cron(cron) => write!(f, "{}", cron.parsed_from),
172            Self::Duration(duration) => write!(f, "{duration}"),
173        }
174    }
175}
176
177impl FromStr for JobTrigger {
178    type Err = anyhow::Error;
179
180    fn from_str(s: &str) -> Result<Self, Self::Err> {
181        if s == "pre-deployment" {
182            Ok(Self::PreDeployment)
183        } else if s == "post-deployment" {
184            Ok(Self::PostDeployment)
185        } else {
186            match s.parse::<CronExpression>() {
187                Ok(expr) => Ok(Self::Cron(expr)),
188                _ => {
189                    if let Ok(duration) = s.parse::<PrettyDuration>() {
190                        Ok(Self::Duration(duration))
191                    } else {
192                        Err(anyhow!(
193                            "Invalid job trigger '{s}'. Must be 'pre-deployment', 'post-deployment', \
194                a valid cron expression such as '0 */5 * * *' or a duration such as '15m'.",
195                        ))
196                    }
197                }
198            }
199        }
200    }
201}
202
203impl FromStr for CronExpression {
204    type Err = Box<dyn std::error::Error + Send + Sync>;
205
206    fn from_str(s: &str) -> Result<Self, Self::Err> {
207        if let Some(predefined_sched) = s.strip_prefix('@') {
208            match predefined_sched {
209                "hourly" => Ok(Self {
210                    cron: "0 * * * *".parse().unwrap(),
211                    parsed_from: s.to_owned(),
212                }),
213                "daily" => Ok(Self {
214                    cron: "0 0 * * *".parse().unwrap(),
215                    parsed_from: s.to_owned(),
216                }),
217                "weekly" => Ok(Self {
218                    cron: "0 0 * * 1".parse().unwrap(),
219                    parsed_from: s.to_owned(),
220                }),
221                "monthly" => Ok(Self {
222                    cron: "0 0 1 * *".parse().unwrap(),
223                    parsed_from: s.to_owned(),
224                }),
225                "yearly" => Ok(Self {
226                    cron: "0 0 1 1 *".parse().unwrap(),
227                    parsed_from: s.to_owned(),
228                }),
229                _ => Err(format!("Invalid cron expression {s}").into()),
230            }
231        } else {
232            // Let's make sure the input string is valid...
233            match s.parse() {
234                Ok(expr) => Ok(Self {
235                    cron: expr,
236                    parsed_from: s.to_owned(),
237                }),
238                Err(_) => Err(format!("Invalid cron expression {s}").into()),
239            }
240        }
241    }
242}
243
244impl schemars::JsonSchema for JobTrigger {
245    fn schema_name() -> String {
246        "JobTrigger".to_owned()
247    }
248
249    fn json_schema(r#gen: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
250        String::json_schema(r#gen)
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use std::time::Duration;
257
258    use super::*;
259
260    #[test]
261    pub fn job_trigger_serialization_roundtrip() {
262        fn assert_roundtrip(
263            serialized: &str,
264            description: Option<&str>,
265            duration: Option<Duration>,
266        ) {
267            let parsed = serialized.parse::<JobTrigger>().unwrap();
268            assert_eq!(&parsed.to_string(), serialized);
269
270            if let JobTrigger::Cron(expr) = &parsed {
271                assert_eq!(
272                    &expr
273                        .cron
274                        .describe(saffron::parse::English::default())
275                        .to_string(),
276                    description.unwrap()
277                );
278            } else {
279                assert!(description.is_none());
280            }
281
282            if let JobTrigger::Duration(d) = &parsed {
283                assert_eq!(d.as_duration(), duration.unwrap());
284            } else {
285                assert!(duration.is_none());
286            }
287        }
288
289        assert_roundtrip("pre-deployment", None, None);
290        assert_roundtrip("post-deployment", None, None);
291
292        assert_roundtrip("@hourly", Some("Every hour"), None);
293        assert_roundtrip("@daily", Some("At 12:00 AM"), None);
294        assert_roundtrip("@weekly", Some("At 12:00 AM on Sunday"), None);
295        assert_roundtrip(
296            "@monthly",
297            Some("At 12:00 AM on the 1st of every month"),
298            None,
299        );
300        assert_roundtrip("@yearly", Some("At 12:00 AM on the 1st of January"), None);
301
302        // Note: the parsing code should keep the formatting of the source string.
303        // This is tested in assert_roundtrip.
304        assert_roundtrip(
305            "0/2 12 * JAN-APR 2",
306            Some(
307                "At every 2nd minute from 0 through 59 minutes past the hour, \
308                between 12:00 PM and 12:59 PM on Monday of January to April",
309            ),
310            None,
311        );
312
313        assert_roundtrip("10s", None, Some(Duration::from_secs(10)));
314        assert_roundtrip("15m", None, Some(Duration::from_secs(15 * 60)));
315        assert_roundtrip("20h", None, Some(Duration::from_secs(20 * 60 * 60)));
316        assert_roundtrip("2d", None, Some(Duration::from_secs(2 * 60 * 60 * 24)));
317    }
318
319    #[test]
320    pub fn job_serialization_roundtrip() {
321        fn parse_cron(expr: &str) -> CronExpression {
322            CronExpression {
323                cron: expr.parse().unwrap(),
324                parsed_from: expr.to_owned(),
325            }
326        }
327
328        let job = Job {
329            name: "my-job".to_owned(),
330            trigger: JobTrigger::Cron(parse_cron("0/2 12 * JAN-APR 2")),
331            timeout: Some("1m".parse().unwrap()),
332            max_schedule_drift: Some("2h".parse().unwrap()),
333            jitter_percent_max: None,
334            jitter_percent_min: None,
335            retries: None,
336            action: JobAction {
337                action: JobActionCase::Execute(super::ExecutableJob {
338                    package: Some(crate::package::PackageSource::Ident(
339                        crate::package::PackageIdent::Named(crate::package::NamedPackageIdent {
340                            registry: None,
341                            namespace: Some("ns".to_owned()),
342                            name: "pkg".to_owned(),
343                            tag: None,
344                        }),
345                    )),
346                    command: Some("cmd".to_owned()),
347                    cli_args: Some(vec!["arg-1".to_owned(), "arg-2".to_owned()]),
348                    env: Some([("VAR1".to_owned(), "Value".to_owned())].into()),
349                    capabilities: Some(super::ExecutableJobCompatibilityMapV1 {
350                        memory: Some(crate::app::AppConfigCapabilityMemoryV1 {
351                            limit: Some(bytesize::ByteSize::gb(1)),
352                        }),
353                        other: Default::default(),
354                    }),
355                    volumes: Some(vec![crate::app::AppVolume {
356                        name: "vol".to_owned(),
357                        mount: "/path/to/volume".to_owned(),
358                    }]),
359                }),
360            },
361            other: Default::default(),
362        };
363
364        let serialized = r#"
365name: my-job
366trigger: 0/2 12 * JAN-APR 2
367timeout: 1m
368max_schedule_drift: 2h
369action:
370  execute:
371    package: ns/pkg
372    command: cmd
373    cli_args:
374    - arg-1
375    - arg-2
376    env:
377      VAR1: Value
378    capabilities:
379      memory:
380        limit: 1000.0 MB
381    volumes:
382    - name: vol
383      mount: /path/to/volume"#;
384
385        assert_eq!(
386            serialized.trim(),
387            serde_yaml::to_string(&job).unwrap().trim()
388        );
389        assert_eq!(job, serde_yaml::from_str(serialized).unwrap());
390    }
391}