diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index db81dbc6..823dd3f6 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -19,7 +19,9 @@ from six.moves import queue from optimizely import logger as _logging +from optimizely import notification_center as _notification_center from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher +from optimizely.helpers import enums from optimizely.helpers import validator from .event_factory import EventFactory from .user_event import UserEvent @@ -62,8 +64,10 @@ def __init__(self, event_queue=None, batch_size=None, flush_interval=None, - timeout_interval=None): - """ BatchEventProcessor init method to configure event batching. + timeout_interval=None, + notification_center=None): + """ EventProcessor init method to configure event batching. + Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. logger: Provides a log method to log messages. By default nothing would be logged. @@ -76,6 +80,7 @@ def __init__(self, be flushed. timeout_interval: Optional floating point number representing time interval in seconds before joining the consumer thread. + notification_center: Optional instance of notification_center.NotificationCenter. """ self.event_dispatcher = event_dispatcher or default_event_dispatcher self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) @@ -88,8 +93,13 @@ def __init__(self, self.timeout_interval = timedelta(seconds=timeout_interval) \ if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \ else self._DEFAULT_TIMEOUT_INTERVAL + self.notification_center = notification_center self._current_batch = list() + if not validator.is_notification_center_valid(self.notification_center): + self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) + self.notification_center = _notification_center.NotificationCenter() + if start_on_init is True: self.start() @@ -195,6 +205,12 @@ def _flush_queue(self): log_event = EventFactory.create_log_event(to_process_batch, self.logger) + if self.notification_center is not None: + self.notification_center.send_notifications( + enums.NotificationTypes.LOG_EVENT, + log_event + ) + try: self.event_dispatcher.dispatch_event(log_event) except Exception as e: diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index 73ecfe54..893538ca 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -121,8 +121,12 @@ class NotificationTypes(object): TRACK notification listener has the following parameters: str event_key, str user_id, dict attributes (can be None), event_tags (can be None), Event event + + LOG_EVENT notification listener has the following parameter(s): + LogEvent log_event """ ACTIVATE = 'ACTIVATE:experiment, user_id, attributes, variation, event' DECISION = 'DECISION:type, user_id, attributes, decision_info' OPTIMIZELY_CONFIG_UPDATE = 'OPTIMIZELY_CONFIG_UPDATE' TRACK = 'TRACK:event_key, user_id, attributes, event_tags, event' + LOG_EVENT = 'LOG_EVENT:log_event' diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 2e6f0442..09a758b6 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -17,10 +17,12 @@ from six.moves import queue from . import base -from optimizely.logger import SimpleLogger from optimizely.event.payload import Decision, Visitor -from optimizely.event.user_event_factory import UserEventFactory from optimizely.event.event_processor import BatchEventProcessor +from optimizely.event.log_event import LogEvent +from optimizely.event.user_event_factory import UserEventFactory +from optimizely.helpers import enums +from optimizely.logger import SimpleLogger class CanonicalEvent(object): @@ -110,6 +112,7 @@ def setUp(self, *args, **kwargs): self.event_name = 'test_event' self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY) self.optimizely.logger = SimpleLogger() + self.notification_center = self.optimizely.notification_center def tearDown(self): self._event_processor.stop() @@ -125,7 +128,8 @@ def _set_event_processor(self, event_dispatcher, logger): self.event_queue, self.MAX_BATCH_SIZE, self.MAX_DURATION_SEC, - self.MAX_TIMEOUT_INTERVAL_SEC + self.MAX_TIMEOUT_INTERVAL_SEC, + self.optimizely.notification_center ) def test_drain_on_stop(self): @@ -371,3 +375,29 @@ def test_init__NaN_timeout_interval(self): # default timeout interval is 5s. self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') + + def test_notification_center__on_log_event(self): + + mock_event_dispatcher = mock.Mock() + callback_hit = [False] + + def on_log_event(log_event): + self.assertStrictTrue(isinstance(log_event, LogEvent)) + callback_hit[0] = True + + self.optimizely.notification_center.add_notification_listener( + enums.NotificationTypes.LOG_EVENT, on_log_event + ) + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(mock_event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event) + + self._event_processor.stop() + + self.assertEqual(True, callback_hit[0]) + self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[ + enums.NotificationTypes.LOG_EVENT + ])) diff --git a/tests/test_notification_center.py b/tests/test_notification_center.py index eec1abe6..4ed8ba0d 100644 --- a/tests/test_notification_center.py +++ b/tests/test_notification_center.py @@ -34,6 +34,10 @@ def on_track_listener(*args): pass +def on_log_event_listener(*args): + pass + + class NotificationCenterTest(unittest.TestCase): def test_add_notification_listener__valid_type(self): @@ -59,6 +63,11 @@ def test_add_notification_listener__valid_type(self): 4, test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) ) + self.assertEqual( + 5, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, + on_log_event_listener) + ) + def test_add_notification_listener__multiple_listeners(self): """ Test that multiple listeners of the same type can be successfully added. """ @@ -138,6 +147,7 @@ def another_on_activate_listener(*args): self.assertEqual(2, len(test_notification_center.notification_listeners[enums.NotificationTypes.ACTIVATE])) self.assertEqual(1, len(test_notification_center.notification_listeners[enums.NotificationTypes.DECISION])) self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.TRACK])) + self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.LOG_EVENT])) # Remove one of the activate listeners and assert. self.assertTrue(test_notification_center.remove_notification_listener(3)) @@ -164,6 +174,10 @@ def another_on_activate_listener(*args): 3, test_notification_center.add_notification_listener(enums.NotificationTypes.ACTIVATE, another_on_activate_listener) ) + self.assertEqual( + 4, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, + on_log_event_listener) + ) # Try removing a listener which does not exist. self.assertFalse(test_notification_center.remove_notification_listener(42)) @@ -180,6 +194,7 @@ def test_clear_notification_listeners(self): on_config_update_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) + test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) # Assert all listeners are there: for notification_type in notification_center.NOTIFICATION_TYPES: @@ -210,6 +225,7 @@ def test_clear_all_notification_listeners(self): on_config_update_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) + test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) # Assert all listeners are there: for notification_type in notification_center.NOTIFICATION_TYPES: