// scheduler/lib.rs

use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::{Duration, Instant},
};

use chrono::Utc;
use tokio::{sync::RwLock, task::JoinHandle};
#[cfg(feature = "tracing")]
use tracing::*;

use crate::{
    error::SchedulerError,
    job::{Job, JobScheduler},
    scheduler::{Scheduler, TryToScheduler},
};
19
20pub mod error;
21pub mod job;
22pub mod scheduler;
23
24#[derive(Clone)]
25pub struct JobExecutor {
26    executor: Arc<JobExecutorInternal>,
27}
28
29struct JobExecutorInternal {
30    sleep_between_checks_ms: AtomicU64,
31    running: AtomicBool,
32    jobs: RwLock<Vec<Arc<JobScheduler>>>,
33}
34
35impl JobExecutorInternal {
36    /*
37        /// Returns true if the JobExecutor contains no jobs.
38        pub async fn is_empty(&self) -> bool {
39            let jobs = self.jobs.read().await;
40            jobs.is_empty()
41        }
42
43        /// Returns the number of jobs in the JobExecutor.
44        pub async fn len(&self) -> usize {
45            let jobs = self.jobs.read().await;
46            jobs.len()
47        }
48
49        /// Clear the JobExecutor, removing all jobs.
50        pub async fn clear(&mut self) {
51            let mut jobs = self.jobs.write().await;
52            jobs.clear()
53        }
54
55        /// Returns true if there is at least one job pending.
56        pub async fn is_pending_job(&self) -> bool {
57            let jobs = self.jobs.read().await;
58            for job_scheduler in jobs.iter() {
59                if job_scheduler.is_pending().await {
60                    return true;
61                }
62            }
63            false
64        }
65    */
66    /// Returns true if the Job Executor is running
67    fn is_running(&self) -> bool {
68        self.running.load(Ordering::SeqCst)
69    }
70
71    /// Returns true if there is at least one job running.
72    async fn is_running_job(&self) -> bool {
73        let jobs = self.jobs.read().await;
74        for job_scheduler in jobs.iter() {
75            if job_scheduler.job.is_running().await {
76                return true;
77            }
78        }
79        false
80    }
81
82    /// Run pending jobs in the JobExecutor.
83    async fn run_pending_jobs(&self) {
84        #[cfg(feature = "tracing")]
85        trace!("Check pending jobs");
86        let jobs = self.jobs.read().await;
87        for job_scheduler in jobs.iter() {
88            //println!("check JOB IS PENDING: {}", job.is_pending());
89            if job_scheduler.is_pending().await {
90                //println!("JOB IS RUNNING? {}", is_running);
91                if !job_scheduler.job.is_running().await {
92                    let job_clone = job_scheduler.clone();
93
94                    let timestamp = Utc::now().timestamp();
95                    let group = job_clone.job.group().to_owned();
96                    let name = job_clone.job.name().to_owned();
97
98                    let fut = instrument(timestamp, group.clone(), name.clone(), async move {
99                        #[cfg(feature = "tracing")]
100                        info!("scheduler: starting task {name}");
101                        let start = std::time::Instant::now();
102                        let result = job_clone.run().await;
103
104                        let duration = start.elapsed();
105
106                        match result {
107                            Ok(()) => {
108                                #[cfg(feature = "tracing")]
109                                info!("scheduler: task {name} completed successfully in {duration:?}")
110                            }
111                            Err(err) => {
112                                #[cfg(feature = "tracing")]
113                                error!("scheduler: task {name} failed in {duration:?} with errors: Err: {err:?}")
114                            }
115                        }
116                    });
117
118                    tokio::spawn(fut);
119                } else {
120                    #[cfg(feature = "tracing")]
121                    debug!(
122                        "Job [{}/{}] is pending but already running. It will not be executed.",
123                        job_scheduler.job.group(),
124                        job_scheduler.job.name()
125                    )
126                }
127            }
128        }
129    }
130
131    /// Adds a job to the JobExecutor.
132    async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
133        #[cfg(feature = "tracing")]
134        debug!("Add job to scheduler. Group [{}] - Name [{}]", job.group(), job.name());
135        let mut jobs = self.jobs.write().await;
136        jobs.push(Arc::new(JobScheduler::new(schedule.into(), job)));
137    }
138}
139
140impl JobExecutor {
141    /// Creates a new Executor that uses the Local time zone for the execution times evaluation.
142    /// For example, the cron expressions will refer to the Local time zone.
143    pub fn new_with_local_tz() -> JobExecutor {
144        Self::new_with_tz()
145    }
146
147    /// Creates a new Executor that uses the UTC time zone for the execution times evaluation.
148    /// For example, the cron expressions will refer to the UTC time zone.
149    pub fn new_with_utc_tz() -> JobExecutor {
150        Self::new_with_tz()
151    }
152
153    /// Creates a new Executor that uses a custom time zone for the execution times evaluation.
154    /// For example, the cron expressions will refer to the specified time zone.
155    pub fn new_with_tz() -> JobExecutor {
156        JobExecutor {
157            executor: Arc::new(JobExecutorInternal {
158                sleep_between_checks_ms: AtomicU64::new(1_000),
159                running: AtomicBool::new(false),
160                jobs: RwLock::new(vec![]),
161            }),
162        }
163    }
164
165    /// Adds a job to the JobExecutor.
166    pub async fn add_job(&self, schedule: &dyn TryToScheduler, job: Job) -> Result<(), SchedulerError> {
167        self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
168        Ok(())
169    }
170
171    /// Adds a job to the JobExecutor.
172    pub async fn add_job_with_multi_schedule(
173        &self,
174        schedule: &[&dyn TryToScheduler],
175        job: Job,
176    ) -> Result<(), SchedulerError> {
177        self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
178        Ok(())
179    }
180
181    /// Adds a job to the JobExecutor.
182    pub async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
183        self.executor.add_job_with_scheduler(schedule, job).await
184    }
185
186    /// Starts the JobExecutor
187    pub async fn run(&self) -> Result<JoinHandle<()>, SchedulerError> {
188        let was_running = self.executor.running.swap(true, Ordering::SeqCst);
189        if !was_running {
190            let executor = self.executor.clone();
191            Ok(tokio::spawn(async move {
192                #[cfg(feature = "tracing")]
193                debug!("Starting the job executor");
194                while executor.is_running() {
195                    executor.run_pending_jobs().await;
196                    tokio::time::sleep(Duration::from_millis(executor.sleep_between_checks_ms.load(Ordering::SeqCst)))
197                        .await;
198                }
199                #[cfg(feature = "tracing")]
200                debug!("Job executor stopped");
201            }))
202        } else {
203            #[cfg(feature = "tracing")]
204            warn!("The JobExecutor is already running.");
205            Err(SchedulerError::JobExecutionStateError {
206                message: "The JobExecutor is already running.".to_owned(),
207            })
208        }
209    }
210
211    /// Stops the JobExecutor
212    pub async fn stop(&self, graceful: bool) -> Result<(), SchedulerError> {
213        let was_running = self.executor.running.swap(false, Ordering::SeqCst);
214        if was_running {
215            #[cfg(feature = "tracing")]
216            debug!("Stopping the job executor");
217            if graceful {
218                #[cfg(feature = "tracing")]
219                debug!("Wait for all Jobs to complete");
220                while self.executor.is_running_job().await {
221                    tokio::time::sleep(Duration::from_millis(
222                        self.executor.sleep_between_checks_ms.load(Ordering::SeqCst),
223                    ))
224                    .await;
225                }
226                #[cfg(feature = "tracing")]
227                debug!("All Jobs completed");
228            }
229            Ok(())
230        } else {
231            #[cfg(feature = "tracing")]
232            warn!("The JobExecutor is not running.");
233            Err(SchedulerError::JobExecutionStateError {
234                message: "The JobExecutor is not running.".to_owned(),
235            })
236        }
237    }
238
239    /// Sets the sleep time between checks for pending Jobs.
240    /// The default is 1 second.
241    pub fn set_sleep_between_checks(&self, sleep_ms: u64) {
242        self.executor.sleep_between_checks_ms.store(sleep_ms, Ordering::SeqCst);
243    }
244}
245
/// Wraps `fut` in a tracing span named `run_pending`.
///
/// The span is created with `error_span!` and records `group`, `name` and
/// `timestamp` as fields, so events emitted while `fut` is polled are
/// attributed to the job being executed.
#[cfg(feature = "tracing")]
fn instrument<F: std::future::Future<Output = ()>>(
    timestamp: i64,
    group: String,
    name: String,
    fut: F,
) -> impl std::future::Future<Output = ()> {
    let span = tracing::error_span!("run_pending", group, name, timestamp);
    fut.instrument(span)
}
256
/// Stand-in for the span-attaching `instrument` used when the `tracing`
/// feature is disabled.
///
/// Keeps the same signature so call sites need no `cfg` of their own. The
/// identifying arguments are ignored and `fut` is returned unchanged.
#[cfg(not(feature = "tracing"))]
fn instrument<F: std::future::Future<Output = ()>>(
    _timestamp: i64,
    _group: String,
    _name: String,
    fut: F,
) -> impl std::future::Future<Output = ()> {
    fut
}
266
#[cfg(test)]
pub mod test {

    use std::{
        sync::atomic::{AtomicUsize, Ordering},
        time::Duration,
    };

    use chrono::Utc;
    use tokio::sync::mpsc::channel;

    use super::*;

    /// A job that is still running must not be started a second time, no
    /// matter how often pending jobs are checked.
    #[tokio::test]
    async fn should_not_run_an_already_running_job() {
        let executor = JobExecutor::new_with_utc_tz();

        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();

        let (tx, mut rx) = channel(1000);

        executor
            .add_job(
                &Duration::new(0, 1),
                Job::new("g", "n", None, move || {
                    let count_clone = count_clone.clone();
                    let tx = tx.clone();
                    Box::pin(async move {
                        tx.send("").await.unwrap();
                        println!("job - started");
                        count_clone.fetch_add(1, Ordering::SeqCst);
                        // Stay busy for much longer than the check loop below runs.
                        tokio::time::sleep(Duration::new(1, 0)).await;
                        Ok(())
                    })
                }),
            )
            .await
            .unwrap();

        for i in 0..100 {
            println!("run_pending {i}");
            executor.executor.run_pending_jobs().await;
            tokio::time::sleep(Duration::new(0, 2)).await;
        }

        println!("run_pending completed");
        // Wait for the job's start signal before inspecting the counter.
        rx.recv().await.unwrap();

        assert_eq!(count.load(Ordering::Relaxed), 1);
    }

    /// Checking for pending jobs must return without waiting for the jobs it
    /// starts, and each of the three jobs must be started exactly once.
    #[tokio::test]
    async fn a_running_job_should_not_block_the_executor() {
        let executor = JobExecutor::new_with_local_tz();

        let (tx, mut rx) = channel(959898);

        let count_1 = Arc::new(AtomicUsize::new(0));
        let count_1_clone = count_1.clone();
        let tx_1 = tx.clone();
        executor
            .add_job_with_multi_schedule(
                &[&Duration::new(0, 1)],
                Job::new("g", "n", None, move || {
                    let count_1_clone = count_1_clone.clone();
                    let tx_1 = tx_1.clone();
                    Box::pin(async move {
                        tx_1.send("").await.unwrap();
                        println!("job 1 - started");
                        count_1_clone.fetch_add(1, Ordering::SeqCst);
                        tokio::time::sleep(Duration::new(1, 0)).await;
                        Ok(())
                    })
                }),
            )
            .await
            .unwrap();

        let count_2 = Arc::new(AtomicUsize::new(0));
        let count_2_clone = count_2.clone();
        let tx_2 = tx.clone();
        executor
            .add_job(
                &Duration::new(0, 1),
                Job::new("g", "n", None, move || {
                    let count_2_clone = count_2_clone.clone();
                    let tx_2 = tx_2.clone();
                    Box::pin(async move {
                        tx_2.send("").await.unwrap();
                        println!("job 2 - started");
                        count_2_clone.fetch_add(1, Ordering::SeqCst);
                        tokio::time::sleep(Duration::new(1, 0)).await;
                        Ok(())
                    })
                }),
            )
            .await
            .unwrap();

        let count_3 = Arc::new(AtomicUsize::new(0));
        let count_3_clone = count_3.clone();
        let tx_3 = tx.clone();
        executor
            .add_job(
                &Duration::new(0, 1),
                Job::new("g", "n", None, move || {
                    let count_3_clone = count_3_clone.clone();
                    let tx_3 = tx_3.clone();
                    Box::pin(async move {
                        tx_3.send("").await.unwrap();
                        println!("job 3 - started");
                        count_3_clone.fetch_add(1, Ordering::SeqCst);
                        tokio::time::sleep(Duration::new(1, 0)).await;
                        Ok(())
                    })
                }),
            )
            .await
            .unwrap();

        let before_millis = Utc::now().timestamp_millis();
        for i in 0..100 {
            println!("run_pending {i}");
            executor.executor.run_pending_jobs().await;
            tokio::time::sleep(Duration::new(0, 1_000_000)).await;
        }
        let after_millis = Utc::now().timestamp_millis();

        // Lower bound: 100 iterations each sleeping 1 ms.
        assert!((after_millis - before_millis) >= 100);
        // Upper bound: less than one job's 1 s sleep, so the checks did not
        // wait for the jobs they started.
        assert!((after_millis - before_millis) < 1000);

        rx.recv().await.unwrap();

        assert_eq!(count_1.load(Ordering::SeqCst), 1);
        assert_eq!(count_2.load(Ordering::SeqCst), 1);
        assert_eq!(count_3.load(Ordering::SeqCst), 1);
    }

    /// A graceful stop must return only after every started job has finished.
    #[tokio::test]
    async fn should_gracefully_shutdown_the_job_executor() {
        let executor = JobExecutor::new_with_utc_tz();

        let count = Arc::new(AtomicUsize::new(0));

        let tasks = 100;

        for _i in 0..tasks {
            let count_clone = count.clone();
            executor
                .add_job(
                    &Duration::new(0, 1),
                    Job::new("g", "n", None, move || {
                        let count_clone = count_clone.clone();
                        Box::pin(async move {
                            // The counter is bumped only after the sleep, so it
                            // counts jobs that ran to completion.
                            tokio::time::sleep(Duration::new(1, 0)).await;
                            println!("job - started");
                            count_clone.fetch_add(1, Ordering::SeqCst);
                            Ok(())
                        })
                    }),
                )
                .await
                .unwrap();
        }

        executor.set_sleep_between_checks(10);

        executor.run().await.unwrap();

        // Wait until at least one job is reported as running before stopping.
        loop {
            if executor.executor.is_running_job().await {
                break;
            }
            tokio::time::sleep(Duration::from_nanos(1)).await;
        }

        executor.stop(true).await.unwrap();

        assert_eq!(count.load(Ordering::Relaxed), tasks);
    }

    /// Starting an executor that is already running is an error.
    #[tokio::test]
    async fn start_should_fail_if_already_running() {
        let executor = JobExecutor::new_with_utc_tz();
        assert!(executor.run().await.is_ok());
        assert!(executor.run().await.is_err());
        assert!(executor.stop(false).await.is_ok());
    }

    /// Stopping an executor that is not running is an error, both before the
    /// first start and after a successful stop.
    #[tokio::test]
    async fn stop_should_fail_if_not_running() {
        let executor = JobExecutor::new_with_utc_tz();
        assert!(executor.stop(false).await.is_err());
        assert!(executor.run().await.is_ok());
        assert!(executor.stop(false).await.is_ok());
        assert!(executor.stop(false).await.is_err());
    }

    /// A ready-made `Scheduler` value is accepted directly.
    #[tokio::test]
    async fn should_add_with_explicit_scheduler() {
        let executor = JobExecutor::new_with_utc_tz();
        executor
            .add_job_with_scheduler(Scheduler::Never, Job::new("g", "n", None, move || Box::pin(async { Ok(()) })))
            .await;
    }

    /// Vectors of cron expressions, borrowed or owned strings, are accepted as schedules.
    #[tokio::test]
    async fn should_register_a_schedule_by_vec() {
        let executor = JobExecutor::new_with_utc_tz();
        executor
            .add_job(
                &vec!["0 1 * * * * *"],
                Job::new("g", "n", None, move || Box::pin(async { Ok(()) })),
            )
            .await
            .unwrap();
        executor
            .add_job(
                &vec!["0 1 * * * * *".to_owned(), "0 1 * * * * *".to_owned()],
                Job::new("g", "n", None, move || Box::pin(async { Ok(()) })),
            )
            .await
            .unwrap();
    }
}