Skip to content

Commit e13482f

Browse files
authored
feat: add odp event flush interval (#414)
* expose odp flush interval to the client * add odp flush interval and tests * tests fix * Delete z_matjaz_play directory * pr fixes * add a new test * cleanup * rename docstring
1 parent b2f1cc9 commit e13482f

File tree

6 files changed

+111
-22
lines changed

6 files changed

+111
-22
lines changed

optimizely/helpers/sdk_settings.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ def __init__(
3131
odp_segments_cache: Optional[OptimizelySegmentsCache] = None,
3232
odp_segment_manager: Optional[OdpSegmentManager] = None,
3333
odp_event_manager: Optional[OdpEventManager] = None,
34-
fetch_segments_timeout: Optional[int] = None,
35-
odp_event_timeout: Optional[int] = None
34+
odp_segment_request_timeout: Optional[int] = None,
35+
odp_event_request_timeout: Optional[int] = None,
36+
odp_event_flush_interval: Optional[int] = None
3637
) -> None:
3738
"""
3839
Args:
@@ -47,8 +48,10 @@ def __init__(
4748
`fetch_qualified_segments(user_key, user_value, options)`.
4849
odp_event_manager: A custom odp event manager. Required method is:
4950
`send_event(type:, action:, identifiers:, data:)`
50-
fetch_segments_timeout: A fetch segment timeout in seconds (optional).
51-
odp_event_timeout: A send odp event timeout in seconds (optional).
51+
odp_segment_request_timeout: Time to wait in seconds for fetch_qualified_segments request to
52+
send successfully (optional).
53+
odp_event_request_timeout: Time to wait in seconds for send_odp_events request to send successfully.
54+
odp_event_flush_interval: Time to wait for events to accumulate before sending a batch in seconds (optional).
5255
"""
5356

5457
self.odp_disabled = odp_disabled
@@ -57,5 +60,6 @@ def __init__(
5760
self.segments_cache = odp_segments_cache
5861
self.odp_segment_manager = odp_segment_manager
5962
self.odp_event_manager = odp_event_manager
60-
self.fetch_segments_timeout = fetch_segments_timeout
61-
self.odp_event_timeout = odp_event_timeout
63+
self.fetch_segments_timeout = odp_segment_request_timeout
64+
self.odp_event_timeout = odp_event_request_timeout
65+
self.odp_flush_interval = odp_event_flush_interval

optimizely/odp/odp_event_manager.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -40,31 +40,37 @@ class OdpEventManager:
4040
The OdpEventManager maintains a single consumer thread that pulls events off of
4141
the queue and buffers them before events are sent to ODP.
4242
Sends events when the batch size is met or when the flush timeout has elapsed.
43+
Flushes the event queue after specified time (seconds).
4344
"""
4445

4546
def __init__(
4647
self,
4748
logger: Optional[_logging.Logger] = None,
4849
api_manager: Optional[OdpEventApiManager] = None,
49-
timeout: Optional[int] = None
50+
request_timeout: Optional[int] = None,
51+
flush_interval: Optional[int] = None
5052
):
5153
"""OdpEventManager init method to configure event batching.
5254
5355
Args:
5456
logger: Optional component which provides a log method to log messages. By default nothing would be logged.
5557
api_manager: Optional component which sends events to ODP.
56-
timeout: Optional event timeout in seconds.
58+
request_timeout: Optional event timeout in seconds - wait time for odp platform to respond before failing.
59+
flush_interval: Optional time to wait for events to accumulate before sending the batch in seconds.
5760
"""
5861
self.logger = logger or _logging.NoOpLogger()
59-
self.api_manager = api_manager or OdpEventApiManager(self.logger, timeout)
62+
self.api_manager = api_manager or OdpEventApiManager(self.logger, request_timeout)
6063

6164
self.odp_config: Optional[OdpConfig] = None
6265
self.api_key: Optional[str] = None
6366
self.api_host: Optional[str] = None
6467

6568
self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
66-
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
67-
self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL
69+
self.batch_size = 0 if flush_interval == 0 else OdpEventManagerConfig.DEFAULT_BATCH_SIZE
70+
71+
self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL if flush_interval is None \
72+
else flush_interval
73+
6874
self._flush_deadline: float = 0
6975
self.retry_count = OdpEventManagerConfig.DEFAULT_RETRY_COUNT
7076
self._current_batch: list[OdpEvent] = []

optimizely/odp/odp_manager.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(
3535
event_manager: Optional[OdpEventManager] = None,
3636
fetch_segments_timeout: Optional[int] = None,
3737
odp_event_timeout: Optional[int] = None,
38+
odp_flush_interval: Optional[int] = None,
3839
logger: Optional[optimizely_logger.Logger] = None
3940
) -> None:
4041

@@ -58,7 +59,8 @@ def __init__(
5859
)
5960
self.segment_manager = OdpSegmentManager(segments_cache, logger=self.logger, timeout=fetch_segments_timeout)
6061

61-
self.event_manager = self.event_manager or OdpEventManager(self.logger, timeout=odp_event_timeout)
62+
self.event_manager = self.event_manager or OdpEventManager(self.logger, request_timeout=odp_event_timeout,
63+
flush_interval=odp_flush_interval)
6264
self.segment_manager.odp_config = self.odp_config
6365

6466
def fetch_qualified_segments(self, user_id: str, options: list[str]) -> Optional[list[str]]:

optimizely/optimizely.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1317,7 +1317,8 @@ def setup_odp(self, sdk_key: Optional[str]) -> None:
13171317
self.sdk_settings.odp_event_manager,
13181318
self.sdk_settings.fetch_segments_timeout,
13191319
self.sdk_settings.odp_event_timeout,
1320-
self.logger
1320+
self.sdk_settings.odp_flush_interval,
1321+
self.logger,
13211322
)
13221323

13231324
if self.sdk_settings.odp_disabled:

tests/test_odp_event_manager.py

+25-4
Original file line numberDiff line numberDiff line change
@@ -382,10 +382,10 @@ def test_odp_event_manager_override_default_data(self, *args):
382382
mock_send.assert_called_once_with(self.api_key, self.api_host, [processed_event])
383383
event_manager.stop()
384384

385-
def test_odp_event_manager_flush_timeout(self, *args):
385+
def test_odp_event_manager_flush_interval(self, *args):
386+
"""Verify that both events have been sent together after they have been batched."""
386387
mock_logger = mock.Mock()
387-
event_manager = OdpEventManager(mock_logger)
388-
event_manager.flush_interval = .5
388+
event_manager = OdpEventManager(mock_logger, flush_interval=.5)
389389
event_manager.start(self.odp_config)
390390

391391
with mock.patch.object(
@@ -394,13 +394,34 @@ def test_odp_event_manager_flush_timeout(self, *args):
394394
event_manager.send_event(**self.events[0])
395395
event_manager.send_event(**self.events[1])
396396
event_manager.event_queue.join()
397-
time.sleep(1)
397+
time.sleep(1) # ensures that the flush interval time has passed
398398

399399
mock_logger.error.assert_not_called()
400400
mock_logger.debug.assert_any_call('ODP event queue: flushing on interval.')
401401
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
402402
event_manager.stop()
403403

404+
def test_odp_event_manager_flush_interval_is_zero(self, *args):
405+
"""Verify that event is immediately if flush interval is zero."""
406+
mock_logger = mock.Mock()
407+
event_manager = OdpEventManager(mock_logger, flush_interval=0)
408+
event_manager.start(self.odp_config)
409+
410+
with mock.patch.object(
411+
event_manager.api_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False
412+
) as mock_send:
413+
event_manager.send_event(**self.events[0])
414+
event_manager.send_event(**self.events[1])
415+
event_manager.event_queue.join()
416+
417+
mock_send.assert_has_calls(
418+
[mock.call(self.api_key, self.api_host, [self.processed_events[0]]),
419+
mock.call(self.api_key, self.api_host, [self.processed_events[1]])]
420+
)
421+
mock_logger.error.assert_not_called()
422+
mock_logger.debug.assert_any_call('ODP event queue: flushing batch size 1.')
423+
event_manager.stop()
424+
404425
def test_odp_event_manager_events_before_odp_ready(self, *args):
405426
mock_logger = mock.Mock()
406427
odp_config = OdpConfig()

tests/test_optimizely.py

+60-5
Original file line numberDiff line numberDiff line change
@@ -5140,11 +5140,7 @@ def test_user_context_invalid_user_id(self):
51405140
uc = self.optimizely.create_user_context(u)
51415141
self.assertIsNone(uc, "invalid user id should return none")
51425142

5143-
def test_invalid_flag_key(self):
5144-
"""Tests invalid flag key in function get_flag_variation_by_key()."""
5145-
pass
5146-
5147-
def test_send_identify_event_when_called_with_odp_enabled(self):
5143+
def test_send_identify_event__when_called_with_odp_enabled(self):
51485144
mock_logger = mock.Mock()
51495145
client = optimizely.Optimizely(json.dumps(self.config_dict_with_audience_segments), logger=mock_logger)
51505146
with mock.patch.object(client, 'identify_user') as identify:
@@ -5154,6 +5150,34 @@ def test_send_identify_event_when_called_with_odp_enabled(self):
51545150
mock_logger.error.assert_not_called()
51555151
client.close()
51565152

5153+
def test_sdk_settings__accept_zero_for_flush_interval(self):
5154+
mock_logger = mock.Mock()
5155+
sdk_settings = OptimizelySdkSettings(odp_event_flush_interval=0)
5156+
client = optimizely.Optimizely(
5157+
json.dumps(self.config_dict_with_audience_segments),
5158+
logger=mock_logger,
5159+
settings=sdk_settings
5160+
)
5161+
flush_interval = client.odp_manager.event_manager.flush_interval
5162+
5163+
self.assertEqual(flush_interval, 0)
5164+
mock_logger.error.assert_not_called()
5165+
client.close()
5166+
5167+
def test_sdk_settings__should_use_default_when_odp_flush_interval_none(self):
5168+
mock_logger = mock.Mock()
5169+
sdk_settings = OptimizelySdkSettings(odp_event_flush_interval=None)
5170+
client = optimizely.Optimizely(
5171+
json.dumps(self.config_dict_with_audience_segments),
5172+
logger=mock_logger,
5173+
settings=sdk_settings
5174+
)
5175+
flush_interval = client.odp_manager.event_manager.flush_interval
5176+
self.assertEqual(flush_interval, enums.OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL)
5177+
5178+
mock_logger.error.assert_not_called()
5179+
client.close()
5180+
51575181
def test_sdk_settings__log_info_when_disabled(self):
51585182
mock_logger = mock.Mock()
51595183
sdk_settings = OptimizelySdkSettings(odp_disabled=True)
@@ -5162,6 +5186,7 @@ def test_sdk_settings__log_info_when_disabled(self):
51625186
logger=mock_logger,
51635187
settings=sdk_settings
51645188
)
5189+
51655190
self.assertIsNone(client.odp_manager.event_manager)
51665191
self.assertIsNone(client.odp_manager.segment_manager)
51675192
mock_logger.info.assert_called_once_with('ODP is disabled.')
@@ -5211,6 +5236,36 @@ def test_sdk_settings__accept_cache_size_and_cache_timeout(self):
52115236
mock_logger.error.assert_not_called()
52125237
client.close()
52135238

5239+
def test_sdk_settings__use_default_cache_size_and_timeout_when_odp_flush_interval_none(self):
5240+
mock_logger = mock.Mock()
5241+
sdk_settings = OptimizelySdkSettings()
5242+
client = optimizely.Optimizely(
5243+
json.dumps(self.config_dict_with_audience_segments),
5244+
logger=mock_logger,
5245+
settings=sdk_settings
5246+
)
5247+
segments_cache = client.odp_manager.segment_manager.segments_cache
5248+
self.assertEqual(segments_cache.timeout, enums.OdpSegmentsCacheConfig.DEFAULT_TIMEOUT_SECS)
5249+
self.assertEqual(segments_cache.capacity, enums.OdpSegmentsCacheConfig.DEFAULT_CAPACITY)
5250+
5251+
mock_logger.error.assert_not_called()
5252+
client.close()
5253+
5254+
def test_sdk_settings__accept_zero_cache_size_timeout_and_cache_size(self):
5255+
mock_logger = mock.Mock()
5256+
sdk_settings = OptimizelySdkSettings(segments_cache_size=0, segments_cache_timeout_in_secs=0)
5257+
client = optimizely.Optimizely(
5258+
json.dumps(self.config_dict_with_audience_segments),
5259+
logger=mock_logger,
5260+
settings=sdk_settings
5261+
)
5262+
segments_cache = client.odp_manager.segment_manager.segments_cache
5263+
self.assertEqual(segments_cache.capacity, 0)
5264+
self.assertEqual(segments_cache.timeout, 0)
5265+
5266+
mock_logger.error.assert_not_called()
5267+
client.close()
5268+
52145269
def test_sdk_settings__accept_valid_custom_cache(self):
52155270
class CustomCache:
52165271
def reset(self):

0 commit comments

Comments
 (0)