From d7f69ec62b8947ad98626c167ec6f93d226f7ea2 Mon Sep 17 00:00:00 2001 From: "FOLIO3PK\\muhammadnoman" Date: Tue, 20 Aug 2019 20:31:04 +0500 Subject: [PATCH 01/11] Added Log event notification --- optimizely/event/event_processor.py | 14 ++++++++++-- optimizely/helpers/enums.py | 5 +++++ tests/test_event_processor.py | 34 ++++++++++++++++++++++++++++- tests/test_notification_center.py | 17 +++++++++++++++ 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index f6099685..31ae0c52 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -23,7 +23,7 @@ from optimizely import logger as _logging from optimizely.closeable import Closeable from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher -from optimizely.helpers import validator +from optimizely.helpers import validator, enums ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) @@ -40,6 +40,7 @@ def process(user_event): class BatchEventProcessor(EventProcessor, Closeable): """ BatchEventProcessor is a batched implementation of the EventProcessor. + The BatchEventProcessor maintains a single consumer thread that pulls events off of the blocking queue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the EventDispatcher. @@ -60,7 +61,8 @@ def __init__(self, event_queue=None, batch_size=None, flush_interval=None, - timeout_interval=None): + timeout_interval=None, + notification_center=None): self.event_dispatcher = event_dispatcher or default_event_dispatcher self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) @@ -72,6 +74,8 @@ def __init__(self, self.timeout_interval = timedelta(milliseconds=timeout_interval) \ if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \ else self._DEFAULT_TIMEOUT_INTERVAL + + self.notification_center = notification_center self._disposed = False self._is_started = False self._current_batch = list() @@ -163,6 +167,12 @@ def _flush_queue(self): log_event = EventFactory.create_log_event(to_process_batch, self.logger) + if self.notification_center is not None: + self.notification_center.send_notifications( + enums.NotificationTypes.LOG_EVENT, + log_event + ) + try: self.event_dispatcher.dispatch_event(log_event) except Exception as e: diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index 1e683fb3..93253716 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -123,8 +123,13 @@ class NotificationTypes(object): TRACK notification listener has the following parameters: str event_key, str user_id, dict attributes (can be None), event_tags (can be None), Event event + + LOG_EVENT notification listener has the following parameter(s): + LogEvent log_event """ ACTIVATE = 'ACTIVATE:experiment, user_id, attributes, variation, event' DECISION = 'DECISION:type, user_id, attributes, decision_info' OPTIMIZELY_CONFIG_UPDATE = 'OPTIMIZELY_CONFIG_UPDATE' TRACK = 'TRACK:event_key, user_id, attributes, event_tags, event' + LOG_EVENT = 'LOG_EVENT:log_event' + diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index dd1e9f3d..de91689b 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -22,6 +22,10 @@ from optimizely.event.event_payload import Decision, Visitor from optimizely.event.user_event_factory import UserEventFactory from optimizely.event.event_processor import BatchEventProcessor +from optimizely.event.log_event import LogEvent +from optimizely.event.user_event_factory import UserEventFactory +from optimizely.helpers import enums +from optimizely.logger import SimpleLogger class CanonicalEvent(object): @@ -116,6 +120,7 @@ def setUp(self, *args, **kwargs): self.event_name = 'test_event' self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY) self.optimizely.logger = SimpleLogger() + self.notification_center = self.optimizely.notification_center def tearDown(self): self._event_processor.close() @@ -131,7 +136,8 @@ def _set_event_processor(self, event_dispatcher, logger): self.event_queue, self.MAX_BATCH_SIZE, self.MAX_DURATION_MS, - self.MAX_TIMEOUT_INTERVAL_MS + self.MAX_TIMEOUT_INTERVAL_MS, + self.optimizely.notification_center ) def test_drain_on_close(self): @@ -377,3 +383,29 @@ def test_init__NaN_timeout_interval(self): # default timeout interval is 5s. self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') + + def test_notification_center(self): + + mock_event_dispatcher = mock.Mock() + callback_hit = [False] + + def on_log_event(log_event): + self.assertStrictTrue(isinstance(log_event, LogEvent)) + callback_hit[0] = True + + self.optimizely.notification_center.add_notification_listener( + enums.NotificationTypes.LOG_EVENT, on_log_event + ) + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(mock_event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event) + + self._event_processor.close() + + self.assertEqual(True, callback_hit[0]) + self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[ + enums.NotificationTypes.LOG_EVENT + ])) diff --git a/tests/test_notification_center.py b/tests/test_notification_center.py index eec1abe6..c1af1762 100644 --- a/tests/test_notification_center.py +++ b/tests/test_notification_center.py @@ -34,6 +34,10 @@ def on_track_listener(*args): pass +def on_log_event_listener(*args): + pass + + class NotificationCenterTest(unittest.TestCase): def test_add_notification_listener__valid_type(self): @@ -59,6 +63,11 @@ def test_add_notification_listener__valid_type(self): 4, test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) ) + self.assertEqual( + 5, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, + on_log_event_listener) + ) + def test_add_notification_listener__multiple_listeners(self): """ Test that multiple listeners of the same type can be successfully added. """ @@ -138,6 +147,7 @@ def another_on_activate_listener(*args): self.assertEqual(2, len(test_notification_center.notification_listeners[enums.NotificationTypes.ACTIVATE])) self.assertEqual(1, len(test_notification_center.notification_listeners[enums.NotificationTypes.DECISION])) self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.TRACK])) + self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.LOG_EVENT])) # Remove one of the activate listeners and assert. self.assertTrue(test_notification_center.remove_notification_listener(3)) @@ -164,6 +174,10 @@ def another_on_activate_listener(*args): 3, test_notification_center.add_notification_listener(enums.NotificationTypes.ACTIVATE, another_on_activate_listener) ) + self.assertEqual( + 4, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, + on_log_event_listener) + ) # Try removing a listener which does not exist. self.assertFalse(test_notification_center.remove_notification_listener(42)) @@ -180,6 +194,7 @@ def test_clear_notification_listeners(self): on_config_update_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) + test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) # Assert all listeners are there: for notification_type in notification_center.NOTIFICATION_TYPES: @@ -210,6 +225,7 @@ def test_clear_all_notification_listeners(self): on_config_update_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) + test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) # Assert all listeners are there: for notification_type in notification_center.NOTIFICATION_TYPES: @@ -219,6 +235,7 @@ def test_clear_all_notification_listeners(self): test_notification_center.clear_all_notification_listeners() for notification_type in notification_center.NOTIFICATION_TYPES: + print(notification_type) self.assertEqual(0, len(test_notification_center.notification_listeners[notification_type])) def set_listener_called_to_true(self): From 8853cb388f699cf61abc30e359a13adc0f949a6b Mon Sep 17 00:00:00 2001 From: "FOLIO3PK\\muhammadnoman" Date: Wed, 21 Aug 2019 11:39:53 +0500 Subject: [PATCH 02/11] fix: fix imports in test_event_processor. --- tests/test_event_processor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index de91689b..3e46f9ef 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -18,9 +18,7 @@ from six.moves import queue from . import base -from optimizely.logger import SimpleLogger from optimizely.event.event_payload import Decision, Visitor -from optimizely.event.user_event_factory import UserEventFactory from optimizely.event.event_processor import BatchEventProcessor from optimizely.event.log_event import LogEvent from optimizely.event.user_event_factory import UserEventFactory From 5934764fb34540bf1cd4b3d43ebee9a598907313 Mon Sep 17 00:00:00 2001 From: "FOLIO3PK\\muhammadnoman" Date: Wed, 21 Aug 2019 11:43:29 +0500 Subject: [PATCH 03/11] fix: linter issues --- optimizely/helpers/enums.py | 1 - 1 file changed, 1 deletion(-) diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index 93253716..38c13316 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -132,4 +132,3 @@ class NotificationTypes(object): OPTIMIZELY_CONFIG_UPDATE = 'OPTIMIZELY_CONFIG_UPDATE' TRACK = 'TRACK:event_key, user_id, attributes, event_tags, event' LOG_EVENT = 'LOG_EVENT:log_event' - From ddef2089e487d0ee4fd2c19bc62c622b064d1d37 Mon Sep 17 00:00:00 2001 From: "mjamal@folio3.com" Date: Fri, 23 Aug 2019 11:09:18 +0500 Subject: [PATCH 04/11] fix: remove print statement. --- tests/test_notification_center.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_notification_center.py b/tests/test_notification_center.py index c1af1762..4ed8ba0d 100644 --- a/tests/test_notification_center.py +++ b/tests/test_notification_center.py @@ -235,7 +235,6 @@ def test_clear_all_notification_listeners(self): test_notification_center.clear_all_notification_listeners() for notification_type in notification_center.NOTIFICATION_TYPES: - print(notification_type) self.assertEqual(0, len(test_notification_center.notification_listeners[notification_type])) def set_listener_called_to_true(self): From e96404865a17f57bce9a8824f8b8fefccab2f7ab Mon Sep 17 00:00:00 2001 From: "mjamal@folio3.com" Date: Fri, 23 Aug 2019 11:09:18 +0500 Subject: [PATCH 05/11] fix: remove print statement. --- tests/test_notification_center.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_notification_center.py b/tests/test_notification_center.py index c1af1762..4ed8ba0d 100644 --- a/tests/test_notification_center.py +++ b/tests/test_notification_center.py @@ -235,7 +235,6 @@ def test_clear_all_notification_listeners(self): test_notification_center.clear_all_notification_listeners() for notification_type in notification_center.NOTIFICATION_TYPES: - print(notification_type) self.assertEqual(0, len(test_notification_center.notification_listeners[notification_type])) def set_listener_called_to_true(self): From cb07348689eeb2f01df291de63323f380425100c Mon Sep 17 00:00:00 2001 From: MariamJamal32 Date: Thu, 12 Sep 2019 17:55:39 +0500 Subject: [PATCH 06/11] fix: address review comments. --- optimizely/event/event_processor.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 0c897695..efaa13e0 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -23,7 +23,8 @@ from optimizely import logger as _logging from optimizely.closeable import Closeable from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher -from optimizely.helpers import validator, enums +from optimizely.helpers import enums +from optimizely.helpers import validator ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) @@ -63,6 +64,20 @@ def __init__(self, flush_interval=None, timeout_interval=None, notification_center=None): + """ EventProcessor init method to configure event batching. + Args: + event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. + logger: Provides a log method to log messages. By default nothing would be logged. + default_start: Optional boolean param which starts the consumer thread if set to True. + By default thread does not start unless 'start' method is called. + event_queue: Optional component which accumulates the events until dispacthed. + batch_size: Optional param which defines the upper limit of the number of events in event_queue after which + the event_queue will be flushed. + flush_interval: Optional param which defines the time in milliseconds after which event_queue will be flushed. + timeout_interval: Optional param which defines the time in milliseconds before joining the consumer + thread. + notification_center: Optional instance of notification_center.NotificationCenter. + """ self.event_dispatcher = event_dispatcher or default_event_dispatcher self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) From bb3f738881c532a3fdd1790183ba4b644133c5a3 Mon Sep 17 00:00:00 2001 From: MariamJamal32 Date: Fri, 13 Sep 2019 09:19:44 +0500 Subject: [PATCH 07/11] fix: remove close method from test_notification_center. --- tests/test_event_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index edd91834..f302e698 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -395,7 +395,7 @@ def on_log_event(log_event): user_event = self._build_conversion_event(self.event_name, self.project_config) self._event_processor.process(user_event) - self._event_processor.close() + self._event_processor.stop() self.assertEqual(True, callback_hit[0]) self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[ From 1ab8b32ea219b3c92a2fc47246e672caff73b0d9 Mon Sep 17 00:00:00 2001 From: MariamJamal32 Date: Tue, 17 Sep 2019 11:45:56 +0500 Subject: [PATCH 08/11] fix: linting error. --- tests/test_event_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 911f0620..7c986cbc 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -129,8 +129,8 @@ def _set_event_processor(self, event_dispatcher, logger): self.MAX_BATCH_SIZE, self.MAX_DURATION_SEC, self.MAX_TIMEOUT_INTERVAL_SEC, - self.optimizely.notification_center - ) + self.optimizely.notification_center + ) def test_drain_on_stop(self): event_dispatcher = TestEventDispatcher() From 4d4e7d10d25f39abc7dad7b79e15e151b0328c81 Mon Sep 17 00:00:00 2001 From: MariamJamal32 Date: Tue, 17 Sep 2019 19:00:17 +0500 Subject: [PATCH 09/11] update: update documentation for event_processor. --- optimizely/event/event_processor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 10176e66..d23a26c8 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -67,13 +67,14 @@ def __init__(self, Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. logger: Provides a log method to log messages. By default nothing would be logged. - default_start: Optional boolean param which starts the consumer thread if set to True. + start_on_init: Optional boolean param which starts the consumer thread if set to True. By default thread does not start unless 'start' method is called. event_queue: Optional component which accumulates the events until dispacthed. batch_size: Optional param which defines the upper limit of the number of events in event_queue after which the event_queue will be flushed. - flush_interval: Optional param which defines the time in milliseconds after which event_queue will be flushed. - timeout_interval: Optional param which defines the time in milliseconds before joining the consumer + flush_interval: Optional floating point number representing time interval in seconds after which event_queue will + be flushed. + timeout_interval: Optional floating point number representing time interval in seconds before joining the consumer thread. notification_center: Optional instance of notification_center.NotificationCenter. """ From 2c67253786e4af857d134fc859fcaa14f73f8c6f Mon Sep 17 00:00:00 2001 From: MariamJamal32 Date: Wed, 25 Sep 2019 10:03:47 +0500 Subject: [PATCH 10/11] fix: address review comments. --- optimizely/event/event_processor.py | 4 ++++ tests/test_event_processor.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 6b7d7c6b..09c46522 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -95,6 +95,10 @@ def __init__(self, self.notification_center = notification_center self._current_batch = list() + if not validator.is_notification_center_valid(self.notification_center): + self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) + self.notification_center = notification_center.NotificationCenter() + if start_on_init is True: self.start() diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 48eec597..09a758b6 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -376,7 +376,7 @@ def test_init__NaN_timeout_interval(self): self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') - def test_notification_center(self): + def test_notification_center__on_log_event(self): mock_event_dispatcher = mock.Mock() callback_hit = [False] From 9ebf1a4566da46ea099dadc0f25a88dbfe6a3f6e Mon Sep 17 00:00:00 2001 From: Owais Akbani Date: Thu, 3 Oct 2019 12:28:09 +0500 Subject: [PATCH 11/11] fix notification center impoer --- optimizely/event/event_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 09c46522..823dd3f6 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -19,6 +19,7 @@ from six.moves import queue from optimizely import logger as _logging +from optimizely import notification_center as _notification_center from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher from optimizely.helpers import enums from optimizely.helpers import validator @@ -97,7 +98,7 @@ def __init__(self, if not validator.is_notification_center_valid(self.notification_center): self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) - self.notification_center = notification_center.NotificationCenter() + self.notification_center = _notification_center.NotificationCenter() if start_on_init is True: self.start()