Skip to content

Commit 3b2cb4a

Browse files
committed
Merge branch 'mnoman/AddBatchEP' into mnoman/log_event_notification
# Conflicts: # optimizely/event/event_processor.py
2 parents 4c8b7f3 + 443b7aa commit 3b2cb4a

File tree

1 file changed

+44
-27
lines changed

1 file changed

+44
-27
lines changed

optimizely/event/event_processor.py

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,27 @@
1818
from datetime import timedelta
1919
from six.moves import queue
2020

21-
from .user_event import UserEvent
22-
from .event_factory import EventFactory
2321
from optimizely import logger as _logging
2422
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
2523
from optimizely.helpers import enums
2624
from optimizely.helpers import validator
25+
from .user_event import UserEvent
26+
from .event_factory import EventFactory
2727

2828
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
2929

3030

31-
class EventProcessor(ABC):
32-
""" Class encapsulating event_processor functionality. Override with your own processor
33-
providing process method. """
31+
class BaseEventProcessor(ABC):
32+
""" Class encapsulating event processing. Override with your own implementation. """
3433

3534
@abc.abstractmethod
3635
def process(user_event):
3736
pass
3837

3938

40-
class BatchEventProcessor(EventProcessor):
39+
class BatchEventProcessor(BaseEventProcessor):
4140
"""
42-
BatchEventProcessor is a batched implementation of the EventProcessor.
43-
41+
BatchEventProcessor is a batched implementation of the BaseEventProcessor.
4442
The BatchEventProcessor maintains a single consumer thread that pulls events off of
4543
the blocking queue and buffers them for either a configured batch size or for a
4644
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
@@ -55,22 +53,23 @@ class BatchEventProcessor(EventProcessor):
5553
LOCK = threading.Lock()
5654

5755
def __init__(self,
58-
event_dispatcher,
59-
logger,
60-
start_on_init=False,
61-
event_queue=None,
62-
batch_size=None,
63-
flush_interval=None,
64-
timeout_interval=None,
65-
notification_center=None):
56+
event_dispatcher,
57+
logger,
58+
start_on_init=False,
59+
event_queue=None,
60+
batch_size=None,
61+
flush_interval=None,
62+
timeout_interval=None,
63+
notification_center=None):
6664
""" EventProcessor init method to configure event batching.
65+
6766
Args:
6867
event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
6968
logger: Provides a log method to log messages. By default nothing would be logged.
7069
start_on_init: Optional boolean param which starts the consumer thread if set to True.
71-
By default thread does not start unless 'start' method is called.
70+
Default value is False.
7271
event_queue: Optional component which accumulates the events until dispacthed.
73-
batch_size: Optional param which defines the upper limit of the number of events in event_queue after which
72+
batch_size: Optional param which defines the upper limit on the number of events in event_queue after which
7473
the event_queue will be flushed.
7574
flush_interval: Optional floating point number representing time interval in seconds after which event_queue will
7675
be flushed.
@@ -89,9 +88,8 @@ def __init__(self,
8988
self.timeout_interval = timedelta(seconds=timeout_interval) \
9089
if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
9190
else self._DEFAULT_TIMEOUT_INTERVAL
92-
9391
self.notification_center = notification_center
94-
self._disposed = False
92+
9593
self._is_started = False
9694
self._current_batch = list()
9795

@@ -100,13 +98,22 @@ def __init__(self,
10098

10199
@property
102100
def is_started(self):
101+
""" Property to check if consumer thread is alive or not. """
103102
return self._is_started
104103

105-
@property
106-
def disposed(self):
107-
return self._disposed
108-
109104
def _validate_intantiation_props(self, prop, prop_name):
105+
""" Method to determine if instantiation properties like batch_size, flush_interval
106+
and timeout_interval are valid.
107+
108+
Args:
109+
prop: Property value that needs to be validated.
110+
prop_name: Property name.
111+
112+
Returns:
113+
False if property value is None or less than 1 or not a finite number.
114+
False if property name is batch_size and value is a floating point number.
115+
True otherwise.
116+
"""
110117
if (prop_name == 'batch_size' and not isinstance(prop, int)) or prop is None or prop < 1 or \
111118
not validator.is_finite_number(prop):
112119
self.logger.info('Using default value for {}.'.format(prop_name))
@@ -115,13 +122,22 @@ def _validate_intantiation_props(self, prop, prop_name):
115122
return True
116123

117124
def _get_time(self, _time=None):
125+
""" Method to return rounded off time as integer in seconds. If _time is None, uses current time.
126+
127+
Args:
128+
_time: time in seconds that needs to be rounded off.
129+
130+
Returns:
131+
Integer time in seconds.
132+
"""
118133
if _time is None:
119134
return int(round(time.time()))
120135

121136
return int(round(_time))
122137

123138
def start(self):
124-
if self.is_started and not self.disposed:
139+
""" Starts the batch processing thread to batch events. """
140+
if self.is_started:
125141
self.logger.warning('Service already started')
126142
return
127143

@@ -133,7 +149,9 @@ def start(self):
133149
self._is_started = True
134150

135151
def _run(self):
136-
""" Scheduler method that periodically flushes events queue. """
152+
""" Triggered as part of the thread which batches events or flushes event_queue and sleeps
153+
periodically if queue is empty.
154+
"""
137155
try:
138156
while True:
139157
if self._get_time() > self.flushing_interval_deadline:
@@ -143,7 +161,6 @@ def _run(self):
143161
item = self.event_queue.get(True, 0.05)
144162

145163
except queue.Empty:
146-
self.logger.debug('Empty queue, sleeping for 50ms.')
147164
time.sleep(0.05)
148165
continue
149166

0 commit comments

Comments
 (0)