1use std::{future::Future, pin::Pin, sync::Arc};
2
3use chrono::{DateTime, Utc};
4use tokio::sync::{Mutex, RwLock};
5#[cfg(feature = "tracing")]
6use tracing::*;
7
8use crate::{error::SchedulerError, scheduler::Scheduler};
9
10pub struct JobScheduler {
11 pub job: Job,
12 schedule: Mutex<Scheduler>,
13 next_run_at: Mutex<Option<DateTime<Utc>>>,
14 last_run_at: Mutex<Option<DateTime<Utc>>>,
15}
16
17impl JobScheduler {
18 pub fn new(mut schedule: Scheduler, job: Job) -> Self {
19 let next_run_at = schedule.next(&Utc::now());
21 JobScheduler {
22 job,
23 schedule: Mutex::new(schedule),
24 next_run_at: Mutex::new(next_run_at),
25 last_run_at: Mutex::new(None),
26 }
27 }
28
29 pub async fn is_pending(&self) -> bool {
31 if !self.job.is_active {
33 return false;
34 }
35
36 if let Some(next_run_at) = self.next_run_at.lock().await.as_ref() {
38 *next_run_at < Utc::now()
39 } else {
40 false
41 }
42 }
43
44 pub async fn run(&self) -> Result<(), SchedulerError> {
46 let run_result = self.job.run().await;
48
49 let now = Utc::now();
50
51 let mut schedule = self.schedule.lock().await;
52
53 let mut next_run_at = self.next_run_at.lock().await;
55 *next_run_at = schedule.next(&now);
56
57 let mut last_run_at = self.last_run_at.lock().await;
59 *last_run_at = Some(now);
60
61 run_result
62 }
63}
64
65pub type JobFn = dyn 'static
66 + Send
67 + Sync
68 + Fn() -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>>;
69
70pub struct Job {
71 function: Arc<JobFn>,
72 group: String,
73 name: String,
74 is_active: bool,
75 is_running: RwLock<bool>,
76 retries_after_failure: Option<u64>,
77}
78
79impl Job {
80 pub fn new<
81 G: Into<String>,
82 N: Into<String>,
83 F: 'static
84 + Send
85 + Sync
86 + Fn() -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>>,
87 >(
88 group: G,
89 name: N,
90 retries_after_failure: Option<u64>,
91 function: F,
92 ) -> Self {
93 Job {
94 function: Arc::new(function),
95 name: name.into(),
96 group: group.into(),
97 retries_after_failure,
98 is_running: RwLock::new(false),
99 is_active: true,
100 }
101 }
102
103 pub async fn is_running(&self) -> bool {
105 let read = self.is_running.read().await;
106 *read
107 }
108
109 pub fn name(&self) -> &str {
110 &self.name
111 }
112
113 pub fn group(&self) -> &str {
114 &self.group
115 }
116
117 pub async fn run(&self) -> Result<(), SchedulerError> {
119 self.set_running(true).await?;
120
121 let mut run_result = self.exec().await;
123
124 if let Some(retries) = self.retries_after_failure {
125 for attempt in 1..=retries {
126 if let Err(e) = run_result {
127 #[cfg(feature = "tracing")]
128 warn!(
129 "Execution failed for job [{}/{}] - Retry execution, attempt {}/{}. Previous err: {}",
130 self.group, self.name, attempt, retries, e
131 );
132 run_result = self.exec().await;
133 } else {
134 break;
135 }
136 }
137 }
138
139 self.set_running(false).await?;
140
141 run_result.map_err(|err| SchedulerError::JobExecutionError {
142 source: err,
143 })
144 }
145
146 async fn exec(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
147 let function = self.function.clone();
148 (function)().await
149 }
150
151 async fn set_running(&self, is_running: bool) -> Result<(), SchedulerError> {
152 let mut write = self.is_running.write().await;
153
154 if is_running.eq(&*write) {
155 return Err(SchedulerError::JobLockError {
156 message: format!(
157 "Wrong Job status found for job [{}/{}]. Expected: {}",
158 self.group, self.name, !is_running
159 ),
160 });
161 }
162
163 *write = is_running;
164 Ok(())
165 }
166}
167
168#[cfg(test)]
169pub mod test {
170
171 use std::{sync::Arc, time::Duration};
172
173 use tokio::sync::mpsc::channel;
174
175 use super::*;
176
177 #[tokio::test]
178 async fn should_be_running() {
179 let lock = Arc::new(Mutex::new(true));
180 let lock_clone = lock.clone();
181 let (tx, mut rx) = channel(10000);
182 let tx_clone = tx.clone();
183
184 let job_scheduler = Arc::new(JobScheduler::new(
185 Scheduler::Interval {
186 interval_duration: Duration::new(1, 0),
187 execute_at_startup: false,
188 },
189 Job::new("g", "n", None, move || {
190 let lock_clone = lock_clone.clone();
191 let tx_clone = tx_clone.clone();
192 Box::pin(async move {
193 println!("job - started");
194 tx_clone.send("").await.unwrap();
195 println!("job - Trying to get the lock");
196 let _lock = lock_clone.lock().await;
197 println!("job - lock acquired");
198 Ok(())
199 })
200 }),
201 ));
202
203 assert!(!job_scheduler.job.is_running().await);
204
205 {
206 let _lock = lock.lock().await;
207 let job_clone = job_scheduler.clone();
208 tokio::spawn(async move {
209 println!("starting job");
210 job_clone.run().await.unwrap();
211 println!("end job execution");
212 tx.send("").await.unwrap();
213 });
214 rx.recv().await.unwrap();
215 assert!(job_scheduler.job.is_running().await);
216 }
217
218 rx.recv().await.unwrap();
219 assert!(!job_scheduler.job.is_running().await);
220 }
221
222 #[tokio::test]
223 async fn job_should_not_retry_run_if_ok() {
224 let lock = Arc::new(Mutex::new(0));
225 let lock_clone = lock.clone();
226
227 let max_retries = 12;
228
229 let job = Job::new("g", "n", Some(max_retries), move || {
230 let lock_clone = lock_clone.clone();
231 Box::pin(async move {
232 println!("job - started");
233 println!("job - Trying to get the lock");
234 let mut lock = lock_clone.lock().await;
235 let count = *lock;
236 *lock = count + 1;
237 println!("job - count {count}");
238 Ok(())
239 })
240 });
241
242 let result = job.run().await;
243
244 assert!(result.is_ok());
245
246 let lock = lock.lock().await;
247 let count = *lock;
248 assert_eq!(1, count);
249 }
250
251 #[tokio::test]
252 async fn job_should_retry_run_if_error() {
253 let lock = Arc::new(Mutex::new(0));
254 let lock_clone = lock.clone();
255
256 let max_retries = 12;
257
258 let job = Job::new("g", "n", Some(max_retries), move || {
259 let lock_clone = lock_clone.clone();
260 Box::pin(async move {
261 println!("job - started");
262 println!("job - Trying to get the lock");
263 let mut lock = lock_clone.lock().await;
264 let count = *lock;
265 *lock = count + 1;
266 println!("job - count {count}");
267 Err(SchedulerError::JobLockError {
268 message: "".to_owned(),
269 })?
270 })
271 });
272
273 let result = job.run().await;
274
275 assert!(result.is_err());
276
277 let lock = lock.lock().await;
278 let count = *lock;
279 assert_eq!(max_retries + 1, count);
280 }
281
282 #[tokio::test]
283 async fn job_should_stop_retrying_run_if_attempt_succeed() {
284 let lock = Arc::new(Mutex::new(0));
285 let lock_clone = lock.clone();
286
287 let succeed_at = 7;
288 let max_retries = 12;
289
290 let job = Job::new("g", "n", Some(max_retries), move || {
291 let lock_clone = lock_clone.clone();
292 Box::pin(async move {
293 println!("job - started");
294 println!("job - Trying to get the lock");
295 let mut lock = lock_clone.lock().await;
296 let count = *lock;
297 *lock = count + 1;
298 println!("job - count {count}");
299
300 if count == succeed_at {
301 Ok(())
302 } else {
303 Err(SchedulerError::JobLockError {
304 message: "".to_owned(),
305 })?
306 }
307 })
308 });
309
310 let result = job.run().await;
311
312 assert!(result.is_ok());
313
314 let lock = lock.lock().await;
315 let count = *lock;
316 assert_eq!(succeed_at + 1, count);
317 }
318}