@@ -34,7 +34,7 @@ class BaseEventProcessor(ABC):
34
34
""" Class encapsulating event processing. Override with your own implementation. """
35
35
36
36
@abc .abstractmethod
37
- def process (user_event ):
37
+ def process (self , user_event ):
38
38
""" Method to provide intermediary processing stage within event production.
39
39
Args:
40
40
user_event: UserEvent instance that needs to be processed and dispatched.
@@ -45,33 +45,34 @@ def process(user_event):
45
45
class BatchEventProcessor (BaseEventProcessor ):
46
46
"""
47
47
BatchEventProcessor is an implementation of the BaseEventProcessor that batches events.
48
+
48
49
The BatchEventProcessor maintains a single consumer thread that pulls events off of
49
50
the blocking queue and buffers them for either a configured batch size or for a
50
51
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
51
52
"""
52
53
53
54
_DEFAULT_QUEUE_CAPACITY = 1000
54
55
_DEFAULT_BATCH_SIZE = 10
55
- _DEFAULT_FLUSH_INTERVAL = timedelta ( seconds = 30 )
56
- _DEFAULT_TIMEOUT_INTERVAL = timedelta ( seconds = 5 )
56
+ _DEFAULT_FLUSH_INTERVAL = 30
57
+ _DEFAULT_TIMEOUT_INTERVAL = 5
57
58
_SHUTDOWN_SIGNAL = object ()
58
59
_FLUSH_SIGNAL = object ()
59
60
LOCK = threading .Lock ()
60
61
61
62
def __init__ (self ,
62
63
event_dispatcher ,
63
- logger ,
64
+ logger = None ,
64
65
start_on_init = False ,
65
66
event_queue = None ,
66
67
batch_size = None ,
67
68
flush_interval = None ,
68
69
timeout_interval = None ,
69
70
notification_center = None ):
70
- """ EventProcessor init method to configure event batching.
71
+ """ BatchEventProcessor init method to configure event batching.
71
72
72
73
Args:
73
74
event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
74
- logger: Provides a log method to log messages. By default nothing would be logged.
75
+ logger: Optional component which provides a log method to log messages. By default nothing would be logged.
75
76
start_on_init: Optional boolean param which starts the consumer thread if set to True.
76
77
Default value is False.
77
78
event_queue: Optional component which accumulates the events until dispacthed.
@@ -86,20 +87,28 @@ def __init__(self,
86
87
self .event_dispatcher = event_dispatcher or default_event_dispatcher
87
88
self .logger = _logging .adapt_logger (logger or _logging .NoOpLogger ())
88
89
self .event_queue = event_queue or queue .Queue (maxsize = self ._DEFAULT_QUEUE_CAPACITY )
89
- self .batch_size = batch_size if self ._validate_intantiation_props (batch_size , 'batch_size' ) \
90
+ self .batch_size = batch_size if self ._validate_instantiation_props (batch_size ,
91
+ 'batch_size' ,
92
+ self ._DEFAULT_BATCH_SIZE ) \
90
93
else self ._DEFAULT_BATCH_SIZE
91
94
self .flush_interval = timedelta (seconds = flush_interval ) \
92
- if self ._validate_intantiation_props (flush_interval , 'flush_interval' ) \
93
- else self ._DEFAULT_FLUSH_INTERVAL
95
+ if self ._validate_instantiation_props (flush_interval ,
96
+ 'flush_interval' ,
97
+ self ._DEFAULT_FLUSH_INTERVAL ) \
98
+ else timedelta (self ._DEFAULT_FLUSH_INTERVAL )
94
99
self .timeout_interval = timedelta (seconds = timeout_interval ) \
95
- if self ._validate_intantiation_props (timeout_interval , 'timeout_interval' ) \
96
- else self ._DEFAULT_TIMEOUT_INTERVAL
97
- self .notification_center = notification_center
100
+ if self ._validate_instantiation_props (timeout_interval ,
101
+ 'timeout_interval' ,
102
+ self ._DEFAULT_TIMEOUT_INTERVAL ) \
103
+ else timedelta (self ._DEFAULT_TIMEOUT_INTERVAL )
104
+
105
+ self .notification_center = notification_center or _notification_center .NotificationCenter (self .logger )
98
106
self ._current_batch = list ()
99
107
100
108
if not validator .is_notification_center_valid (self .notification_center ):
101
109
self .logger .error (enums .Errors .INVALID_INPUT .format ('notification_center' ))
102
- self .notification_center = _notification_center .NotificationCenter ()
110
+ self .logger .debug ('Creating notification center for use.' )
111
+ self .notification_center = _notification_center .NotificationCenter (self .logger )
103
112
104
113
self .executor = None
105
114
if start_on_init is True :
@@ -110,13 +119,14 @@ def is_running(self):
110
119
""" Property to check if consumer thread is alive or not. """
111
120
return self .executor .isAlive () if self .executor else False
112
121
113
- def _validate_intantiation_props (self , prop , prop_name ):
122
+ def _validate_instantiation_props (self , prop , prop_name , default_value ):
114
123
""" Method to determine if instantiation properties like batch_size, flush_interval
115
124
and timeout_interval are valid.
116
125
117
126
Args:
118
127
prop: Property value that needs to be validated.
119
128
prop_name: Property name.
129
+ default_value: Default value for property.
120
130
121
131
Returns:
122
132
False if property value is None or less than or equal to 0 or not a finite number.
@@ -132,7 +142,7 @@ def _validate_intantiation_props(self, prop, prop_name):
132
142
is_valid = False
133
143
134
144
if is_valid is False :
135
- self .logger .info ('Using default value for {}.' .format (prop_name ))
145
+ self .logger .info ('Using default value {} for {}.' .format (default_value , prop_name ))
136
146
137
147
return is_valid
138
148
@@ -213,11 +223,10 @@ def _flush_queue(self):
213
223
214
224
log_event = EventFactory .create_log_event (to_process_batch , self .logger )
215
225
216
- if self .notification_center is not None :
217
- self .notification_center .send_notifications (
218
- enums .NotificationTypes .LOG_EVENT ,
219
- log_event
220
- )
226
+ self .notification_center .send_notifications (
227
+ enums .NotificationTypes .LOG_EVENT ,
228
+ log_event
229
+ )
221
230
222
231
try :
223
232
self .event_dispatcher .dispatch_event (log_event )
@@ -226,14 +235,17 @@ def _flush_queue(self):
226
235
227
236
def process (self , user_event ):
228
237
""" Method to process the user_event by putting it in event_queue.
238
+
229
239
Args:
230
240
user_event: UserEvent Instance.
231
241
"""
232
242
if not isinstance (user_event , UserEvent ):
233
243
self .logger .error ('Provided event is in an invalid format.' )
234
244
return
235
245
236
- self .logger .debug ('Received user_event: ' + str (user_event ))
246
+ self .logger .debug ('Received event of type {} for user {}.' .format (
247
+ type (user_event ).__name__ , user_event .user_id )
248
+ )
237
249
238
250
try :
239
251
self .event_queue .put_nowait (user_event )
@@ -242,6 +254,7 @@ def process(self, user_event):
242
254
243
255
def _add_to_batch (self , user_event ):
244
256
""" Method to append received user event to current batch.
257
+
245
258
Args:
246
259
user_event: UserEvent Instance.
247
260
"""
@@ -261,9 +274,11 @@ def _add_to_batch(self, user_event):
261
274
262
275
def _should_split (self , user_event ):
263
276
""" Method to check if current event batch should split into two.
277
+
264
278
Args:
265
279
user_event: UserEvent Instance.
266
- Return Value:
280
+
281
+ Returns:
267
282
- True, if revision number and project_id of last event in current batch do not match received event's
268
283
revision number and project id respectively.
269
284
- False, otherwise.
@@ -311,30 +326,32 @@ def __init__(self, event_dispatcher, logger=None, notification_center=None):
311
326
"""
312
327
self .event_dispatcher = event_dispatcher
313
328
self .logger = _logging .adapt_logger (logger or _logging .NoOpLogger ())
314
- self .notification_center = notification_center
329
+ self .notification_center = notification_center or _notification_center . NotificationCenter ( self . logger )
315
330
316
331
if not validator .is_notification_center_valid (self .notification_center ):
317
332
self .logger .error (enums .Errors .INVALID_INPUT .format ('notification_center' ))
318
333
self .notification_center = _notification_center .NotificationCenter ()
319
334
320
335
def process (self , user_event ):
321
336
""" Method to process the user_event by dispatching it.
337
+
322
338
Args:
323
339
user_event: UserEvent Instance.
324
340
"""
325
341
if not isinstance (user_event , UserEvent ):
326
342
self .logger .error ('Provided event is in an invalid format.' )
327
343
return
328
344
329
- self .logger .debug ('Received user_event: ' + str (user_event ))
345
+ self .logger .debug ('Received event of type {} for user {}.' .format (
346
+ type (user_event ).__name__ , user_event .user_id )
347
+ )
330
348
331
349
log_event = EventFactory .create_log_event (user_event , self .logger )
332
350
333
- if self .notification_center is not None :
334
- self .notification_center .send_notifications (
335
- enums .NotificationTypes .LOG_EVENT ,
336
- log_event
337
- )
351
+ self .notification_center .send_notifications (
352
+ enums .NotificationTypes .LOG_EVENT ,
353
+ log_event
354
+ )
338
355
339
356
try :
340
357
self .event_dispatcher .dispatch_event (log_event )
0 commit comments