From 71b849f80a7c85993e6020092c9aa700ac16776e Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 12:26:58 -0800 Subject: [PATCH 01/11] refactor batch_event_processor to reset deadline after it passes. Also, hang on queue with timeout at flush interval --- optimizely/event/event_processor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index db44c041..cc10127c 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -182,12 +182,14 @@ def _run(self): while True: if self._get_time() >= self.flushing_interval_deadline: self._flush_queue() + self.flushing_interval_deadline = self._get_time() + \ + self._get_time(self.flush_interval.total_seconds()) try: - item = self.event_queue.get(False) + interval = self._get_time(self.flush_interval.total_seconds()) - self._get_time() + item = self.event_queue.get(True, interval) - except queue.Empty: - time.sleep(0.05) + except item is None: continue if item == self._SHUTDOWN_SIGNAL: From 124fd85ab519911bafb2efc3c57b9cfb385265a9 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 12:32:03 -0800 Subject: [PATCH 02/11] fix lint error --- optimizely/event/event_processor.py | 393 ++++++++++++++-------------- 1 file changed, 196 insertions(+), 197 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index cc10127c..f8b0b60e 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -31,19 +31,19 @@ class BaseEventProcessor(ABC): - """ Class encapsulating event processing. Override with your own implementation. """ + """ Class encapsulating event processing. Override with your own implementation. """ - @abc.abstractmethod - def process(self, user_event): - """ Method to provide intermediary processing stage within event production. + @abc.abstractmethod + def process(self, user_event): + """ Method to provide intermediary processing stage within event production. Args: user_event: UserEvent instance that needs to be processed and dispatched. """ - pass + pass class BatchEventProcessor(BaseEventProcessor): - """ + """ BatchEventProcessor is an implementation of the BaseEventProcessor that batches events. The BatchEventProcessor maintains a single consumer thread that pulls events off of @@ -51,26 +51,26 @@ class BatchEventProcessor(BaseEventProcessor): maximum duration before the resulting LogEvent is sent to the EventDispatcher. """ - _DEFAULT_QUEUE_CAPACITY = 1000 - _DEFAULT_BATCH_SIZE = 10 - _DEFAULT_FLUSH_INTERVAL = 30 - _DEFAULT_TIMEOUT_INTERVAL = 5 - _SHUTDOWN_SIGNAL = object() - _FLUSH_SIGNAL = object() - LOCK = threading.Lock() - - def __init__( - self, - event_dispatcher, - logger=None, - start_on_init=False, - event_queue=None, - batch_size=None, - flush_interval=None, - timeout_interval=None, - notification_center=None, - ): - """ BatchEventProcessor init method to configure event batching. + _DEFAULT_QUEUE_CAPACITY = 1000 + _DEFAULT_BATCH_SIZE = 10 + _DEFAULT_FLUSH_INTERVAL = 30 + _DEFAULT_TIMEOUT_INTERVAL = 5 + _SHUTDOWN_SIGNAL = object() + _FLUSH_SIGNAL = object() + LOCK = threading.Lock() + + def __init__( + self, + event_dispatcher, + logger=None, + start_on_init=False, + event_queue=None, + batch_size=None, + flush_interval=None, + timeout_interval=None, + notification_center=None, + ): + """ BatchEventProcessor init method to configure event batching. Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. @@ -86,44 +86,44 @@ def __init__( thread. notification_center: Optional instance of notification_center.NotificationCenter. """ - self.event_dispatcher = event_dispatcher or default_event_dispatcher - self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) - self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) - self.batch_size = ( - batch_size - if self._validate_instantiation_props(batch_size, 'batch_size', self._DEFAULT_BATCH_SIZE) - else self._DEFAULT_BATCH_SIZE - ) - self.flush_interval = ( - timedelta(seconds=flush_interval) - if self._validate_instantiation_props(flush_interval, 'flush_interval', self._DEFAULT_FLUSH_INTERVAL) - else timedelta(seconds=self._DEFAULT_FLUSH_INTERVAL) - ) - self.timeout_interval = ( - timedelta(seconds=timeout_interval) - if self._validate_instantiation_props(timeout_interval, 'timeout_interval', self._DEFAULT_TIMEOUT_INTERVAL) - else timedelta(seconds=self._DEFAULT_TIMEOUT_INTERVAL) - ) - - self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) - self._current_batch = list() - - if not validator.is_notification_center_valid(self.notification_center): - self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) - self.logger.debug('Creating notification center for use.') - self.notification_center = _notification_center.NotificationCenter(self.logger) - - self.executor = None - if start_on_init is True: - self.start() - - @property - def is_running(self): - """ Property to check if consumer thread is alive or not. """ - return self.executor.isAlive() if self.executor else False - - def _validate_instantiation_props(self, prop, prop_name, default_value): - """ Method to determine if instantiation properties like batch_size, flush_interval + self.event_dispatcher = event_dispatcher or default_event_dispatcher + self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) + self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) + self.batch_size = ( + batch_size + if self._validate_instantiation_props(batch_size, 'batch_size', self._DEFAULT_BATCH_SIZE) + else self._DEFAULT_BATCH_SIZE + ) + self.flush_interval = ( + timedelta(seconds=flush_interval) + if self._validate_instantiation_props(flush_interval, 'flush_interval', self._DEFAULT_FLUSH_INTERVAL) + else timedelta(seconds=self._DEFAULT_FLUSH_INTERVAL) + ) + self.timeout_interval = ( + timedelta(seconds=timeout_interval) + if self._validate_instantiation_props(timeout_interval, 'timeout_interval', self._DEFAULT_TIMEOUT_INTERVAL) + else timedelta(seconds=self._DEFAULT_TIMEOUT_INTERVAL) + ) + + self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) + self._current_batch = list() + + if not validator.is_notification_center_valid(self.notification_center): + self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) + self.logger.debug('Creating notification center for use.') + self.notification_center = _notification_center.NotificationCenter(self.logger) + + self.executor = None + if start_on_init is True: + self.start() + + @property + def is_running(self): + """ Property to check if consumer thread is alive or not. """ + return self.executor.isAlive() if self.executor else False + + def _validate_instantiation_props(self, prop, prop_name, default_value): + """ Method to determine if instantiation properties like batch_size, flush_interval and timeout_interval are valid. Args: @@ -136,21 +136,21 @@ def _validate_instantiation_props(self, prop, prop_name, default_value): False if property name is batch_size and value is a floating point number. True otherwise. """ - is_valid = True + is_valid = True - if prop is None or not validator.is_finite_number(prop) or prop <= 0: - is_valid = False + if prop is None or not validator.is_finite_number(prop) or prop <= 0: + is_valid = False - if prop_name == 'batch_size' and not isinstance(prop, numbers.Integral): - is_valid = False + if prop_name == 'batch_size' and not isinstance(prop, numbers.Integral): + is_valid = False - if is_valid is False: - self.logger.info('Using default value {} for {}.'.format(default_value, prop_name)) + if is_valid is False: + self.logger.info('Using default value {} for {}.'.format(default_value, prop_name)) - return is_valid + return is_valid - def _get_time(self, _time=None): - """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. + def _get_time(self, _time=None): + """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. Args: _time: time in seconds that needs to be rounded off. @@ -158,125 +158,124 @@ def _get_time(self, _time=None): Returns: Integer time in seconds. """ - if _time is None: - return int(round(time.time())) + if _time is None: + return int(round(time.time())) - return int(round(_time)) + return int(round(_time)) - def start(self): - """ Starts the batch processing thread to batch events. """ - if hasattr(self, 'executor') and self.is_running: - self.logger.warning('BatchEventProcessor already started.') - return + def start(self): + """ Starts the batch processing thread to batch events. """ + if hasattr(self, 'executor') and self.is_running: + self.logger.warning('BatchEventProcessor already started.') + return - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) - self.executor = threading.Thread(target=self._run) - self.executor.setDaemon(True) - self.executor.start() + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + self.executor = threading.Thread(target=self._run) + self.executor.setDaemon(True) + self.executor.start() - def _run(self): - """ Triggered as part of the thread which batches events or flushes event_queue and sleeps + def _run(self): + """ Triggered as part of the thread which batches events or flushes event_queue and sleeps periodically if queue is empty. """ - try: - while True: - if self._get_time() >= self.flushing_interval_deadline: - self._flush_queue() - self.flushing_interval_deadline = self._get_time() + \ - self._get_time(self.flush_interval.total_seconds()) + try: + while True: + if self._get_time() >= self.flushing_interval_deadline: + self._flush_queue() + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) - try: - interval = self._get_time(self.flush_interval.total_seconds()) - self._get_time() - item = self.event_queue.get(True, interval) + try: + interval = self._get_time(self.flush_interval.total_seconds()) - self._get_time() + item = self.event_queue.get(True, interval) - except item is None: - continue + except item is None: + continue - if item == self._SHUTDOWN_SIGNAL: - self.logger.debug('Received shutdown signal.') - break + if item == self._SHUTDOWN_SIGNAL: + self.logger.debug('Received shutdown signal.') + break - if item == self._FLUSH_SIGNAL: - self.logger.debug('Received flush signal.') - self._flush_queue() - continue + if item == self._FLUSH_SIGNAL: + self.logger.debug('Received flush signal.') + self._flush_queue() + continue - if isinstance(item, UserEvent): - self._add_to_batch(item) + if isinstance(item, UserEvent): + self._add_to_batch(item) - except Exception as exception: - self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception)) + except Exception as exception: + self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception)) - finally: - self.logger.info('Exiting processing loop. Attempting to flush pending events.') - self._flush_queue() + finally: + self.logger.info('Exiting processing loop. Attempting to flush pending events.') + self._flush_queue() - def flush(self): - """ Adds flush signal to event_queue. """ + def flush(self): + """ Adds flush signal to event_queue. """ - self.event_queue.put(self._FLUSH_SIGNAL) + self.event_queue.put(self._FLUSH_SIGNAL) - def _flush_queue(self): - """ Flushes event_queue by dispatching events. """ + def _flush_queue(self): + """ Flushes event_queue by dispatching events. """ - if len(self._current_batch) == 0: - return + if len(self._current_batch) == 0: + return - with self.LOCK: - to_process_batch = list(self._current_batch) - self._current_batch = list() + with self.LOCK: + to_process_batch = list(self._current_batch) + self._current_batch = list() - log_event = EventFactory.create_log_event(to_process_batch, self.logger) + log_event = EventFactory.create_log_event(to_process_batch, self.logger) - self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) + self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) - try: - self.event_dispatcher.dispatch_event(log_event) - except Exception as e: - self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e)) + try: + self.event_dispatcher.dispatch_event(log_event) + except Exception as e: + self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e)) - def process(self, user_event): - """ Method to process the user_event by putting it in event_queue. + def process(self, user_event): + """ Method to process the user_event by putting it in event_queue. Args: user_event: UserEvent Instance. """ - if not isinstance(user_event, UserEvent): - self.logger.error('Provided event is in an invalid format.') - return + if not isinstance(user_event, UserEvent): + self.logger.error('Provided event is in an invalid format.') + return - self.logger.debug( - 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) - ) + self.logger.debug( + 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) + ) - try: - self.event_queue.put_nowait(user_event) - except queue.Full: - self.logger.debug( - 'Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())) - ) + try: + self.event_queue.put_nowait(user_event) + except queue.Full: + self.logger.debug( + 'Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())) + ) - def _add_to_batch(self, user_event): - """ Method to append received user event to current batch. + def _add_to_batch(self, user_event): + """ Method to append received user event to current batch. Args: user_event: UserEvent Instance. """ - if self._should_split(user_event): - self._flush_queue() - self._current_batch = list() + if self._should_split(user_event): + self._flush_queue() + self._current_batch = list() - # Reset the deadline if starting a new batch. - if len(self._current_batch) == 0: - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + # Reset the deadline if starting a new batch. + if len(self._current_batch) == 0: + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) - with self.LOCK: - self._current_batch.append(user_event) - if len(self._current_batch) >= self.batch_size: - self._flush_queue() + with self.LOCK: + self._current_batch.append(user_event) + if len(self._current_batch) >= self.batch_size: + self._flush_queue() - def _should_split(self, user_event): - """ Method to check if current event batch should split into two. + def _should_split(self, user_event): + """ Method to check if current event batch should split into two. Args: user_event: UserEvent Instance. @@ -286,74 +285,74 @@ def _should_split(self, user_event): revision number and project id respectively. - False, otherwise. """ - if len(self._current_batch) == 0: - return False + if len(self._current_batch) == 0: + return False - current_context = self._current_batch[-1].event_context - new_context = user_event.event_context + current_context = self._current_batch[-1].event_context + new_context = user_event.event_context - if current_context.revision != new_context.revision: - return True + if current_context.revision != new_context.revision: + return True - if current_context.project_id != new_context.project_id: - return True + if current_context.project_id != new_context.project_id: + return True - return False + return False - def stop(self): - """ Stops and disposes batch event processor. """ - self.event_queue.put(self._SHUTDOWN_SIGNAL) - self.logger.warning('Stopping Scheduler.') + def stop(self): + """ Stops and disposes batch event processor. """ + self.event_queue.put(self._SHUTDOWN_SIGNAL) + self.logger.warning('Stopping Scheduler.') - if self.executor: - self.executor.join(self.timeout_interval.total_seconds()) + if self.executor: + self.executor.join(self.timeout_interval.total_seconds()) - if self.is_running: - self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') + if self.is_running: + self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') class ForwardingEventProcessor(BaseEventProcessor): - """ + """ ForwardingEventProcessor serves as the default EventProcessor. The ForwardingEventProcessor sends the LogEvent to EventDispatcher as soon as it is received. """ - def __init__(self, event_dispatcher, logger=None, notification_center=None): - """ ForwardingEventProcessor init method to configure event dispatching. + def __init__(self, event_dispatcher, logger=None, notification_center=None): + """ ForwardingEventProcessor init method to configure event dispatching. Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. logger: Optional component which provides a log method to log messages. By default nothing would be logged. notification_center: Optional instance of notification_center.NotificationCenter. """ - self.event_dispatcher = event_dispatcher - self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) - self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) + self.event_dispatcher = event_dispatcher + self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) + self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) - if not validator.is_notification_center_valid(self.notification_center): - self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) - self.notification_center = _notification_center.NotificationCenter() + if not validator.is_notification_center_valid(self.notification_center): + self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) + self.notification_center = _notification_center.NotificationCenter() - def process(self, user_event): - """ Method to process the user_event by dispatching it. + def process(self, user_event): + """ Method to process the user_event by dispatching it. Args: user_event: UserEvent Instance. """ - if not isinstance(user_event, UserEvent): - self.logger.error('Provided event is in an invalid format.') - return + if not isinstance(user_event, UserEvent): + self.logger.error('Provided event is in an invalid format.') + return - self.logger.debug( - 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) - ) + self.logger.debug( + 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) + ) - log_event = EventFactory.create_log_event(user_event, self.logger) + log_event = EventFactory.create_log_event(user_event, self.logger) - self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) + self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) - try: - self.event_dispatcher.dispatch_event(log_event) - except Exception as e: - self.logger.exception('Error dispatching event: ' + str(log_event) + ' ' + str(e)) + try: + self.event_dispatcher.dispatch_event(log_event) + except Exception as e: + self.logger.exception('Error dispatching event: ' + str(log_event) + ' ' + str(e)) From 078f6e787daca6eeb4ef85fe934b14a240914198 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 12:34:15 -0800 Subject: [PATCH 03/11] lint --- optimizely/event/event_processor.py | 392 ++++++++++++++-------------- 1 file changed, 196 insertions(+), 196 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index f8b0b60e..2c910dbc 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -31,19 +31,19 @@ class BaseEventProcessor(ABC): - """ Class encapsulating event processing. Override with your own implementation. """ + """ Class encapsulating event processing. Override with your own implementation. """ - @abc.abstractmethod - def process(self, user_event): - """ Method to provide intermediary processing stage within event production. + @abc.abstractmethod + def process(self, user_event): + """ Method to provide intermediary processing stage within event production. Args: user_event: UserEvent instance that needs to be processed and dispatched. """ - pass + pass class BatchEventProcessor(BaseEventProcessor): - """ + """ BatchEventProcessor is an implementation of the BaseEventProcessor that batches events. The BatchEventProcessor maintains a single consumer thread that pulls events off of @@ -51,26 +51,26 @@ class BatchEventProcessor(BaseEventProcessor): maximum duration before the resulting LogEvent is sent to the EventDispatcher. """ - _DEFAULT_QUEUE_CAPACITY = 1000 - _DEFAULT_BATCH_SIZE = 10 - _DEFAULT_FLUSH_INTERVAL = 30 - _DEFAULT_TIMEOUT_INTERVAL = 5 - _SHUTDOWN_SIGNAL = object() - _FLUSH_SIGNAL = object() - LOCK = threading.Lock() - - def __init__( - self, - event_dispatcher, - logger=None, - start_on_init=False, - event_queue=None, - batch_size=None, - flush_interval=None, - timeout_interval=None, - notification_center=None, - ): - """ BatchEventProcessor init method to configure event batching. + _DEFAULT_QUEUE_CAPACITY = 1000 + _DEFAULT_BATCH_SIZE = 10 + _DEFAULT_FLUSH_INTERVAL = 30 + _DEFAULT_TIMEOUT_INTERVAL = 5 + _SHUTDOWN_SIGNAL = object() + _FLUSH_SIGNAL = object() + LOCK = threading.Lock() + + def __init__( + self, + event_dispatcher, + logger=None, + start_on_init=False, + event_queue=None, + batch_size=None, + flush_interval=None, + timeout_interval=None, + notification_center=None, + ): + """ BatchEventProcessor init method to configure event batching. Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. @@ -86,44 +86,44 @@ def __init__( thread. notification_center: Optional instance of notification_center.NotificationCenter. """ - self.event_dispatcher = event_dispatcher or default_event_dispatcher - self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) - self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) - self.batch_size = ( - batch_size - if self._validate_instantiation_props(batch_size, 'batch_size', self._DEFAULT_BATCH_SIZE) - else self._DEFAULT_BATCH_SIZE - ) - self.flush_interval = ( - timedelta(seconds=flush_interval) - if self._validate_instantiation_props(flush_interval, 'flush_interval', self._DEFAULT_FLUSH_INTERVAL) - else timedelta(seconds=self._DEFAULT_FLUSH_INTERVAL) - ) - self.timeout_interval = ( - timedelta(seconds=timeout_interval) - if self._validate_instantiation_props(timeout_interval, 'timeout_interval', self._DEFAULT_TIMEOUT_INTERVAL) - else timedelta(seconds=self._DEFAULT_TIMEOUT_INTERVAL) - ) - - self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) - self._current_batch = list() - - if not validator.is_notification_center_valid(self.notification_center): - self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) - self.logger.debug('Creating notification center for use.') - self.notification_center = _notification_center.NotificationCenter(self.logger) - - self.executor = None - if start_on_init is True: - self.start() - - @property - def is_running(self): - """ Property to check if consumer thread is alive or not. """ - return self.executor.isAlive() if self.executor else False - - def _validate_instantiation_props(self, prop, prop_name, default_value): - """ Method to determine if instantiation properties like batch_size, flush_interval + self.event_dispatcher = event_dispatcher or default_event_dispatcher + self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) + self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) + self.batch_size = ( + batch_size + if self._validate_instantiation_props(batch_size, 'batch_size', self._DEFAULT_BATCH_SIZE) + else self._DEFAULT_BATCH_SIZE + ) + self.flush_interval = ( + timedelta(seconds=flush_interval) + if self._validate_instantiation_props(flush_interval, 'flush_interval', self._DEFAULT_FLUSH_INTERVAL) + else timedelta(seconds=self._DEFAULT_FLUSH_INTERVAL) + ) + self.timeout_interval = ( + timedelta(seconds=timeout_interval) + if self._validate_instantiation_props(timeout_interval, 'timeout_interval', self._DEFAULT_TIMEOUT_INTERVAL) + else timedelta(seconds=self._DEFAULT_TIMEOUT_INTERVAL) + ) + + self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) + self._current_batch = list() + + if not validator.is_notification_center_valid(self.notification_center): + self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) + self.logger.debug('Creating notification center for use.') + self.notification_center = _notification_center.NotificationCenter(self.logger) + + self.executor = None + if start_on_init is True: + self.start() + + @property + def is_running(self): + """ Property to check if consumer thread is alive or not. """ + return self.executor.isAlive() if self.executor else False + + def _validate_instantiation_props(self, prop, prop_name, default_value): + """ Method to determine if instantiation properties like batch_size, flush_interval and timeout_interval are valid. Args: @@ -136,21 +136,21 @@ def _validate_instantiation_props(self, prop, prop_name, default_value): False if property name is batch_size and value is a floating point number. True otherwise. """ - is_valid = True + is_valid = True - if prop is None or not validator.is_finite_number(prop) or prop <= 0: - is_valid = False + if prop is None or not validator.is_finite_number(prop) or prop <= 0: + is_valid = False - if prop_name == 'batch_size' and not isinstance(prop, numbers.Integral): - is_valid = False + if prop_name == 'batch_size' and not isinstance(prop, numbers.Integral): + is_valid = False - if is_valid is False: - self.logger.info('Using default value {} for {}.'.format(default_value, prop_name)) + if is_valid is False: + self.logger.info('Using default value {} for {}.'.format(default_value, prop_name)) - return is_valid + return is_valid - def _get_time(self, _time=None): - """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. + def _get_time(self, _time=None): + """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. Args: _time: time in seconds that needs to be rounded off. @@ -158,124 +158,124 @@ def _get_time(self, _time=None): Returns: Integer time in seconds. """ - if _time is None: - return int(round(time.time())) + if _time is None: + return int(round(time.time())) - return int(round(_time)) + return int(round(_time)) - def start(self): - """ Starts the batch processing thread to batch events. """ - if hasattr(self, 'executor') and self.is_running: - self.logger.warning('BatchEventProcessor already started.') - return + def start(self): + """ Starts the batch processing thread to batch events. """ + if hasattr(self, 'executor') and self.is_running: + self.logger.warning('BatchEventProcessor already started.') + return - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) - self.executor = threading.Thread(target=self._run) - self.executor.setDaemon(True) - self.executor.start() + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + self.executor = threading.Thread(target=self._run) + self.executor.setDaemon(True) + self.executor.start() - def _run(self): - """ Triggered as part of the thread which batches events or flushes event_queue and sleeps + def _run(self): + """ Triggered as part of the thread which batches events or flushes event_queue and sleeps periodically if queue is empty. """ - try: - while True: - if self._get_time() >= self.flushing_interval_deadline: - self._flush_queue() - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) - try: - interval = self._get_time(self.flush_interval.total_seconds()) - self._get_time() - item = self.event_queue.get(True, interval) + while True: + if self._get_time() >= self.flushing_interval_deadline: + self._flush_queue() + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) - except item is None: - continue + try: + interval = self._get_time(self.flush_interval.total_seconds()) - self._get_time() + item = self.event_queue.get(True, interval) - if item == self._SHUTDOWN_SIGNAL: - self.logger.debug('Received shutdown signal.') - break + except item is None: + continue - if item == self._FLUSH_SIGNAL: - self.logger.debug('Received flush signal.') - self._flush_queue() - continue + if item == self._SHUTDOWN_SIGNAL: + self.logger.debug('Received shutdown signal.') + break - if isinstance(item, UserEvent): - self._add_to_batch(item) + if item == self._FLUSH_SIGNAL: + self.logger.debug('Received flush signal.') + self._flush_queue() + continue - except Exception as exception: - self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception)) + if isinstance(item, UserEvent): + self._add_to_batch(item) - finally: - self.logger.info('Exiting processing loop. Attempting to flush pending events.') - self._flush_queue() + except Exception as exception: + self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception)) - def flush(self): - """ Adds flush signal to event_queue. """ + finally: + self.logger.info('Exiting processing loop. Attempting to flush pending events.') + self._flush_queue() - self.event_queue.put(self._FLUSH_SIGNAL) + def flush(self): + """ Adds flush signal to event_queue. """ - def _flush_queue(self): - """ Flushes event_queue by dispatching events. """ + self.event_queue.put(self._FLUSH_SIGNAL) - if len(self._current_batch) == 0: - return + def _flush_queue(self): + """ Flushes event_queue by dispatching events. """ - with self.LOCK: - to_process_batch = list(self._current_batch) - self._current_batch = list() + if len(self._current_batch) == 0: + return - log_event = EventFactory.create_log_event(to_process_batch, self.logger) + with self.LOCK: + to_process_batch = list(self._current_batch) + self._current_batch = list() - self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) + log_event = EventFactory.create_log_event(to_process_batch, self.logger) - try: - self.event_dispatcher.dispatch_event(log_event) - except Exception as e: - self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e)) + self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) + + try: + self.event_dispatcher.dispatch_event(log_event) + except Exception as e: + self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e)) - def process(self, user_event): - """ Method to process the user_event by putting it in event_queue. + def process(self, user_event): + """ Method to process the user_event by putting it in event_queue. Args: user_event: UserEvent Instance. """ - if not isinstance(user_event, UserEvent): - self.logger.error('Provided event is in an invalid format.') - return + if not isinstance(user_event, UserEvent): + self.logger.error('Provided event is in an invalid format.') + return - self.logger.debug( - 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) - ) + self.logger.debug( + 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) + ) - try: - self.event_queue.put_nowait(user_event) - except queue.Full: - self.logger.debug( - 'Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())) - ) + try: + self.event_queue.put_nowait(user_event) + except queue.Full: + self.logger.debug( + 'Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())) + ) - def _add_to_batch(self, user_event): - """ Method to append received user event to current batch. + def _add_to_batch(self, user_event): + """ Method to append received user event to current batch. Args: user_event: UserEvent Instance. """ - if self._should_split(user_event): - self._flush_queue() - self._current_batch = list() + if self._should_split(user_event): + self._flush_queue() + self._current_batch = list() - # Reset the deadline if starting a new batch. - if len(self._current_batch) == 0: - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + # Reset the deadline if starting a new batch. + if len(self._current_batch) == 0: + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) - with self.LOCK: - self._current_batch.append(user_event) - if len(self._current_batch) >= self.batch_size: - self._flush_queue() + with self.LOCK: + self._current_batch.append(user_event) + if len(self._current_batch) >= self.batch_size: + self._flush_queue() - def _should_split(self, user_event): - """ Method to check if current event batch should split into two. + def _should_split(self, user_event): + """ Method to check if current event batch should split into two. Args: user_event: UserEvent Instance. @@ -285,74 +285,74 @@ def _should_split(self, user_event): revision number and project id respectively. - False, otherwise. """ - if len(self._current_batch) == 0: - return False + if len(self._current_batch) == 0: + return False - current_context = self._current_batch[-1].event_context - new_context = user_event.event_context + current_context = self._current_batch[-1].event_context + new_context = user_event.event_context - if current_context.revision != new_context.revision: - return True + if current_context.revision != new_context.revision: + return True - if current_context.project_id != new_context.project_id: - return True + if current_context.project_id != new_context.project_id: + return True - return False + return False - def stop(self): - """ Stops and disposes batch event processor. """ - self.event_queue.put(self._SHUTDOWN_SIGNAL) - self.logger.warning('Stopping Scheduler.') + def stop(self): + """ Stops and disposes batch event processor. """ + self.event_queue.put(self._SHUTDOWN_SIGNAL) + self.logger.warning('Stopping Scheduler.') - if self.executor: - self.executor.join(self.timeout_interval.total_seconds()) + if self.executor: + self.executor.join(self.timeout_interval.total_seconds()) - if self.is_running: - self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') + if self.is_running: + self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') class ForwardingEventProcessor(BaseEventProcessor): - """ + """ ForwardingEventProcessor serves as the default EventProcessor. The ForwardingEventProcessor sends the LogEvent to EventDispatcher as soon as it is received. """ - def __init__(self, event_dispatcher, logger=None, notification_center=None): - """ ForwardingEventProcessor init method to configure event dispatching. + def __init__(self, event_dispatcher, logger=None, notification_center=None): + """ ForwardingEventProcessor init method to configure event dispatching. Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. logger: Optional component which provides a log method to log messages. By default nothing would be logged. notification_center: Optional instance of notification_center.NotificationCenter. """ - self.event_dispatcher = event_dispatcher - self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) - self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) + self.event_dispatcher = event_dispatcher + self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) + self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) - if not validator.is_notification_center_valid(self.notification_center): - self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) - self.notification_center = _notification_center.NotificationCenter() + if not validator.is_notification_center_valid(self.notification_center): + self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) + self.notification_center = _notification_center.NotificationCenter() - def process(self, user_event): - """ Method to process the user_event by dispatching it. + def process(self, user_event): + """ Method to process the user_event by dispatching it. Args: user_event: UserEvent Instance. """ - if not isinstance(user_event, UserEvent): - self.logger.error('Provided event is in an invalid format.') - return + if not isinstance(user_event, UserEvent): + self.logger.error('Provided event is in an invalid format.') + return - self.logger.debug( - 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) - ) + self.logger.debug( + 'Received event of type {} for user {}.'.format(type(user_event).__name__, user_event.user_id) + ) - log_event = EventFactory.create_log_event(user_event, self.logger) + log_event = EventFactory.create_log_event(user_event, self.logger) - self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) + self.notification_center.send_notifications(enums.NotificationTypes.LOG_EVENT, log_event) - try: - self.event_dispatcher.dispatch_event(log_event) - except Exception as e: - self.logger.exception('Error dispatching event: ' + str(log_event) + ' ' + str(e)) + try: + self.event_dispatcher.dispatch_event(log_event) + except Exception as e: + self.logger.exception('Error dispatching event: ' + str(log_event) + ' ' + str(e)) From c66db1b37638229a021a9d0f9a23fec2f3a44907 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 12:43:28 -0800 Subject: [PATCH 04/11] fix lint error --- optimizely/event/event_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 2c910dbc..de8c1718 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -182,7 +182,8 @@ def _run(self): while True: if self._get_time() >= self.flushing_interval_deadline: self._flush_queue() - self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + self.flushing_interval_deadline = self._get_time() + \ + self._get_time(self.flush_interval.total_seconds()) try: interval = self._get_time(self.flush_interval.total_seconds()) - self._get_time() From a9a15accd8c09cd36be68d3e8bc058aaa4be97de Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 14:47:06 -0800 Subject: [PATCH 05/11] finally got to debug replacing the mock logger --- optimizely/event/event_processor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index de8c1718..9175b3db 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -186,10 +186,13 @@ def _run(self): self._get_time(self.flush_interval.total_seconds()) try: - interval = self._get_time(self.flush_interval.total_seconds()) - self._get_time() + interval = self.flushing_interval_deadline - self._get_time() item = self.event_queue.get(True, interval) - except item is None: + if item is None: + continue + + except queue.Empty: continue if item == self._SHUTDOWN_SIGNAL: From c681c1cfa16fe7c8f88c120e98ff4efd332ef861 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 17:05:27 -0800 Subject: [PATCH 06/11] update to take time in float --- optimizely/event/event_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 9175b3db..07a45046 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -159,9 +159,9 @@ def _get_time(self, _time=None): Integer time in seconds. """ if _time is None: - return int(round(time.time())) + return time.time() - return int(round(_time)) + return _time def start(self): """ Starts the batch processing thread to batch events. """ From d7b09eb46154a0adba31a099ba17b70b768de566 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 17:32:52 -0800 Subject: [PATCH 07/11] add unit tests for float flush deadline and flush interval --- tests/test_event_processor.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index e16032fe..1dd8611f 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -339,6 +339,41 @@ def test_init__invalid_flush_interval(self): self.assertEqual(datetime.timedelta(seconds=30), self.event_processor.flush_interval) mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') + def test_init__float_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self.event_processor = BatchEventProcessor( + event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0.5, + self.MAX_TIMEOUT_INTERVAL_SEC, + ) + + # default flush interval is 30s. + self.assertEqual(0.5, self.event_processor.flush_interval) + mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') + + def test_init__float_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self.event_processor = BatchEventProcessor( + event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0.5, + self.MAX_TIMEOUT_INTERVAL_SEC, + ) + + # default flush interval is 30s. + self.assertTrue(isinstance(self.event_processor.flushing_interval_deadline, float)) + def test_init__bool_flush_interval(self): event_dispatcher = TestEventDispatcher() From 1aebafda7bfe65260412d221567d1aec175da437 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 17:41:11 -0800 Subject: [PATCH 08/11] fix broken test --- tests/test_event_processor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 1dd8611f..cbd8a644 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -354,10 +354,9 @@ def test_init__float_flush_interval(self): ) # default flush interval is 30s. - self.assertEqual(0.5, self.event_processor.flush_interval) - mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') + self.assertEqual(datetime.timedelta(seconds=0.5), self.event_processor.flush_interval) - def test_init__float_flush_interval(self): + def test_init__float_flush_deadline(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: From c772b3e00f45bd2cc995f51e100ce9cf8faa935b Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Fri, 13 Dec 2019 11:44:04 -0800 Subject: [PATCH 09/11] update method description --- optimizely/event/event_processor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 07a45046..b80eed59 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -150,13 +150,13 @@ def _validate_instantiation_props(self, prop, prop_name, default_value): return is_valid def _get_time(self, _time=None): - """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. + """ Method to return time as float in seconds. If _time is None, uses current time. Args: - _time: time in seconds that needs to be rounded off. + _time: time in seconds. Returns: - Integer time in seconds. + Float time in seconds. """ if _time is None: return time.time() From 0e1276d9f8463adafd2cc865ff233f133dbce83a Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Fri, 13 Dec 2019 14:04:46 -0800 Subject: [PATCH 10/11] added a unit test to make sure processor is called once during flush interval --- optimizely/event/event_processor.py | 1 + tests/test_event_processor.py | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index b80eed59..3f82a7fe 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -184,6 +184,7 @@ def _run(self): self._flush_queue() self.flushing_interval_deadline = self._get_time() + \ self._get_time(self.flush_interval.total_seconds()) + self.logger.debug('Flush interval deadline. Flushed queue.') try: interval = self.flushing_interval_deadline - self._get_time() diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index cbd8a644..b5c97b83 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -173,6 +173,30 @@ def test_flush_on_max_timeout(self): self.assertStrictTrue(event_dispatcher.compare_events()) self.assertEqual(0, self.event_processor.event_queue.qsize()) + def test_flush_once_max_timeout(self): + event_dispatcher = TestEventDispatcher() + + self.optimizely.logger = SimpleLogger(enums.LogLevels.DEBUG) + + mock_debug = mock.patch.object(self.optimizely.logger, 'debug') + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self.event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(1.75) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) + self.assertTrue(mock_config_logging.debug.called) + mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.') + mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.') + self.assertTrue(mock_config_logging.debug.call_count == 2) + self.optimizely.logger = SimpleLogger() + def test_flush_max_batch_size(self): event_dispatcher = TestEventDispatcher() From 94f52e1610f3a1c1cc5ec58278f11f24fb15c36f Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Fri, 13 Dec 2019 14:08:46 -0800 Subject: [PATCH 11/11] lint error --- tests/test_event_processor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index b5c97b83..a8a954f4 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -178,8 +178,6 @@ def test_flush_once_max_timeout(self): self.optimizely.logger = SimpleLogger(enums.LogLevels.DEBUG) - mock_debug = mock.patch.object(self.optimizely.logger, 'debug') - with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: self._set_event_processor(event_dispatcher, mock_config_logging)