Skip to main content

scheduler/
job.rs

1use std::{future::Future, pin::Pin, sync::Arc};
2
3use chrono::{DateTime, Utc};
4use tokio::sync::{Mutex, RwLock};
5#[cfg(feature = "tracing")]
6use tracing::*;
7
8use crate::{error::SchedulerError, scheduler::Scheduler};
9
10pub struct JobScheduler {
11    pub job: Job,
12    schedule: Mutex<Scheduler>,
13    next_run_at: Mutex<Option<DateTime<Utc>>>,
14    last_run_at: Mutex<Option<DateTime<Utc>>>,
15}
16
17impl JobScheduler {
18    pub fn new(mut schedule: Scheduler, job: Job) -> Self {
19        // Determine the next time it should run
20        let next_run_at = schedule.next(&Utc::now());
21        JobScheduler {
22            job,
23            schedule: Mutex::new(schedule),
24            next_run_at: Mutex::new(next_run_at),
25            last_run_at: Mutex::new(None),
26        }
27    }
28
29    /// Returns true if this job is pending execution.
30    pub async fn is_pending(&self) -> bool {
31        // Check if paused
32        if !self.job.is_active {
33            return false;
34        }
35
36        // Check if NOW is on or after next_run_at
37        if let Some(next_run_at) = self.next_run_at.lock().await.as_ref() {
38            *next_run_at < Utc::now()
39        } else {
40            false
41        }
42    }
43
44    /// Run the job immediately and re-schedule it.
45    pub async fn run(&self) -> Result<(), SchedulerError> {
46        // Execute the job function
47        let run_result = self.job.run().await;
48
49        let now = Utc::now();
50
51        let mut schedule = self.schedule.lock().await;
52
53        // Determine the next time it should run
54        let mut next_run_at = self.next_run_at.lock().await;
55        *next_run_at = schedule.next(&now);
56
57        // Save the last time this ran
58        let mut last_run_at = self.last_run_at.lock().await;
59        *last_run_at = Some(now);
60
61        run_result
62    }
63}
64
/// Signature of the function executed by a job.
///
/// It takes no arguments and returns a boxed, pinned, `Send` future that resolves to `Ok(())`
/// on success or to a boxed error on failure. The function itself must be `Send`, `Sync` and
/// `'static`.
pub type JobFn = dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>>
    + Send
    + Sync
    + 'static;
69
70pub struct Job {
71    function: Arc<JobFn>,
72    group: String,
73    name: String,
74    is_active: bool,
75    is_running: RwLock<bool>,
76    retries_after_failure: Option<u64>,
77}
78
79impl Job {
80    pub fn new<
81        G: Into<String>,
82        N: Into<String>,
83        F: 'static
84            + Send
85            + Sync
86            + Fn() -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>>,
87    >(
88        group: G,
89        name: N,
90        retries_after_failure: Option<u64>,
91        function: F,
92    ) -> Self {
93        Job {
94            function: Arc::new(function),
95            name: name.into(),
96            group: group.into(),
97            retries_after_failure,
98            is_running: RwLock::new(false),
99            is_active: true,
100        }
101    }
102
103    /// Returns true if this job is currently running.
104    pub async fn is_running(&self) -> bool {
105        let read = self.is_running.read().await;
106        *read
107    }
108
109    pub fn name(&self) -> &str {
110        &self.name
111    }
112
113    pub fn group(&self) -> &str {
114        &self.group
115    }
116
117    /// Run the job immediately and re-schedule it.
118    pub async fn run(&self) -> Result<(), SchedulerError> {
119        self.set_running(true).await?;
120
121        // Execute the job function
122        let mut run_result = self.exec().await;
123
124        if let Some(retries) = self.retries_after_failure {
125            for attempt in 1..=retries {
126                if let Err(e) = run_result {
127                    #[cfg(feature = "tracing")]
128                    warn!(
129                        "Execution failed for job [{}/{}] - Retry execution, attempt {}/{}. Previous err: {}",
130                        self.group, self.name, attempt, retries, e
131                    );
132                    run_result = self.exec().await;
133                } else {
134                    break;
135                }
136            }
137        }
138
139        self.set_running(false).await?;
140
141        run_result.map_err(|err| SchedulerError::JobExecutionError {
142            source: err,
143        })
144    }
145
146    async fn exec(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
147        let function = self.function.clone();
148        (function)().await
149    }
150
151    async fn set_running(&self, is_running: bool) -> Result<(), SchedulerError> {
152        let mut write = self.is_running.write().await;
153
154        if is_running.eq(&*write) {
155            return Err(SchedulerError::JobLockError {
156                message: format!(
157                    "Wrong Job status found for job [{}/{}]. Expected: {}",
158                    self.group, self.name, !is_running
159                ),
160            });
161        }
162
163        *write = is_running;
164        Ok(())
165    }
166}
167
#[cfg(test)]
pub mod test {

    use std::{sync::Arc, time::Duration};

    use tokio::sync::mpsc::channel;

    use super::*;

    /// The running flag must be raised while the job function executes and lowered afterwards.
    #[tokio::test]
    async fn should_be_running() {
        // The job blocks on this mutex, which lets the test observe it in the middle of a run.
        let gate = Arc::new(Mutex::new(true));
        // Carries two signals: "job started" (from the job) and "run finished" (from the task).
        let (events_tx, mut events_rx) = channel(10000);

        let job = {
            let gate = Arc::clone(&gate);
            let started_tx = events_tx.clone();
            Job::new("g", "n", None, move || {
                let gate = Arc::clone(&gate);
                let started_tx = started_tx.clone();
                Box::pin(async move {
                    println!("job - started");
                    started_tx.send("").await.unwrap();
                    println!("job - Trying to get the lock");
                    let _held = gate.lock().await;
                    println!("job - lock acquired");
                    Ok(())
                })
            })
        };
        let schedule = Scheduler::Interval {
            interval_duration: Duration::from_secs(1),
            execute_at_startup: false,
        };
        let job_scheduler = Arc::new(JobScheduler::new(schedule, job));

        assert!(!job_scheduler.job.is_running().await);

        // Hold the gate so the spawned run cannot complete until it is released.
        let gate_guard = gate.lock().await;
        let spawned_scheduler = Arc::clone(&job_scheduler);
        tokio::spawn(async move {
            println!("starting job");
            spawned_scheduler.run().await.unwrap();
            println!("end job execution");
            events_tx.send("").await.unwrap();
        });
        // Wait for the "job started" signal: the job is now parked on the gate.
        events_rx.recv().await.unwrap();
        assert!(job_scheduler.job.is_running().await);
        drop(gate_guard);

        // Wait for the "run finished" signal before checking the flag again.
        events_rx.recv().await.unwrap();
        assert!(!job_scheduler.job.is_running().await);
    }

    /// A job that succeeds on the first attempt is executed exactly once.
    #[tokio::test]
    async fn job_should_not_retry_run_if_ok() {
        let max_retries = 12;
        let executions = Arc::new(Mutex::new(0));

        let job = {
            let executions = Arc::clone(&executions);
            Job::new("g", "n", Some(max_retries), move || {
                let executions = Arc::clone(&executions);
                Box::pin(async move {
                    println!("job - started");
                    println!("job - Trying to get the lock");
                    let mut guard = executions.lock().await;
                    let count = *guard;
                    *guard = count + 1;
                    println!("job - count {count}");
                    Ok(())
                })
            })
        };

        let outcome = job.run().await;
        assert!(outcome.is_ok());

        let total = *executions.lock().await;
        assert_eq!(1, total);
    }

    /// A job that always fails is executed once plus once per configured retry.
    #[tokio::test]
    async fn job_should_retry_run_if_error() {
        let max_retries = 12;
        let executions = Arc::new(Mutex::new(0));

        let job = {
            let executions = Arc::clone(&executions);
            Job::new("g", "n", Some(max_retries), move || {
                let executions = Arc::clone(&executions);
                Box::pin(async move {
                    println!("job - started");
                    println!("job - Trying to get the lock");
                    let mut guard = executions.lock().await;
                    let count = *guard;
                    *guard = count + 1;
                    println!("job - count {count}");
                    Err(SchedulerError::JobLockError {
                        message: "".to_owned(),
                    })?
                })
            })
        };

        let outcome = job.run().await;
        assert!(outcome.is_err());

        let total = *executions.lock().await;
        assert_eq!(max_retries + 1, total);
    }

    /// Retries stop as soon as one attempt succeeds.
    #[tokio::test]
    async fn job_should_stop_retrying_run_if_attempt_succeed() {
        let succeed_at = 7;
        let max_retries = 12;
        let executions = Arc::new(Mutex::new(0));

        let job = {
            let executions = Arc::clone(&executions);
            Job::new("g", "n", Some(max_retries), move || {
                let executions = Arc::clone(&executions);
                Box::pin(async move {
                    println!("job - started");
                    println!("job - Trying to get the lock");
                    let mut guard = executions.lock().await;
                    let count = *guard;
                    *guard = count + 1;
                    println!("job - count {count}");

                    if count != succeed_at {
                        Err(SchedulerError::JobLockError {
                            message: "".to_owned(),
                        })?
                    } else {
                        Ok(())
                    }
                })
            })
        };

        let outcome = job.run().await;
        assert!(outcome.is_ok());

        let total = *executions.lock().await;
        assert_eq!(succeed_at + 1, total);
    }
}