-
Notifications
You must be signed in to change notification settings - Fork 36
feat(notification-center): Add LogEvent notification #204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
d7f69ec
8853cb3
5934764
af497f9
f21c94b
ddef208
e964048
8953a43
752e35f
f9a2cb6
54446b9
c92f79d
d42d14b
06ac3f0
9630139
2582fb7
75725bd
cb07348
f50e053
bb3f738
46cc51d
1ab8b32
f0dd1fd
4d4e7d1
cbd918a
4c8b7f3
3b2cb4a
4753c0b
3baf024
7e20be7
614a3d4
2c67253
9ebf1a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ | |
| from optimizely import logger as _logging | ||
| from optimizely.closeable import Closeable | ||
| from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher | ||
| from optimizely.helpers import validator | ||
| from optimizely.helpers import validator, enums | ||
|
|
||
| ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) | ||
|
|
||
|
|
@@ -40,6 +40,7 @@ def process(user_event): | |
| class BatchEventProcessor(EventProcessor, Closeable): | ||
| """ | ||
| BatchEventProcessor is a batched implementation of the EventProcessor. | ||
|
|
||
| The BatchEventProcessor maintains a single consumer thread that pulls events off of | ||
| the blocking queue and buffers them for either a configured batch size or for a | ||
| maximum duration before the resulting LogEvent is sent to the EventDispatcher. | ||
|
|
@@ -60,7 +61,8 @@ def __init__(self, | |
| event_queue=None, | ||
| batch_size=None, | ||
| flush_interval=None, | ||
| timeout_interval=None): | ||
| timeout_interval=None, | ||
| notification_center=None): | ||
|
||
| self.event_dispatcher = event_dispatcher or default_event_dispatcher | ||
| self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) | ||
| self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) | ||
|
|
@@ -72,6 +74,8 @@ def __init__(self, | |
| self.timeout_interval = timedelta(milliseconds=timeout_interval) \ | ||
| if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \ | ||
| else self._DEFAULT_TIMEOUT_INTERVAL | ||
|
|
||
| self.notification_center = notification_center | ||
| self._disposed = False | ||
| self._is_started = False | ||
| self._current_batch = list() | ||
|
|
@@ -163,6 +167,12 @@ def _flush_queue(self): | |
|
|
||
| log_event = EventFactory.create_log_event(to_process_batch, self.logger) | ||
|
|
||
| if self.notification_center is not None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we be validating that |
||
| self.notification_center.send_notifications( | ||
| enums.NotificationTypes.LOG_EVENT, | ||
| log_event | ||
| ) | ||
|
|
||
| try: | ||
| self.event_dispatcher.dispatch_event(log_event) | ||
| except Exception as e: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,12 @@ | |
| from six.moves import queue | ||
|
|
||
| from . import base | ||
| from optimizely.logger import SimpleLogger | ||
| from optimizely.event.payload import Decision, Visitor | ||
| from optimizely.event.user_event_factory import UserEventFactory | ||
| from optimizely.event.event_processor import BatchEventProcessor | ||
| from optimizely.event.log_event import LogEvent | ||
| from optimizely.event.user_event_factory import UserEventFactory | ||
| from optimizely.helpers import enums | ||
| from optimizely.logger import SimpleLogger | ||
|
|
||
|
|
||
| class CanonicalEvent(object): | ||
|
|
@@ -115,6 +117,7 @@ def setUp(self, *args, **kwargs): | |
| self.event_name = 'test_event' | ||
| self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY) | ||
| self.optimizely.logger = SimpleLogger() | ||
| self.notification_center = self.optimizely.notification_center | ||
|
|
||
| def tearDown(self): | ||
| self._event_processor.close() | ||
|
|
@@ -130,7 +133,8 @@ def _set_event_processor(self, event_dispatcher, logger): | |
| self.event_queue, | ||
| self.MAX_BATCH_SIZE, | ||
| self.MAX_DURATION_MS, | ||
| self.MAX_TIMEOUT_INTERVAL_MS | ||
| self.MAX_TIMEOUT_INTERVAL_MS, | ||
| self.optimizely.notification_center | ||
| ) | ||
|
|
||
| def test_drain_on_close(self): | ||
|
|
@@ -376,3 +380,29 @@ def test_init__NaN_timeout_interval(self): | |
| # default timeout interval is 5s. | ||
| self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) | ||
| mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') | ||
|
|
||
| def test_notification_center(self): | ||
|
||
|
|
||
| mock_event_dispatcher = mock.Mock() | ||
| callback_hit = [False] | ||
|
|
||
| def on_log_event(log_event): | ||
| self.assertStrictTrue(isinstance(log_event, LogEvent)) | ||
| callback_hit[0] = True | ||
|
|
||
| self.optimizely.notification_center.add_notification_listener( | ||
| enums.NotificationTypes.LOG_EVENT, on_log_event | ||
| ) | ||
|
|
||
| with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: | ||
| self._set_event_processor(mock_event_dispatcher, mock_config_logging) | ||
|
|
||
| user_event = self._build_conversion_event(self.event_name, self.project_config) | ||
| self._event_processor.process(user_event) | ||
|
|
||
| self._event_processor.close() | ||
|
|
||
| self.assertEqual(True, callback_hit[0]) | ||
| self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[ | ||
| enums.NotificationTypes.LOG_EVENT | ||
| ])) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,10 @@ def on_track_listener(*args): | |
| pass | ||
|
|
||
|
|
||
| def on_log_event_listener(*args): | ||
| pass | ||
|
|
||
|
|
||
| class NotificationCenterTest(unittest.TestCase): | ||
|
|
||
| def test_add_notification_listener__valid_type(self): | ||
|
|
@@ -59,6 +63,11 @@ def test_add_notification_listener__valid_type(self): | |
| 4, test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) | ||
| ) | ||
|
|
||
| self.assertEqual( | ||
| 5, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, | ||
| on_log_event_listener) | ||
| ) | ||
|
|
||
| def test_add_notification_listener__multiple_listeners(self): | ||
| """ Test that multiple listeners of the same type can be successfully added. """ | ||
|
|
||
|
|
@@ -138,6 +147,7 @@ def another_on_activate_listener(*args): | |
| self.assertEqual(2, len(test_notification_center.notification_listeners[enums.NotificationTypes.ACTIVATE])) | ||
| self.assertEqual(1, len(test_notification_center.notification_listeners[enums.NotificationTypes.DECISION])) | ||
| self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.TRACK])) | ||
| self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.LOG_EVENT])) | ||
|
|
||
| # Remove one of the activate listeners and assert. | ||
| self.assertTrue(test_notification_center.remove_notification_listener(3)) | ||
|
|
@@ -164,6 +174,10 @@ def another_on_activate_listener(*args): | |
| 3, test_notification_center.add_notification_listener(enums.NotificationTypes.ACTIVATE, | ||
| another_on_activate_listener) | ||
| ) | ||
| self.assertEqual( | ||
| 4, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, | ||
| on_log_event_listener) | ||
| ) | ||
|
|
||
| # Try removing a listener which does not exist. | ||
| self.assertFalse(test_notification_center.remove_notification_listener(42)) | ||
|
|
@@ -180,6 +194,7 @@ def test_clear_notification_listeners(self): | |
| on_config_update_listener) | ||
| test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) | ||
| test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) | ||
| test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) | ||
|
|
||
| # Assert all listeners are there: | ||
| for notification_type in notification_center.NOTIFICATION_TYPES: | ||
|
|
@@ -210,6 +225,7 @@ def test_clear_all_notification_listeners(self): | |
| on_config_update_listener) | ||
| test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) | ||
| test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) | ||
| test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) | ||
|
|
||
| # Assert all listeners are there: | ||
| for notification_type in notification_center.NOTIFICATION_TYPES: | ||
|
|
@@ -219,6 +235,7 @@ def test_clear_all_notification_listeners(self): | |
| test_notification_center.clear_all_notification_listeners() | ||
|
|
||
| for notification_type in notification_center.NOTIFICATION_TYPES: | ||
| print(notification_type) | ||
|
||
| self.assertEqual(0, len(test_notification_center.notification_listeners[notification_type])) | ||
|
|
||
| def set_listener_called_to_true(self): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. I'd recommend splitting up imports into separate lines. Just a conventional thing.