-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathodp_event_manager.py
243 lines (196 loc) · 9.22 KB
/
odp_event_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# Copyright 2022, Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import time
from enum import Enum
from queue import Empty, Queue, Full
from threading import Thread
from typing import Optional
from optimizely import logger as _logging
from optimizely.helpers.enums import OdpEventManagerConfig, Errors, OdpManagerConfig
from .odp_config import OdpConfig, OdpConfigState
from .odp_event import OdpEvent, OdpDataDict
from .zaius_rest_api_manager import ZaiusRestApiManager
class Signal(Enum):
    """Enum for sending signals to the event queue.

    Signals travel through the same queue as ODP events so that they are
    handled by the processing thread in order with the events around them.
    """

    # Tells the processing loop to exit.
    SHUTDOWN = 1
    # Tells the processing loop to flush the current batch.
    FLUSH = 2
class OdpEventManager:
    """
    Class that sends batches of ODP events.

    The OdpEventManager maintains a single consumer thread that pulls events off of
    the queue and buffers them before events are sent to ODP.
    Sends events when the batch size is met or when the flush timeout has elapsed.
    """

    def __init__(
        self,
        odp_config: OdpConfig,
        logger: Optional[_logging.Logger] = None,
        api_manager: Optional[ZaiusRestApiManager] = None
    ):
        """OdpEventManager init method to configure event batching.

        Args:
            odp_config: ODP integration config.
            logger: Optional component which provides a log method to log messages.
                By default nothing would be logged.
            api_manager: Optional component which sends events to ODP.
        """
        self.logger = logger or _logging.NoOpLogger()
        self.zaius_manager = api_manager or ZaiusRestApiManager(self.logger)

        self.odp_config = odp_config
        self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
        self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
        self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL
        # Deadline for the next interval flush, expressed on the time.monotonic() clock.
        self._flush_deadline: float = 0
        self.retry_count = OdpEventManagerConfig.DEFAULT_RETRY_COUNT
        # _current_batch should only be modified by the processing thread, as it is not thread safe.
        self._current_batch: list[OdpEvent] = []
        self.thread = Thread(target=self._run, daemon=True)
        # thread_exception will be True if the processing thread did not exit cleanly.
        self.thread_exception = False

    @property
    def is_running(self) -> bool:
        """Property to check if consumer thread is alive or not."""
        return self.thread.is_alive()

    def start(self) -> None:
        """Starts the batch processing thread to batch events.

        Logs a warning and does nothing if the thread is already alive.
        """
        if self.is_running:
            self.logger.warning('ODP event queue already started.')
            return

        self.thread.start()

    def _run(self) -> None:
        """Processes the event queue from a child thread. Events are batched until
        the batch size is met or until the flush timeout has elapsed.

        The loop exits on a shutdown signal or on any uncaught exception; in both
        cases a final flush of the pending batch is attempted.
        """
        # Bound up front so the `finally` clause can always inspect it, even when
        # the loop fails before the first item has been dequeued.
        item: OdpEvent | Signal | None = None
        try:
            while True:
                # None blocks indefinitely (empty batch); otherwise wait only
                # until the pending batch is due to be flushed.
                timeout = self._get_queue_timeout()

                try:
                    item = self.event_queue.get(True, timeout)
                except Empty:
                    # Timed out waiting: treated below as "flush on interval".
                    item = None

                if item == Signal.SHUTDOWN:
                    self.logger.debug('ODP event queue: received shutdown signal.')
                    # task_done() for this signal is issued in `finally`, after the last flush.
                    break

                elif item == Signal.FLUSH:
                    self.logger.debug('ODP event queue: received flush signal.')
                    self._flush_batch()
                    self.event_queue.task_done()

                elif isinstance(item, OdpEvent):
                    self._add_to_batch(item)
                    self.event_queue.task_done()

                elif self._current_batch:
                    self.logger.debug('ODP event queue: flushing on interval.')
                    self._flush_batch()

        except Exception as exception:
            # Record the failure so dispatch() can reject further events.
            self.thread_exception = True
            self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}')

        finally:
            self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.')
            self._flush_batch()
            if item == Signal.SHUTDOWN:
                self.event_queue.task_done()

    def flush(self) -> None:
        """Adds flush signal to event_queue.

        Does not block: if the queue is full the signal is dropped and an error is logged.
        """
        try:
            self.event_queue.put_nowait(Signal.FLUSH)
        except Full:
            self.logger.error("Error flushing ODP event queue")

    def _flush_batch(self) -> None:
        """Flushes current batch by dispatching event.

        Should only be called by the processing thread.
        The batch is always emptied afterwards, whether or not sending succeeded.
        """
        batch_len = len(self._current_batch)
        if batch_len == 0:
            self.logger.debug('ODP event queue: nothing to flush.')
            return

        api_key = self.odp_config.get_api_key()
        api_host = self.odp_config.get_api_host()

        if not api_key or not api_host:
            # Without both credentials the events cannot be sent; drop them.
            self.logger.debug(Errors.ODP_NOT_INTEGRATED)
            self._current_batch.clear()
            return

        self.logger.debug(f'ODP event queue: flushing batch size {batch_len}.')
        should_retry = False

        # One initial attempt plus up to `retry_count` retries.
        for i in range(1 + self.retry_count):
            try:
                should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch)
            except Exception as error:
                # An exception from the sender is not retried.
                should_retry = False
                self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Error: {error} {self._current_batch}'))

            if not should_retry:
                break
            if i < self.retry_count:
                self.logger.debug('Error dispatching ODP events, scheduled to retry.')

        if should_retry:
            # Still flagged for retry after the last attempt: every attempt failed.
            self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Failed after {i} retries: {self._current_batch}'))

        self._current_batch.clear()

    def _add_to_batch(self, odp_event: OdpEvent) -> None:
        """Appends received ODP event to current batch, flushing if batch is greater than batch size.

        Should only be called by the processing thread.
        """
        if not self._current_batch:
            # The flush interval is measured from the first event of each batch.
            self._set_flush_deadline()

        self._current_batch.append(odp_event)
        if len(self._current_batch) >= self.batch_size:
            self.logger.debug('ODP event queue: flushing on batch size.')
            self._flush_batch()

    def _set_flush_deadline(self) -> None:
        """Sets time that next flush will occur."""
        # A monotonic clock keeps the deadline unaffected by system clock adjustments.
        self._flush_deadline = time.monotonic() + self.flush_interval

    def _get_time_till_flush(self) -> float:
        """Returns seconds until next flush; no less than 0."""
        return max(0.0, self._flush_deadline - time.monotonic())

    def _get_queue_timeout(self) -> Optional[float]:
        """Returns seconds until next flush or None if current batch is empty."""
        if not self._current_batch:
            return None
        return self._get_time_till_flush()

    def stop(self) -> None:
        """Flushes and then stops ODP event queue.

        Enqueues the shutdown signal without blocking and, if the processing thread
        is alive, waits for it to finish. If the queue is full the signal cannot be
        delivered; an error is logged and the method returns without waiting.
        """
        try:
            self.event_queue.put_nowait(Signal.SHUTDOWN)
        except Full:
            self.logger.error('Error stopping ODP event queue.')
            return

        self.logger.warning('Stopping ODP event queue.')

        if self.is_running:
            self.thread.join()

        if self._current_batch:
            # Events still buffered at this point were not sent.
            self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch))

        if self.is_running:
            self.logger.error('Error stopping ODP event queue.')

    def send_event(self, type: str, action: str, identifiers: dict[str, str], data: OdpDataDict) -> None:
        """Create OdpEvent and add it to the event queue.

        The event is dropped (with a debug log) when the ODP config state is
        UNDETERMINED or NOT_INTEGRATED.

        Args:
            type: Event type passed through to OdpEvent.
            action: Event action passed through to OdpEvent.
            identifiers: Identifier key/value pairs passed through to OdpEvent.
            data: Event data passed through to OdpEvent.
        """
        odp_state = self.odp_config.odp_state()
        if odp_state == OdpConfigState.UNDETERMINED:
            self.logger.debug('ODP event queue: cannot send before the datafile has loaded.')
            return

        if odp_state == OdpConfigState.NOT_INTEGRATED:
            self.logger.debug(Errors.ODP_NOT_INTEGRATED)
            return

        self.dispatch(OdpEvent(type, action, identifiers, data))

    def dispatch(self, event: OdpEvent) -> None:
        """Add OdpEvent to the event queue.

        The event is rejected (and logged) when the processing thread has failed,
        is not running, or when the queue is full. Never blocks.
        """
        if self.thread_exception:
            self.logger.error(Errors.ODP_EVENT_FAILED.format('Queue is down'))
            return

        if not self.is_running:
            self.logger.warning('ODP event queue is shutdown, not accepting events.')
            return

        try:
            self.logger.debug('ODP event queue: adding event.')
            self.event_queue.put_nowait(event)
        except Full:
            self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full"))

    def identify_user(self, user_id: str) -> None:
        """Send an 'identified' event whose only identifier is the given user id."""
        self.send_event(OdpManagerConfig.EVENT_TYPE, 'identified',
                        {OdpManagerConfig.KEY_FOR_USER_ID: user_id}, {})