1use std::{borrow::Cow, fmt::Display, str::FromStr};
2
3use anyhow::anyhow;
4use serde::{Deserialize, Serialize, de::Error};
5
6use indexmap::IndexMap;
7
8use crate::package::PackageSource;
9
10use super::{AppConfigCapabilityMemoryV1, AppVolume, HttpRequest, pretty_duration::PrettyDuration};
11
/// Configuration for a single job: a name, a trigger, an action, and a set
/// of optional tuning settings.
///
/// Optional settings that are `None` are left out of the serialized form.
#[derive(
    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
)]
pub struct Job {
    /// Name of the job.
    name: String,
    /// Trigger of the job; see [`JobTrigger`] for the accepted string forms.
    trigger: JobTrigger,

    /// Optional timeout, written as a human-readable duration (e.g. `1m`).
    // NOTE(review): presumably the maximum run time of one execution — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout: Option<PrettyDuration>,

    /// Optional maximum schedule drift, written as a human-readable duration
    /// (e.g. `2h`).
    // NOTE(review): presumably how late a scheduled run may still be started —
    // the exact semantics are not visible in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_schedule_drift: Option<PrettyDuration>,

    /// Optional retry count.
    // NOTE(review): presumably the number of re-attempts after a failed run —
    // confirm against the consumer of this config.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retries: Option<u32>,

    /// Optional upper jitter bound, as a percentage.
    // NOTE(review): no range check (e.g. <= 100, or >= `jitter_percent_min`) is
    // applied in this file; what the percentage is relative to is not visible here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub jitter_percent_max: Option<u8>,

    /// Optional lower jitter bound, as a percentage.
    // NOTE(review): same caveats as `jitter_percent_max`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub jitter_percent_min: Option<u8>,

    /// The action of the job; see [`JobAction`].
    action: JobAction,

    /// Catch-all for keys that do not match any named field above.
    ///
    /// Because the map is flattened, these entries live at the same level as
    /// the named fields and are written back out on serialization.
    #[serde(flatten)]
    pub other: IndexMap<String, serde_json::Value>,
}
66
/// Wrapper around [`JobActionCase`].
///
/// The inner enum is flattened, so this struct serializes as a map whose
/// single key is the lowercase variant name (e.g. `execute: ...`).
#[derive(
    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
)]
pub struct JobAction {
    /// The selected action variant.
    #[serde(flatten)]
    action: JobActionCase,
}
78
/// The kinds of action a job can have.
///
/// Variant names are serialized in lowercase (`fetch`, `execute`).
#[derive(
    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
)]
#[serde(rename_all = "lowercase")]
pub enum JobActionCase {
    /// An action described by an [`HttpRequest`].
    Fetch(HttpRequest),
    /// An action described by an [`ExecutableJob`].
    Execute(ExecutableJob),
}
87
/// A parsed cron schedule together with the text it was parsed from.
///
/// The source text is kept so the expression can be written back exactly as
/// it was supplied (for example `@hourly` instead of its expanded form).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CronExpression {
    /// The parsed cron expression.
    pub cron: saffron::parse::CronExpr,
    /// The exact input string that `cron` was parsed from.
    pub parsed_from: String,
}
94
/// The trigger of a job.
///
/// A trigger is represented as a single string: the `Display` and `FromStr`
/// implementations in this file define that format, and the serde
/// implementations reuse them.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum JobTrigger {
    /// Written as the literal string `pre-deployment`.
    // NOTE(review): presumably runs before a deployment goes live — confirm.
    PreDeployment,
    /// Written as the literal string `post-deployment`.
    // NOTE(review): presumably runs after a deployment completes — confirm.
    PostDeployment,
    /// A cron schedule, written as the text it was originally parsed from.
    Cron(CronExpression),
    /// A duration, written in human-readable form such as `15m`.
    // NOTE(review): presumably the interval between runs — confirm.
    Duration(PrettyDuration),
}
102
/// Settings for the `execute` variant of [`JobActionCase`].
///
/// Every field is optional, and fields that are `None` are left out of the
/// serialized form.
#[derive(
    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
)]
pub struct ExecutableJob {
    /// The package source, if one is given.
    // NOTE(review): presumably the package to run; what `None` falls back to
    // is not visible in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    package: Option<PackageSource>,

    /// The command, if one is given.
    // NOTE(review): presumably names a command inside `package` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    command: Option<String>,

    /// The list of CLI arguments, if given.
    #[serde(skip_serializing_if = "Option::is_none")]
    cli_args: Option<Vec<String>>,

    /// String key/value pairs, kept in insertion order.
    // NOTE(review): presumably environment variables for the command — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env: Option<IndexMap<String, String>>,

    /// Capability settings; see [`ExecutableJobCompatibilityMapV1`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub capabilities: Option<ExecutableJobCompatibilityMapV1>,

    /// The list of volumes, if given; see [`AppVolume`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub volumes: Option<Vec<AppVolume>>,
}
130
/// Capability settings for an [`ExecutableJob`] (the type of its
/// `capabilities` field).
#[derive(
    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
)]
pub struct ExecutableJobCompatibilityMapV1 {
    /// Memory capability settings; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub memory: Option<AppConfigCapabilityMemoryV1>,

    /// Catch-all for keys that do not match any named field above.
    ///
    /// Because the map is flattened, these entries live at the same level as
    /// `memory` and are written back out on serialization.
    #[serde(flatten)]
    pub other: IndexMap<String, serde_json::Value>,
}
146
147impl Serialize for JobTrigger {
148 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
149 where
150 S: serde::Serializer,
151 {
152 self.to_string().serialize(serializer)
153 }
154}
155
156impl<'de> Deserialize<'de> for JobTrigger {
157 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
158 where
159 D: serde::Deserializer<'de>,
160 {
161 let repr: Cow<'de, str> = Cow::deserialize(deserializer)?;
162 repr.parse().map_err(D::Error::custom)
163 }
164}
165
166impl Display for JobTrigger {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 match self {
169 Self::PreDeployment => write!(f, "pre-deployment"),
170 Self::PostDeployment => write!(f, "post-deployment"),
171 Self::Cron(cron) => write!(f, "{}", cron.parsed_from),
172 Self::Duration(duration) => write!(f, "{duration}"),
173 }
174 }
175}
176
177impl FromStr for JobTrigger {
178 type Err = anyhow::Error;
179
180 fn from_str(s: &str) -> Result<Self, Self::Err> {
181 if s == "pre-deployment" {
182 Ok(Self::PreDeployment)
183 } else if s == "post-deployment" {
184 Ok(Self::PostDeployment)
185 } else {
186 match s.parse::<CronExpression>() {
187 Ok(expr) => Ok(Self::Cron(expr)),
188 _ => {
189 if let Ok(duration) = s.parse::<PrettyDuration>() {
190 Ok(Self::Duration(duration))
191 } else {
192 Err(anyhow!(
193 "Invalid job trigger '{s}'. Must be 'pre-deployment', 'post-deployment', \
194 a valid cron expression such as '0 */5 * * *' or a duration such as '15m'.",
195 ))
196 }
197 }
198 }
199 }
200 }
201}
202
203impl FromStr for CronExpression {
204 type Err = Box<dyn std::error::Error + Send + Sync>;
205
206 fn from_str(s: &str) -> Result<Self, Self::Err> {
207 if let Some(predefined_sched) = s.strip_prefix('@') {
208 match predefined_sched {
209 "hourly" => Ok(Self {
210 cron: "0 * * * *".parse().unwrap(),
211 parsed_from: s.to_owned(),
212 }),
213 "daily" => Ok(Self {
214 cron: "0 0 * * *".parse().unwrap(),
215 parsed_from: s.to_owned(),
216 }),
217 "weekly" => Ok(Self {
218 cron: "0 0 * * 1".parse().unwrap(),
219 parsed_from: s.to_owned(),
220 }),
221 "monthly" => Ok(Self {
222 cron: "0 0 1 * *".parse().unwrap(),
223 parsed_from: s.to_owned(),
224 }),
225 "yearly" => Ok(Self {
226 cron: "0 0 1 1 *".parse().unwrap(),
227 parsed_from: s.to_owned(),
228 }),
229 _ => Err(format!("Invalid cron expression {s}").into()),
230 }
231 } else {
232 match s.parse() {
234 Ok(expr) => Ok(Self {
235 cron: expr,
236 parsed_from: s.to_owned(),
237 }),
238 Err(_) => Err(format!("Invalid cron expression {s}").into()),
239 }
240 }
241 }
242}
243
244impl schemars::JsonSchema for JobTrigger {
245 fn schema_name() -> String {
246 "JobTrigger".to_owned()
247 }
248
249 fn json_schema(r#gen: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
250 String::json_schema(r#gen)
251 }
252}
253
// Round-trip tests for the string form of `JobTrigger` and the YAML form of `Job`.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    // Every supported trigger form must parse and print back unchanged.
    #[test]
    pub fn job_trigger_serialization_roundtrip() {
        // Parses `serialized`, checks it prints back unchanged, and then checks
        // the variant-specific expectation:
        // - `description`: expected English description; `Some` only for cron triggers.
        // - `duration`: expected duration; `Some` only for duration triggers.
        fn assert_roundtrip(
            serialized: &str,
            description: Option<&str>,
            duration: Option<Duration>,
        ) {
            let parsed = serialized.parse::<JobTrigger>().unwrap();
            assert_eq!(&parsed.to_string(), serialized);

            if let JobTrigger::Cron(expr) = &parsed {
                assert_eq!(
                    &expr
                        .cron
                        .describe(saffron::parse::English::default())
                        .to_string(),
                    description.unwrap()
                );
            } else {
                // Non-cron triggers must not come with an expected description.
                assert!(description.is_none());
            }

            if let JobTrigger::Duration(d) = &parsed {
                assert_eq!(d.as_duration(), duration.unwrap());
            } else {
                // Non-duration triggers must not come with an expected duration.
                assert!(duration.is_none());
            }
        }

        assert_roundtrip("pre-deployment", None, None);
        assert_roundtrip("post-deployment", None, None);

        // `@` shorthands keep their original spelling but describe as their expansion.
        assert_roundtrip("@hourly", Some("Every hour"), None);
        assert_roundtrip("@daily", Some("At 12:00 AM"), None);
        // `@weekly` expands to day-of-week `1`, which is described as Sunday.
        assert_roundtrip("@weekly", Some("At 12:00 AM on Sunday"), None);
        assert_roundtrip(
            "@monthly",
            Some("At 12:00 AM on the 1st of every month"),
            None,
        );
        assert_roundtrip("@yearly", Some("At 12:00 AM on the 1st of January"), None);

        // A full expression; day-of-week `2` is described as Monday.
        assert_roundtrip(
            "0/2 12 * JAN-APR 2",
            Some(
                "At every 2nd minute from 0 through 59 minutes past the hour, \
                 between 12:00 PM and 12:59 PM on Monday of January to April",
            ),
            None,
        );

        assert_roundtrip("10s", None, Some(Duration::from_secs(10)));
        assert_roundtrip("15m", None, Some(Duration::from_secs(15 * 60)));
        assert_roundtrip("20h", None, Some(Duration::from_secs(20 * 60 * 60)));
        assert_roundtrip("2d", None, Some(Duration::from_secs(2 * 60 * 60 * 24)));
    }

    // A fully populated `Job` must serialize to the expected YAML and
    // deserialize back to an equal value.
    #[test]
    pub fn job_serialization_roundtrip() {
        // Builds a `CronExpression` whose `parsed_from` equals the input text.
        fn parse_cron(expr: &str) -> CronExpression {
            CronExpression {
                cron: expr.parse().unwrap(),
                parsed_from: expr.to_owned(),
            }
        }

        let job = Job {
            name: "my-job".to_owned(),
            trigger: JobTrigger::Cron(parse_cron("0/2 12 * JAN-APR 2")),
            timeout: Some("1m".parse().unwrap()),
            max_schedule_drift: Some("2h".parse().unwrap()),
            jitter_percent_max: None,
            jitter_percent_min: None,
            retries: None,
            action: JobAction {
                action: JobActionCase::Execute(super::ExecutableJob {
                    package: Some(crate::package::PackageSource::Ident(
                        crate::package::PackageIdent::Named(crate::package::NamedPackageIdent {
                            registry: None,
                            namespace: Some("ns".to_owned()),
                            name: "pkg".to_owned(),
                            tag: None,
                        }),
                    )),
                    command: Some("cmd".to_owned()),
                    cli_args: Some(vec!["arg-1".to_owned(), "arg-2".to_owned()]),
                    env: Some([("VAR1".to_owned(), "Value".to_owned())].into()),
                    capabilities: Some(super::ExecutableJobCompatibilityMapV1 {
                        memory: Some(crate::app::AppConfigCapabilityMemoryV1 {
                            limit: Some(bytesize::ByteSize::gb(1)),
                        }),
                        other: Default::default(),
                    }),
                    volumes: Some(vec![crate::app::AppVolume {
                        name: "vol".to_owned(),
                        mount: "/path/to/volume".to_owned(),
                    }]),
                }),
            },
            other: Default::default(),
        };

        // Expected YAML. Fields set to `None` above (`retries`, both jitter
        // bounds) are absent, and the empty `other` maps contribute no keys.
        // The literal starts at column 0 because its indentation is significant.
        let serialized = r#"
name: my-job
trigger: 0/2 12 * JAN-APR 2
timeout: 1m
max_schedule_drift: 2h
action:
  execute:
    package: ns/pkg
    command: cmd
    cli_args:
    - arg-1
    - arg-2
    env:
      VAR1: Value
    capabilities:
      memory:
        limit: 1000.0 MB
    volumes:
    - name: vol
      mount: /path/to/volume"#;

        // Compare trimmed text so the leading newline of the literal and any
        // trailing newline from the serializer do not matter.
        assert_eq!(
            serialized.trim(),
            serde_yaml::to_string(&job).unwrap().trim()
        );
        assert_eq!(job, serde_yaml::from_str(serialized).unwrap());
    }
}