@@ -3,10 +3,10 @@ use crate::matrix::MatrixClient;
33use crate :: webhook:: Alert ;
44use crate :: { unix_time, AlertId , Result } ;
55use actix:: prelude:: * ;
6- use std:: sync:: Arc ;
6+ use std:: sync:: { Arc , Mutex } ;
77use std:: time:: Duration ;
88
9- const CRON_JON_INTERVAL : u64 = 5 ;
9+ const CRON_JOB_INTERVAL : u64 = 5 ;
1010
1111#[ derive( Debug , Clone , Eq , PartialEq , Serialize , Deserialize ) ]
1212pub struct AlertContext {
@@ -103,6 +103,8 @@ pub struct Processor {
103103 db : Option < Arc < Database > > ,
104104 escalation_window : u64 ,
105105 should_escalate : bool ,
106+ // Ensures that only one escalation task is running at the time.
107+ escalation_lock : Arc < Mutex < ( ) > > ,
106108}
107109
108110impl Processor {
@@ -111,6 +113,7 @@ impl Processor {
111113 db : db. map ( |db| Arc :: new ( db) ) ,
112114 escalation_window : escalation_window,
113115 should_escalate : should_escalate,
116+ escalation_lock : Default :: default ( ) ,
114117 }
115118 }
116119 fn db ( & self ) -> Arc < Database > {
@@ -159,15 +162,25 @@ impl Actor for Processor {
159162 Result :: < ( ) > :: Ok ( ( ) )
160163 } ;
161164
165+ let lock = Arc :: clone ( & self . escalation_lock ) ;
162166 ctx. run_interval (
163- Duration :: from_secs ( CRON_JON_INTERVAL ) ,
167+ Duration :: from_secs ( CRON_JOB_INTERVAL ) ,
164168 move |_proc, _ctx| {
169+ // Acquire new handles for async task.
165170 let db = Arc :: clone ( & db) ;
171+ let lock = Arc :: clone ( & lock) ;
166172
167173 actix:: spawn ( async move {
168- match local ( db, escalation_window) . await {
169- Ok ( _) => { }
170- Err ( err) => error ! ( "{:?}" , err) ,
174+ // Immediately exits if the lock cannot be acquired.
175+ if let Ok ( locked) = lock. try_lock ( ) {
176+ // Lock acquired and will remain locked until
177+ // `_l` goes out of scope.
178+ let _l = locked;
179+
180+ match local ( db, escalation_window) . await {
181+ Ok ( _) => { }
182+ Err ( err) => error ! ( "{:?}" , err) ,
183+ }
171184 }
172185 } ) ;
173186 } ,
0 commit comments