diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index db44c041..3f82a7fe 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -150,18 +150,18 @@ def _validate_instantiation_props(self, prop, prop_name, default_value): return is_valid def _get_time(self, _time=None): - """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. + """ Method to return time as float in seconds. If _time is None, uses current time. Args: - _time: time in seconds that needs to be rounded off. + _time: time in seconds. Returns: - Integer time in seconds. + Float time in seconds. """ if _time is None: - return int(round(time.time())) + return time.time() - return int(round(_time)) + return _time def start(self): """ Starts the batch processing thread to batch events. """ @@ -182,12 +182,18 @@ def _run(self): while True: if self._get_time() >= self.flushing_interval_deadline: self._flush_queue() + self.flushing_interval_deadline = self._get_time() + \ + self._get_time(self.flush_interval.total_seconds()) + self.logger.debug('Flush interval deadline. Flushed queue.') try: - item = self.event_queue.get(False) + interval = self.flushing_interval_deadline - self._get_time() + item = self.event_queue.get(True, interval) + + if item is None: + continue except queue.Empty: - time.sleep(0.05) continue if item == self._SHUTDOWN_SIGNAL: diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index e16032fe..a8a954f4 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -173,6 +173,28 @@ def test_flush_on_max_timeout(self): self.assertStrictTrue(event_dispatcher.compare_events()) self.assertEqual(0, self.event_processor.event_queue.qsize()) + def test_flush_once_max_timeout(self): + event_dispatcher = TestEventDispatcher() + + self.optimizely.logger = SimpleLogger(enums.LogLevels.DEBUG) + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self.event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(1.75) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) + self.assertTrue(mock_config_logging.debug.called) + mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.') + mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.') + self.assertTrue(mock_config_logging.debug.call_count == 2) + self.optimizely.logger = SimpleLogger() + def test_flush_max_batch_size(self): event_dispatcher = TestEventDispatcher() @@ -339,6 +361,40 @@ def test_init__invalid_flush_interval(self): self.assertEqual(datetime.timedelta(seconds=30), self.event_processor.flush_interval) mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') + def test_init__float_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self.event_processor = BatchEventProcessor( + event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0.5, + self.MAX_TIMEOUT_INTERVAL_SEC, + ) + + # default flush interval is 30s. + self.assertEqual(datetime.timedelta(seconds=0.5), self.event_processor.flush_interval) + + def test_init__float_flush_deadline(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self.event_processor = BatchEventProcessor( + event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0.5, + self.MAX_TIMEOUT_INTERVAL_SEC, + ) + + # default flush interval is 30s. + self.assertTrue(isinstance(self.event_processor.flushing_interval_deadline, float)) + def test_init__bool_flush_interval(self): event_dispatcher = TestEventDispatcher()