Skip to content

feat: add odp event manager #403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions optimizely/helpers/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ class Errors:
NONE_VARIABLE_KEY_PARAMETER: Final = '"None" is an invalid value for variable key.'
UNSUPPORTED_DATAFILE_VERSION: Final = (
'This version of the Python SDK does not support the given datafile version: "{}".')
INVALID_SEGMENT_IDENTIFIER = 'Audience segments fetch failed (invalid identifier).'
FETCH_SEGMENTS_FAILED = 'Audience segments fetch failed ({}).'
ODP_EVENT_FAILED = 'ODP event send failed ({}).'
ODP_NOT_ENABLED = 'ODP is not enabled. '
INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).'
FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).'
ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).'
ODP_NOT_ENABLED: Final = 'ODP is not enabled. '


class ForcedDecisionLogs:
Expand Down Expand Up @@ -205,3 +205,9 @@ class OdpRestApiConfig:
class OdpGraphQLApiConfig:
"""ODP GraphQL API configs."""
REQUEST_TIMEOUT: Final = 10


class OdpEventManagerConfig:
"""ODP Event Manager configs."""
DEFAULT_QUEUE_CAPACITY: Final = 1000
DEFAULT_BATCH_SIZE: Final = 10
33 changes: 32 additions & 1 deletion optimizely/odp/odp_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from __future__ import annotations

from typing import Any
import uuid
import json
from optimizely import version


class OdpEvent:
Expand All @@ -24,4 +27,32 @@ def __init__(self, type: str, action: str,
self.type = type
self.action = action
self.identifiers = identifiers
self.data = data
self.data = self._add_common_event_data(data)

def __repr__(self) -> str:
return str(self.__dict__)

def __eq__(self, other: object) -> bool:
if isinstance(other, OdpEvent):
return self.__dict__ == other.__dict__
elif isinstance(other, dict):
return self.__dict__ == other
else:
return False

def _add_common_event_data(self, custom_data: dict[str, Any]) -> dict[str, Any]:
data = {
'idempotence_id': str(uuid.uuid4()),
'data_source_type': 'sdk',
'data_source': 'python-sdk',
'data_source_version': version.__version__
}
data.update(custom_data)
return data


class OdpEventEncoder(json.JSONEncoder):
def default(self, obj: object) -> Any:
if isinstance(obj, OdpEvent):
return obj.__dict__
return json.JSONEncoder.default(self, obj)
186 changes: 186 additions & 0 deletions optimizely/odp/odp_event_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Copyright 2022, Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from threading import Thread
from typing import Any, Optional
import queue
from queue import Queue
from sys import version_info

from optimizely import logger as _logging
from .odp_event import OdpEvent
from .odp_config import OdpConfig
from .zaius_rest_api_manager import ZaiusRestApiManager
from optimizely.helpers.enums import OdpEventManagerConfig, Errors


if version_info < (3, 8):
from typing_extensions import Final
else:
from typing import Final # type: ignore


class Signal:
'''Used to create unique objects for sending signals to event queue.'''
pass


class OdpEventManager:
"""
Class that sends batches of ODP events.

The OdpEventManager maintains a single consumer thread that pulls events off of
the queue and buffers them before the events are sent to ODP.
"""

_SHUTDOWN_SIGNAL: Final = Signal()
_FLUSH_SIGNAL: Final = Signal()

def __init__(
self,
odp_config: OdpConfig,
logger: Optional[_logging.Logger] = None,
api_manager: Optional[ZaiusRestApiManager] = None

):
""" OdpEventManager init method to configure event batching.

Args:
odp_config: ODP integration config.
logger: Optional component which provides a log method to log messages. By default nothing would be logged.
api_manager: Optional component which sends events to ODP.
"""
self.logger = logger or _logging.NoOpLogger()
self.zaius_manager = api_manager or ZaiusRestApiManager(self.logger)
self.odp_config = odp_config
self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
self._current_batch: list[OdpEvent] = []
self.executor = Thread(target=self._run, daemon=True)

@property
def is_running(self) -> bool:
""" Property to check if consumer thread is alive or not. """
return self.executor.is_alive()

def start(self) -> None:
""" Starts the batch processing thread to batch events. """
if self.is_running:
self.logger.warning('ODP event processor already started.')
return

self.executor.start()

def _run(self) -> None:
""" Triggered as part of the thread which batches odp events or flushes event_queue and blocks on get
for flush interval if queue is empty.
"""
try:
while True:
item = self.event_queue.get()

if item == self._SHUTDOWN_SIGNAL:
self.logger.debug('Received ODP event shutdown signal.')
self.event_queue.task_done()
break

if item is self._FLUSH_SIGNAL:
self.logger.debug('Received ODP event flush signal.')
self._flush_batch()
self.event_queue.task_done()
continue

if isinstance(item, OdpEvent):
self._add_to_batch(item)
self.event_queue.task_done()

except Exception as exception:
self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}')

finally:
self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.')
self._flush_batch()

def flush(self) -> None:
""" Adds flush signal to event_queue. """

self.event_queue.put(self._FLUSH_SIGNAL)

def _flush_batch(self) -> None:
""" Flushes current batch by dispatching event. """
batch_len = len(self._current_batch)
if batch_len == 0:
self.logger.debug('Nothing to flush.')
return

api_key = self.odp_config.get_api_key()
api_host = self.odp_config.get_api_host()

if not api_key or not api_host:
self.logger.debug('ODP event processing has been disabled.')
return

self.logger.debug(f'Flushing batch size {batch_len}.')
should_retry = False
event_batch = list(self._current_batch)
try:
should_retry = self.zaius_manager.send_odp_events(api_key, api_host, event_batch)
except Exception as e:
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{event_batch} {e}'))

if should_retry:
self.logger.debug('Error dispatching ODP events, scheduled to retry.')
return

self._current_batch = []

def _add_to_batch(self, odp_event: OdpEvent) -> None:
""" Method to append received odp event to current batch."""

self._current_batch.append(odp_event)
if len(self._current_batch) >= self.batch_size:
self.logger.debug('Flushing ODP events on batch size.')
self._flush_batch()

def stop(self) -> None:
""" Stops and disposes batch odp event queue."""
self.event_queue.put(self._SHUTDOWN_SIGNAL)
self.logger.warning('Stopping ODP Event Queue.')

if self.is_running:
self.executor.join()

if len(self._current_batch) > 0:
self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch))

if self.is_running:
self.logger.error('Error stopping ODP event queue.')

def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None:
event = OdpEvent(type, action, identifiers, data)
self.dispatch(event)

def dispatch(self, event: OdpEvent) -> None:
if not self.odp_config.odp_integrated():
self.logger.debug('ODP event processing has been disabled.')
return

if not self.is_running:
self.logger.warning('ODP event processor is shutdown, not accepting events.')
return

try:
self.event_queue.put_nowait(event)
except queue.Full:
self.logger.error(Errors.ODP_EVENT_FAILED.format("Queue is full"))
4 changes: 2 additions & 2 deletions optimizely/odp/zaius_rest_api_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from optimizely import logger as optimizely_logger
from optimizely.helpers.enums import Errors, OdpRestApiConfig
from optimizely.odp.odp_event import OdpEvent
from optimizely.odp.odp_event import OdpEvent, OdpEventEncoder

"""
ODP REST Events API
Expand Down Expand Up @@ -60,7 +60,7 @@ def send_odp_events(self, api_key: str, api_host: str, events: list[OdpEvent]) -
request_headers = {'content-type': 'application/json', 'x-api-key': api_key}

try:
payload_dict = json.dumps(events)
payload_dict = json.dumps(events, cls=OdpEventEncoder)
except TypeError as err:
self.logger.error(Errors.ODP_EVENT_FAILED.format(err))
return should_retry
Expand Down
Loading