1use std::{
2 sync::{
3 Arc,
4 atomic::{AtomicBool, AtomicU64, Ordering},
5 },
6 time::Duration,
7};
8
9use chrono::Utc;
10use tokio::{sync::RwLock, task::JoinHandle};
11#[cfg(feature = "tracing")]
12use tracing::*;
13
14use crate::{
15 error::SchedulerError,
16 job::{Job, JobScheduler},
17 scheduler::{Scheduler, TryToScheduler},
18};
19
20pub mod error;
21pub mod job;
22pub mod scheduler;
23
24#[derive(Clone)]
25pub struct JobExecutor {
26 executor: Arc<JobExecutorInternal>,
27}
28
29struct JobExecutorInternal {
30 sleep_between_checks_ms: AtomicU64,
31 running: AtomicBool,
32 jobs: RwLock<Vec<Arc<JobScheduler>>>,
33}
34
35impl JobExecutorInternal {
36 fn is_running(&self) -> bool {
68 self.running.load(Ordering::SeqCst)
69 }
70
71 async fn is_running_job(&self) -> bool {
73 let jobs = self.jobs.read().await;
74 for job_scheduler in jobs.iter() {
75 if job_scheduler.job.is_running().await {
76 return true;
77 }
78 }
79 false
80 }
81
82 async fn run_pending_jobs(&self) {
84 #[cfg(feature = "tracing")]
85 trace!("Check pending jobs");
86 let jobs = self.jobs.read().await;
87 for job_scheduler in jobs.iter() {
88 if job_scheduler.is_pending().await {
90 if !job_scheduler.job.is_running().await {
92 let job_clone = job_scheduler.clone();
93
94 let timestamp = Utc::now().timestamp();
95 let group = job_clone.job.group().to_owned();
96 let name = job_clone.job.name().to_owned();
97
98 let fut = instrument(timestamp, group.clone(), name.clone(), async move {
99 #[cfg(feature = "tracing")]
100 info!("scheduler: starting task {name}");
101 let start = std::time::Instant::now();
102 let result = job_clone.run().await;
103
104 let duration = start.elapsed();
105
106 match result {
107 Ok(()) => {
108 #[cfg(feature = "tracing")]
109 info!("scheduler: task {name} completed successfully in {duration:?}")
110 }
111 Err(err) => {
112 #[cfg(feature = "tracing")]
113 error!("scheduler: task {name} failed in {duration:?} with errors: Err: {err:?}")
114 }
115 }
116 });
117
118 tokio::spawn(fut);
119 } else {
120 #[cfg(feature = "tracing")]
121 debug!(
122 "Job [{}/{}] is pending but already running. It will not be executed.",
123 job_scheduler.job.group(),
124 job_scheduler.job.name()
125 )
126 }
127 }
128 }
129 }
130
131 async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
133 #[cfg(feature = "tracing")]
134 debug!("Add job to scheduler. Group [{}] - Name [{}]", job.group(), job.name());
135 let mut jobs = self.jobs.write().await;
136 jobs.push(Arc::new(JobScheduler::new(schedule.into(), job)));
137 }
138}
139
140impl JobExecutor {
141 pub fn new_with_local_tz() -> JobExecutor {
144 Self::new_with_tz()
145 }
146
147 pub fn new_with_utc_tz() -> JobExecutor {
150 Self::new_with_tz()
151 }
152
153 pub fn new_with_tz() -> JobExecutor {
156 JobExecutor {
157 executor: Arc::new(JobExecutorInternal {
158 sleep_between_checks_ms: AtomicU64::new(1_000),
159 running: AtomicBool::new(false),
160 jobs: RwLock::new(vec![]),
161 }),
162 }
163 }
164
165 pub async fn add_job(&self, schedule: &dyn TryToScheduler, job: Job) -> Result<(), SchedulerError> {
167 self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
168 Ok(())
169 }
170
171 pub async fn add_job_with_multi_schedule(
173 &self,
174 schedule: &[&dyn TryToScheduler],
175 job: Job,
176 ) -> Result<(), SchedulerError> {
177 self.add_job_with_scheduler(schedule.to_scheduler()?, job).await;
178 Ok(())
179 }
180
181 pub async fn add_job_with_scheduler<S: Into<Scheduler>>(&self, schedule: S, job: Job) {
183 self.executor.add_job_with_scheduler(schedule, job).await
184 }
185
186 pub async fn run(&self) -> Result<JoinHandle<()>, SchedulerError> {
188 let was_running = self.executor.running.swap(true, Ordering::SeqCst);
189 if !was_running {
190 let executor = self.executor.clone();
191 Ok(tokio::spawn(async move {
192 #[cfg(feature = "tracing")]
193 debug!("Starting the job executor");
194 while executor.is_running() {
195 executor.run_pending_jobs().await;
196 tokio::time::sleep(Duration::from_millis(executor.sleep_between_checks_ms.load(Ordering::SeqCst)))
197 .await;
198 }
199 #[cfg(feature = "tracing")]
200 debug!("Job executor stopped");
201 }))
202 } else {
203 #[cfg(feature = "tracing")]
204 warn!("The JobExecutor is already running.");
205 Err(SchedulerError::JobExecutionStateError {
206 message: "The JobExecutor is already running.".to_owned(),
207 })
208 }
209 }
210
211 pub async fn stop(&self, graceful: bool) -> Result<(), SchedulerError> {
213 let was_running = self.executor.running.swap(false, Ordering::SeqCst);
214 if was_running {
215 #[cfg(feature = "tracing")]
216 debug!("Stopping the job executor");
217 if graceful {
218 #[cfg(feature = "tracing")]
219 debug!("Wait for all Jobs to complete");
220 while self.executor.is_running_job().await {
221 tokio::time::sleep(Duration::from_millis(
222 self.executor.sleep_between_checks_ms.load(Ordering::SeqCst),
223 ))
224 .await;
225 }
226 #[cfg(feature = "tracing")]
227 debug!("All Jobs completed");
228 }
229 Ok(())
230 } else {
231 #[cfg(feature = "tracing")]
232 warn!("The JobExecutor is not running.");
233 Err(SchedulerError::JobExecutionStateError {
234 message: "The JobExecutor is not running.".to_owned(),
235 })
236 }
237 }
238
239 pub fn set_sleep_between_checks(&self, sleep_ms: u64) {
242 self.executor.sleep_between_checks_ms.store(sleep_ms, Ordering::SeqCst);
243 }
244}
245
246#[cfg(feature = "tracing")]
247fn instrument<F: std::future::Future<Output = ()>>(
248 timestamp: i64,
249 group: String,
250 name: String,
251 fut: F,
252) -> impl std::future::Future<Output = ()> {
253 let span = tracing::error_span!("run_pending", group, name, timestamp);
254 fut.instrument(span)
255}
256
257#[cfg(not(feature = "tracing"))]
258fn instrument<F: std::future::Future<Output = ()>>(
259 _timestamp: i64,
260 _group: String,
261 _name: String,
262 fut: F,
263) -> impl std::future::Future<Output = ()> {
264 fut
265}
266
267#[cfg(test)]
268pub mod test {
269
270 use std::{
271 sync::atomic::{AtomicUsize, Ordering},
272 time::Duration,
273 };
274
275 use chrono::Utc;
276 use tokio::sync::mpsc::channel;
277
278 use super::*;
279
280 #[tokio::test]
281 async fn should_not_run_an_already_running_job() {
282 let executor = JobExecutor::new_with_utc_tz();
283
284 let count = Arc::new(AtomicUsize::new(0));
285 let count_clone = count.clone();
286
287 let (tx, mut rx) = channel(1000);
288
289 executor
290 .add_job(
291 &Duration::new(0, 1),
292 Job::new("g", "n", None, move || {
293 let count_clone = count_clone.clone();
294 let tx = tx.clone();
295 Box::pin(async move {
296 tx.send("").await.unwrap();
297 println!("job - started");
298 count_clone.fetch_add(1, Ordering::SeqCst);
299 tokio::time::sleep(Duration::new(1, 0)).await;
300 Ok(())
301 })
302 }),
303 )
304 .await
305 .unwrap();
306
307 for i in 0..100 {
308 println!("run_pending {i}");
309 executor.executor.run_pending_jobs().await;
310 tokio::time::sleep(Duration::new(0, 2)).await;
311 }
312
313 println!("run_pending completed");
314 rx.recv().await.unwrap();
315
316 assert_eq!(count.load(Ordering::Relaxed), 1);
317 }
318
319 #[tokio::test]
320 async fn a_running_job_should_not_block_the_executor() {
321 let executor = JobExecutor::new_with_local_tz();
322
323 let (tx, mut rx) = channel(959898);
324
325 let count_1 = Arc::new(AtomicUsize::new(0));
326 let count_1_clone = count_1.clone();
327 let tx_1 = tx.clone();
328 executor
329 .add_job_with_multi_schedule(
330 &[&Duration::new(0, 1)],
331 Job::new("g", "n", None, move || {
332 let count_1_clone = count_1_clone.clone();
333 let tx_1 = tx_1.clone();
334 Box::pin(async move {
335 tx_1.send("").await.unwrap();
336 println!("job 1 - started");
337 count_1_clone.fetch_add(1, Ordering::SeqCst);
338 tokio::time::sleep(Duration::new(1, 0)).await;
339 Ok(())
340 })
341 }),
342 )
343 .await
344 .unwrap();
345
346 let count_2 = Arc::new(AtomicUsize::new(0));
347 let count_2_clone = count_2.clone();
348 let tx_2 = tx.clone();
349 executor
350 .add_job(
351 &Duration::new(0, 1),
352 Job::new("g", "n", None, move || {
353 let count_2_clone = count_2_clone.clone();
354 let tx_2 = tx_2.clone();
355 Box::pin(async move {
356 tx_2.send("").await.unwrap();
357 println!("job 2 - started");
358 count_2_clone.fetch_add(1, Ordering::SeqCst);
359 tokio::time::sleep(Duration::new(1, 0)).await;
360 Ok(())
361 })
362 }),
363 )
364 .await
365 .unwrap();
366
367 let count_3 = Arc::new(AtomicUsize::new(0));
368 let count_3_clone = count_3.clone();
369 let tx_3 = tx.clone();
370 executor
371 .add_job(
372 &Duration::new(0, 1),
373 Job::new("g", "n", None, move || {
374 let count_3_clone = count_3_clone.clone();
375 let tx_3 = tx_3.clone();
376 Box::pin(async move {
377 tx_3.send("").await.unwrap();
378 println!("job 3 - started");
379 count_3_clone.fetch_add(1, Ordering::SeqCst);
380 tokio::time::sleep(Duration::new(1, 0)).await;
381 Ok(())
382 })
383 }),
384 )
385 .await
386 .unwrap();
387
388 let before_millis = Utc::now().timestamp_millis();
389 for i in 0..100 {
390 println!("run_pending {i}");
391 executor.executor.run_pending_jobs().await;
392 tokio::time::sleep(Duration::new(0, 1_000_000)).await;
393 }
394 let after_millis = Utc::now().timestamp_millis();
395
396 assert!((after_millis - before_millis) >= 100);
397 assert!((after_millis - before_millis) < 1000);
398
399 rx.recv().await.unwrap();
400
401 assert_eq!(count_1.load(Ordering::SeqCst), 1);
402 assert_eq!(count_2.load(Ordering::SeqCst), 1);
403 assert_eq!(count_3.load(Ordering::SeqCst), 1);
404 }
405
406 #[tokio::test]
407 async fn should_gracefully_shutdown_the_job_executor() {
408 let executor = JobExecutor::new_with_utc_tz();
409
410 let count = Arc::new(AtomicUsize::new(0));
411
412 let tasks = 100;
413
414 for _i in 0..tasks {
415 let count_clone = count.clone();
416 executor
417 .add_job(
418 &Duration::new(0, 1),
419 Job::new("g", "n", None, move || {
420 let count_clone = count_clone.clone();
421 Box::pin(async move {
422 tokio::time::sleep(Duration::new(1, 0)).await;
423 println!("job - started");
424 count_clone.fetch_add(1, Ordering::SeqCst);
425 Ok(())
426 })
427 }),
428 )
429 .await
430 .unwrap();
431 }
432
433 executor.set_sleep_between_checks(10);
434
435 executor.run().await.unwrap();
436
437 loop {
438 if executor.executor.is_running_job().await {
439 break;
440 }
441 tokio::time::sleep(Duration::from_nanos(1)).await;
442 }
443
444 executor.stop(true).await.unwrap();
445
446 assert_eq!(count.load(Ordering::Relaxed), tasks);
447 }
448
449 #[tokio::test]
450 async fn start_should_fail_if_already_running() {
451 let executor = JobExecutor::new_with_utc_tz();
452 assert!(executor.run().await.is_ok());
453 assert!(executor.run().await.is_err());
454 assert!(executor.stop(false).await.is_ok());
455 }
456
457 #[tokio::test]
458 async fn stop_should_fail_if_not_running() {
459 let executor = JobExecutor::new_with_utc_tz();
460 assert!(executor.stop(false).await.is_err());
461 assert!(executor.run().await.is_ok());
462 assert!(executor.stop(false).await.is_ok());
463 assert!(executor.stop(false).await.is_err());
464 }
465
466 #[tokio::test]
467 async fn should_add_with_explicit_scheduler() {
468 let executor = JobExecutor::new_with_utc_tz();
469 executor
470 .add_job_with_scheduler(Scheduler::Never, Job::new("g", "n", None, move || Box::pin(async { Ok(()) })))
471 .await;
472 }
473
474 #[tokio::test]
475 async fn should_register_a_schedule_by_vec() {
476 let executor = JobExecutor::new_with_utc_tz();
477 executor
478 .add_job(
479 &vec!["0 1 * * * * *"],
480 Job::new("g", "n", None, move || Box::pin(async { Ok(()) })),
481 )
482 .await
483 .unwrap();
484 executor
485 .add_job(
486 &vec!["0 1 * * * * *".to_owned(), "0 1 * * * * *".to_owned()],
487 Job::new("g", "n", None, move || Box::pin(async { Ok(()) })),
488 )
489 .await
490 .unwrap();
491 }
492}