diff --git a/optimizely/closeable.py b/optimizely/closeable.py
new file mode 100644
index 00000000..27118747
--- /dev/null
+++ b/optimizely/closeable.py
@@ -0,0 +1,25 @@
+# Copyright 2019 Optimizely
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
+
+
+class Closeable(ABC):
+  """ Class encapsulating closing functionality. Override with your own
+  implementation of the close method. """
+
+  @abc.abstractmethod
+  def close(self):
+    pass
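Review note: `Closeable` gives long-lived components (such as the batch event processor introduced below) a uniform shutdown hook, and inheriting from the `ABC` sentinel makes `close` an enforced abstract method on both Python 2 and 3. A minimal sketch of a conforming implementation — the class name and file handling here are illustrative, not part of this diff:

```python
from optimizely.closeable import Closeable


class FileEventLog(Closeable):
  """ Hypothetical event sink that owns a file handle. """

  def __init__(self, path):
    self._file = open(path, 'a')

  def close(self):
    # Invoked by the owner at shutdown to release the resource.
    self._file.close()
```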
+ """ + if not validator.is_logger_valid(self.logger): + raise optimizely_exceptions.InvalidInputException(enums.Errors.INVALID_INPUT.format('logger')) + + if not validator.is_error_handler_valid(self.error_handler): + raise optimizely_exceptions.InvalidInputException(enums.Errors.INVALID_INPUT.format('error_handler')) + + if not validator.is_notification_center_valid(self.notification_center): + raise optimizely_exceptions.InvalidInputException(enums.Errors.INVALID_INPUT.format('notification_center')) + + @abc.abstractmethod + def get_config(self): + """ Get config for use by optimizely.Optimizely. + The config should be an instance of project_config.ProjectConfig.""" + pass class StaticConfigManager(BaseConfigManager): - """ Config manager that returns ProjectConfig based on provided datafile. """ - - def __init__(self, - datafile=None, - logger=None, - error_handler=None, - notification_center=None, - skip_json_validation=False): - """ Initialize config manager. Datafile has to be provided to use. - - Args: - datafile: JSON string representing the Optimizely project. - logger: Provides a logger instance. - error_handler: Provides a handle_error method to handle exceptions. - notification_center: Notification center to generate config update notification. - skip_json_validation: Optional boolean param which allows skipping JSON schema - validation upon object invocation. By default - JSON schema validation will be performed. - """ - super(StaticConfigManager, self).__init__(logger=logger, - error_handler=error_handler, - notification_center=notification_center) - self._config = None - self.validate_schema = not skip_json_validation - self._set_config(datafile) - - def _set_config(self, datafile): - """ Looks up and sets datafile and config based on response body. - - Args: - datafile: JSON string representing the Optimizely project. - """ - - if self.validate_schema: - if not validator.is_datafile_valid(datafile): - self.logger.error(enums.Errors.INVALID_INPUT.format('datafile')) - return - - error_msg = None - error_to_handle = None - config = None - - try: - config = project_config.ProjectConfig(datafile, self.logger, self.error_handler) - except optimizely_exceptions.UnsupportedDatafileVersionException as error: - error_msg = error.args[0] - error_to_handle = error - except: - error_msg = enums.Errors.INVALID_INPUT.format('datafile') - error_to_handle = optimizely_exceptions.InvalidInputException(error_msg) - finally: - if error_msg: - self.logger.error(error_msg) - self.error_handler.handle_error(error_to_handle) - return - - previous_revision = self._config.get_revision() if self._config else None - - if previous_revision == config.get_revision(): + """ Config manager that returns ProjectConfig based on provided datafile. """ + + def __init__(self, + datafile=None, + logger=None, + error_handler=None, + notification_center=None, + skip_json_validation=False): + """ Initialize config manager. Datafile has to be provided to use. + + Args: + datafile: JSON string representing the Optimizely project. + logger: Provides a logger instance. + error_handler: Provides a handle_error method to handle exceptions. + notification_center: Notification center to generate config update notification. + skip_json_validation: Optional boolean param which allows skipping JSON schema + validation upon object invocation. By default + JSON schema validation will be performed. 
+ """ + super(StaticConfigManager, self).__init__(logger=logger, + error_handler=error_handler, + notification_center=notification_center) + self._config = None + self.validate_schema = not skip_json_validation + self._set_config(datafile) + + def _set_config(self, datafile): + """ Looks up and sets datafile and config based on response body. + + Args: + datafile: JSON string representing the Optimizely project. + """ + + if self.validate_schema: + if not validator.is_datafile_valid(datafile): + self.logger.error(enums.Errors.INVALID_INPUT.format('datafile')) return - self._config = config - self.notification_center.send_notifications(enums.NotificationTypes.OPTIMIZELY_CONFIG_UPDATE) - self.logger.debug( - 'Received new datafile and updated config. ' - 'Old revision number: {}. New revision number: {}.'.format(previous_revision, config.get_revision()) - ) - - def get_config(self): - """ Returns instance of ProjectConfig. - - Returns: - ProjectConfig. None if not set. - """ - return self._config - - -class PollingConfigManager(StaticConfigManager): - """ Config manager that polls for the datafile and updated ProjectConfig based on an update interval. """ - - def __init__(self, - sdk_key=None, - datafile=None, - update_interval=None, - url=None, - url_template=None, - logger=None, - error_handler=None, - notification_center=None, - skip_json_validation=False): - """ Initialize config manager. One of sdk_key or url has to be set to be able to use. - - Args: - sdk_key: Optional string uniquely identifying the datafile. - datafile: Optional JSON string representing the project. - update_interval: Optional floating point number representing time interval in seconds - at which to request datafile and set ProjectConfig. - url: Optional string representing URL from where to fetch the datafile. If set it supersedes the sdk_key. - url_template: Optional string template which in conjunction with sdk_key - determines URL from where to fetch the datafile. - logger: Provides a logger instance. - error_handler: Provides a handle_error method to handle exceptions. - notification_center: Notification center to generate config update notification. - skip_json_validation: Optional boolean param which allows skipping JSON schema - validation upon object invocation. By default - JSON schema validation will be performed. - - """ - super(PollingConfigManager, self).__init__(datafile=datafile, - logger=logger, - error_handler=error_handler, - notification_center=notification_center, - skip_json_validation=skip_json_validation) - self.datafile_url = self.get_datafile_url(sdk_key, url, - url_template or enums.ConfigManager.DATAFILE_URL_TEMPLATE) - self.set_update_interval(update_interval) - self.last_modified = None - self._polling_thread = threading.Thread(target=self._run) - self._polling_thread.setDaemon(True) - self._polling_thread.start() + error_msg = None + error_to_handle = None + config = None + + try: + config = project_config.ProjectConfig(datafile, self.logger, self.error_handler) + except optimizely_exceptions.UnsupportedDatafileVersionException as error: + error_msg = error.args[0] + error_to_handle = error + except: + error_msg = enums.Errors.INVALID_INPUT.format('datafile') + error_to_handle = optimizely_exceptions.InvalidInputException(error_msg) + finally: + if error_msg: + self.logger.error(error_msg) + self.error_handler.handle_error(error_to_handle) + return - @staticmethod - def get_datafile_url(sdk_key, url, url_template): - """ Helper method to determine URL from where to fetch the datafile. 
 
 
 class PollingConfigManager(StaticConfigManager):
-  """ Config manager that polls for the datafile and updated ProjectConfig based on an update interval. """
-
-  def __init__(self,
-               sdk_key=None,
-               datafile=None,
-               update_interval=None,
-               url=None,
-               url_template=None,
-               logger=None,
-               error_handler=None,
-               notification_center=None,
-               skip_json_validation=False):
-    """ Initialize config manager. One of sdk_key or url has to be set to be able to use.
-
-    Args:
-      sdk_key: Optional string uniquely identifying the datafile.
-      datafile: Optional JSON string representing the project.
-      update_interval: Optional floating point number representing time interval in seconds
-                       at which to request datafile and set ProjectConfig.
-      url: Optional string representing URL from where to fetch the datafile. If set it supersedes the sdk_key.
-      url_template: Optional string template which in conjunction with sdk_key
-                    determines URL from where to fetch the datafile.
-      logger: Provides a logger instance.
-      error_handler: Provides a handle_error method to handle exceptions.
-      notification_center: Notification center to generate config update notification.
-      skip_json_validation: Optional boolean param which allows skipping JSON schema
-                            validation upon object invocation. By default
-                            JSON schema validation will be performed.
-
-    """
-    super(PollingConfigManager, self).__init__(datafile=datafile,
-                                               logger=logger,
-                                               error_handler=error_handler,
-                                               notification_center=notification_center,
-                                               skip_json_validation=skip_json_validation)
-    self.datafile_url = self.get_datafile_url(sdk_key, url,
-                                              url_template or enums.ConfigManager.DATAFILE_URL_TEMPLATE)
-    self.set_update_interval(update_interval)
-    self.last_modified = None
-    self._polling_thread = threading.Thread(target=self._run)
-    self._polling_thread.setDaemon(True)
-    self._polling_thread.start()
-
-  @staticmethod
-  def get_datafile_url(sdk_key, url, url_template):
-    """ Helper method to determine URL from where to fetch the datafile.
-
-    Args:
-      sdk_key: Key uniquely identifying the datafile.
-      url: String representing URL from which to fetch the datafile.
-      url_template: String representing template which is filled in with
-                    SDK key to determine URL from which to fetch the datafile.
-
-    Returns:
-      String representing URL to fetch datafile from.
-
-    Raises:
-      optimizely.exceptions.InvalidInputException if:
-      - One of sdk_key or url is not provided.
-      - url_template is invalid.
-    """
-    # Ensure that either is provided by the user.
-    if sdk_key is None and url is None:
-      raise optimizely_exceptions.InvalidInputException('Must provide at least one of sdk_key or url.')
-
-    # Return URL if one is provided or use template and SDK key to get it.
-    if url is None:
-      try:
-        return url_template.format(sdk_key=sdk_key)
-      except (AttributeError, KeyError):
-        raise optimizely_exceptions.InvalidInputException(
-            'Invalid url_template {} provided.'.format(url_template))
-
-    return url
-
-  def set_update_interval(self, update_interval):
-    """ Helper method to set frequency at which datafile has to be polled and ProjectConfig updated.
-
-    Args:
-      update_interval: Time in seconds after which to update datafile.
-    """
-    if not update_interval:
-      update_interval = enums.ConfigManager.DEFAULT_UPDATE_INTERVAL
-      self.logger.debug('Set config update interval to default value {}.'.format(update_interval))
-
-    if not isinstance(update_interval, (int, float)):
-      raise optimizely_exceptions.InvalidInputException(
-          'Invalid update_interval "{}" provided.'.format(update_interval))
-
-    # If polling interval is less than minimum allowed interval then set it to default update interval.
-    if update_interval < enums.ConfigManager.MIN_UPDATE_INTERVAL:
-      self.logger.debug('update_interval value {} too small. Defaulting to {}'.format(
-          update_interval,
-          enums.ConfigManager.DEFAULT_UPDATE_INTERVAL)
-      )
-      update_interval = enums.ConfigManager.DEFAULT_UPDATE_INTERVAL
-
-    self.update_interval = update_interval
-
-  def set_last_modified(self, response_headers):
-    """ Looks up and sets last modified time based on Last-Modified header in the response.
-
-    Args:
-      response_headers: requests.Response.headers
-    """
-    self.last_modified = response_headers.get(enums.HTTPHeaders.LAST_MODIFIED)
-
-  def _handle_response(self, response):
-    """ Helper method to handle response containing datafile.
-
-    Args:
-      response: requests.Response
-    """
-    try:
-      response.raise_for_status()
-    except requests_exceptions.HTTPError as err:
-      self.logger.error('Fetching datafile from {} failed. Error: {}'.format(self.datafile_url, str(err)))
-      return
-
-    # Leave datafile and config unchanged if it has not been modified.
-    if response.status_code == http_status_codes.not_modified:
-      self.logger.debug('Not updating config as datafile has not updated since {}.'.format(self.last_modified))
-      return
-
-    self.set_last_modified(response.headers)
-    self._set_config(response.content)
-
-  def fetch_datafile(self):
-    """ Fetch datafile and set ProjectConfig. """
-
-    request_headers = {}
-    if self.last_modified:
-      request_headers[enums.HTTPHeaders.IF_MODIFIED_SINCE] = self.last_modified
-
-    response = requests.get(self.datafile_url,
-                            headers=request_headers,
-                            timeout=enums.ConfigManager.REQUEST_TIMEOUT)
-    self._handle_response(response)
-
-  @property
-  def is_running(self):
-    """ Check if polling thread is alive or not. """
-    return self._polling_thread.is_alive()
-
-  def _run(self):
-    """ Triggered as part of the thread which fetches the datafile and sleeps until next update interval. """
-    try:
-      while self.is_running:
-        self.fetch_datafile()
-        time.sleep(self.update_interval)
-    except (OSError, OverflowError) as err:
-      self.logger.error('Error in time.sleep. '
-                        'Provided update_interval value may be too big. Error: {}'.format(str(err)))
-      raise
-
-  def start(self):
-    """ Start the config manager and the thread to periodically fetch datafile. """
-    if not self.is_running:
-      self._polling_thread.start()
+    """ Config manager that polls for the datafile and updates ProjectConfig based on an update interval. """
+
+    def __init__(self,
+                 sdk_key=None,
+                 datafile=None,
+                 update_interval=None,
+                 url=None,
+                 url_template=None,
+                 logger=None,
+                 error_handler=None,
+                 notification_center=None,
+                 skip_json_validation=False):
+        """ Initialize config manager. One of sdk_key or url has to be set to be able to use.
+
+        Args:
+            sdk_key: Optional string uniquely identifying the datafile.
+            datafile: Optional JSON string representing the project.
+            update_interval: Optional floating point number representing time interval in seconds
+                             at which to request datafile and set ProjectConfig.
+            url: Optional string representing URL from where to fetch the datafile. If set it supersedes the sdk_key.
+            url_template: Optional string template which in conjunction with sdk_key
+                          determines URL from where to fetch the datafile.
+            logger: Provides a logger instance.
+            error_handler: Provides a handle_error method to handle exceptions.
+            notification_center: Notification center to generate config update notification.
+            skip_json_validation: Optional boolean param which allows skipping JSON schema
+                                  validation upon object invocation. By default
+                                  JSON schema validation will be performed.
+        """
+        super(PollingConfigManager, self).__init__(datafile=datafile,
+                                                   logger=logger,
+                                                   error_handler=error_handler,
+                                                   notification_center=notification_center,
+                                                   skip_json_validation=skip_json_validation)
+        self.datafile_url = self.get_datafile_url(sdk_key, url,
+                                                  url_template or enums.ConfigManager.DATAFILE_URL_TEMPLATE)
+        self.set_update_interval(update_interval)
+        self.last_modified = None
+        self._polling_thread = threading.Thread(target=self._run)
+        self._polling_thread.setDaemon(True)
+        self._polling_thread.start()
+
+    @staticmethod
+    def get_datafile_url(sdk_key, url, url_template):
+        """ Helper method to determine URL from where to fetch the datafile.
+
+        Args:
+            sdk_key: Key uniquely identifying the datafile.
+            url: String representing URL from which to fetch the datafile.
+            url_template: String representing template which is filled in with
+                          SDK key to determine URL from which to fetch the datafile.
+
+        Returns:
+            String representing URL to fetch datafile from.
+
+        Raises:
+            optimizely.exceptions.InvalidInputException if:
+            - One of sdk_key or url is not provided.
+            - url_template is invalid.
+        """
+        # Ensure that either is provided by the user.
+        if sdk_key is None and url is None:
+            raise optimizely_exceptions.InvalidInputException('Must provide at least one of sdk_key or url.')
+
+        # Return URL if one is provided or use template and SDK key to get it.
+        if url is None:
+            try:
+                return url_template.format(sdk_key=sdk_key)
+            except (AttributeError, KeyError):
+                raise optimizely_exceptions.InvalidInputException(
+                    'Invalid url_template {} provided.'.format(url_template))
+
+        return url
+ """ + if not update_interval: + update_interval = enums.ConfigManager.DEFAULT_UPDATE_INTERVAL + self.logger.debug('Set config update interval to default value {}.'.format(update_interval)) - @property - def is_running(self): - """ Check if polling thread is alive or not. """ - return self._polling_thread.is_alive() + if not isinstance(update_interval, (int, float)): + raise optimizely_exceptions.InvalidInputException( + 'Invalid update_interval "{}" provided.'.format(update_interval) + ) - def _run(self): - """ Triggered as part of the thread which fetches the datafile and sleeps until next update interval. """ - try: - while self.is_running: - self.fetch_datafile() - time.sleep(self.update_interval) - except (OSError, OverflowError) as err: - self.logger.error('Error in time.sleep. ' - 'Provided update_interval value may be too big. Error: {}'.format(str(err))) - raise - - def start(self): - """ Start the config manager and the thread to periodically fetch datafile. """ - if not self.is_running: - self._polling_thread.start() + # If polling interval is less than minimum allowed interval then set it to default update interval. + if update_interval < enums.ConfigManager.MIN_UPDATE_INTERVAL: + self.logger.debug('update_interval value {} too small. Defaulting to {}'.format( + update_interval, + enums.ConfigManager.DEFAULT_UPDATE_INTERVAL) + ) + update_interval = enums.ConfigManager.DEFAULT_UPDATE_INTERVAL + + self.update_interval = update_interval + + def set_last_modified(self, response_headers): + """ Looks up and sets last modified time based on Last-Modified header in the response. + + Args: + response_headers: requests.Response.headers + """ + self.last_modified = response_headers.get(enums.HTTPHeaders.LAST_MODIFIED) + + def _handle_response(self, response): + """ Helper method to handle response containing datafile. + + Args: + response: requests.Response + """ + try: + response.raise_for_status() + except requests_exceptions.HTTPError as err: + self.logger.error('Fetching datafile from {} failed. Error: {}'.format(self.datafile_url, str(err))) + return + + # Leave datafile and config unchanged if it has not been modified. + if response.status_code == http_status_codes.not_modified: + self.logger.debug('Not updating config as datafile has not updated since {}.'.format(self.last_modified)) + return + + self.set_last_modified(response.headers) + self._set_config(response.content) + + def fetch_datafile(self): + """ Fetch datafile and set ProjectConfig. """ + + request_headers = {} + if self.last_modified: + request_headers[enums.HTTPHeaders.IF_MODIFIED_SINCE] = self.last_modified + + response = requests.get(self.datafile_url, + headers=request_headers, + timeout=enums.ConfigManager.REQUEST_TIMEOUT) + self._handle_response(response) + + @property + def is_running(self): + """ Check if polling thread is alive or not. """ + return self._polling_thread.is_alive() + + def _run(self): + """ Triggered as part of the thread which fetches the datafile and sleeps until next update interval. """ + try: + while self.is_running: + self.fetch_datafile() + time.sleep(self.update_interval) + except (OSError, OverflowError) as err: + self.logger.error('Error in time.sleep. ' + 'Provided update_interval value may be too big. Error: {}'.format(str(err))) + raise + + def start(self): + """ Start the config manager and the thread to periodically fetch datafile. 
""" + if not self.is_running: + self._polling_thread.start() diff --git a/optimizely/event/entity/user_event.py b/optimizely/event/entity/user_event.py index a6343d0d..024fae2e 100644 --- a/optimizely/event/entity/user_event.py +++ b/optimizely/event/entity/user_event.py @@ -27,3 +27,6 @@ def _get_time(self): def _get_uuid(self): return str(uuid.uuid4()) + + def __str__(self): + return str(self.__class__) + ": " + str(self.__dict__) diff --git a/optimizely/event/entity/visitor.py b/optimizely/event/entity/visitor.py index d9886b0e..014f1701 100644 --- a/optimizely/event/entity/visitor.py +++ b/optimizely/event/entity/visitor.py @@ -17,3 +17,6 @@ def __init__(self, snapshots, attributes, visitor_id): self.snapshots = snapshots self.attributes = attributes self.visitor_id = visitor_id + + def __str__(self): + return str(self.__class__) + ": " + str(self.__dict__) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py new file mode 100644 index 00000000..fed7c66c --- /dev/null +++ b/optimizely/event/event_processor.py @@ -0,0 +1,220 @@ +# Copyright 2019 Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import threading +import time + +from datetime import timedelta +from six.moves import queue + +from .entity.user_event import UserEvent +from .event_factory import EventFactory +from optimizely import logger as _logging +from optimizely.closeable import Closeable +from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher +from optimizely.helpers import validator + +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + + +class EventProcessor(ABC): + """ Class encapsulating event_processor functionality. Override with your own processor + providing process method. """ + + @abc.abstractmethod + def process(user_event): + pass + + +class BatchEventProcessor(EventProcessor, Closeable): + """ + BatchEventProcessor is a batched implementation of the EventProcessor. + The BatchEventProcessor maintains a single consumer thread that pulls events off of + the blocking queue and buffers them for either a configured batch size or for a + maximum duration before the resulting LogEvent is sent to the EventDispatcher. 
+ """ + + _DEFAULT_QUEUE_CAPACITY = 1000 + _DEFAULT_BATCH_SIZE = 10 + _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30) + _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5) + _SHUTDOWN_SIGNAL = object() + _FLUSH_SIGNAL = object() + LOCK = threading.Lock() + + def __init__(self, + event_dispatcher, + logger, + default_start=False, + event_queue=None, + batch_size=None, + flush_interval=None, + timeout_interval=None): + self.event_dispatcher = event_dispatcher or default_event_dispatcher + self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) + self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) + self.batch_size = batch_size if self._validate_intantiation_props(batch_size) else self._DEFAULT_BATCH_SIZE + self.flush_interval = timedelta(milliseconds=flush_interval) if self._validate_intantiation_props(flush_interval) \ + else self._DEFAULT_FLUSH_INTERVAL + self.timeout_interval = timedelta(milliseconds=timeout_interval) \ + if self._validate_intantiation_props(timeout_interval) else self._DEFAULT_TIMEOUT_INTERVAL + self._disposed = False + self._is_started = False + self._current_batch = list() + + if default_start is True: + self.start() + + @property + def is_started(self): + return self._is_started + + @property + def disposed(self): + return self._disposed + + def _validate_intantiation_props(self, prop): + if prop is None or prop < 0 or not validator.is_finite_number(prop): + return False + + return True + + def _get_time_in_ms(self, _time=None): + if _time == 0: + return 0 + + return int(round((_time or time.time()) * 1000)) + + def start(self): + if self.is_started and not self.disposed: + self.logger.warning('Service already started') + return + + self.flushing_interval_deadline = self._get_time_in_ms() + self._get_time_in_ms(self.flush_interval.total_seconds()) + self.executor = threading.Thread(target=self._run) + self.executor.setDaemon(True) + self.executor.start() + + self._is_started = True + + def _run(self): + """ Scheduler method that periodically flushes events queue. """ + try: + while True: + if self._get_time_in_ms() > self.flushing_interval_deadline: + self._flush_queue() + + try: + item = self.event_queue.get(True, 0.05) + + except queue.Empty: + self.logger.debug('Empty queue, sleeping for 50ms.') + time.sleep(0.05) + continue + + if item == self._SHUTDOWN_SIGNAL: + self.logger.debug('Received shutdown signal.') + break + + if item == self._FLUSH_SIGNAL: + self.logger.debug('Received flush signal.') + self._flush_queue() + continue + + if isinstance(item, UserEvent): + self._add_to_batch(item) + + except Exception, exception: + self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception)) + + finally: + self.logger.info('Exiting processing loop. Attempting to flush pending events.') + self._flush_queue() + + def flush(self): + """ Adds flush signal to event_queue. """ + + self.event_queue.put(self._FLUSH_SIGNAL) + + def _flush_queue(self): + """ Flushes event_queue by dispatching events. 
""" + + if len(self._current_batch) == 0: + return + + with self.LOCK: + to_process_batch = list(self._current_batch) + self._current_batch = list() + + log_event = EventFactory.create_log_event(to_process_batch, self.logger) + + try: + self.event_dispatcher.dispatch_event(log_event) + except Exception, e: + self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e)) + + def process(self, user_event): + if not isinstance(user_event, UserEvent): + self.logger.error('Provided event is in an invalid format.') + return + + self.logger.debug('Received user_event: ' + str(user_event)) + + try: + self.event_queue.put_nowait(user_event) + except queue.Full: + self.logger.debug('Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize()))) + + def _add_to_batch(self, user_event): + if self._should_split(user_event): + self._flush_queue() + self._current_batch = list() + + # Reset the deadline if starting a new batch. + if len(self._current_batch) == 0: + self.flushing_interval_deadline = self._get_time_in_ms() + \ + self._get_time_in_ms(self.flush_interval.total_seconds()) + + with self.LOCK: + self._current_batch.append(user_event) + if len(self._current_batch) >= self.batch_size: + self._flush_queue() + + def _should_split(self, user_event): + if len(self._current_batch) == 0: + return False + + current_context = self._current_batch[-1].event_context + new_context = user_event.event_context + + if current_context.revision != new_context.revision: + return True + + if current_context.project_id != new_context.project_id: + return True + + return False + + def close(self): + """ Stops and disposes batch event processor. """ + self.logger.info('Start close.') + + self.event_queue.put(self._SHUTDOWN_SIGNAL) + self.executor.join(self.timeout_interval.total_seconds()) + + if self.executor.isAlive(): + self.logger.error('Timeout exceeded while attempting to close for ' + self.timeout_interval + ' ms.') + + self.logger.warning('Stopping Scheduler.') + self._is_started = False diff --git a/optimizely/event/log_event.py b/optimizely/event/log_event.py index ea34b17e..1e941e78 100644 --- a/optimizely/event/log_event.py +++ b/optimizely/event/log_event.py @@ -20,3 +20,6 @@ def __init__(self, url, params, http_verb=None, headers=None): self.params = params self.http_verb = http_verb or 'GET' self.headers = headers + + def __str__(self): + return str(self.__class__) + ": " + str(self.__dict__) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py new file mode 100644 index 00000000..c9e06724 --- /dev/null +++ b/tests/test_event_processor.py @@ -0,0 +1,368 @@ +# Copyright 2019, Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import mock +import time +from datetime import timedelta +from six.moves import queue + +from . 
diff --git a/optimizely/event/log_event.py b/optimizely/event/log_event.py
index ea34b17e..1e941e78 100644
--- a/optimizely/event/log_event.py
+++ b/optimizely/event/log_event.py
@@ -20,3 +20,6 @@ def __init__(self, url, params, http_verb=None, headers=None):
     self.params = params
     self.http_verb = http_verb or 'GET'
     self.headers = headers
+
+  def __str__(self):
+    return str(self.__class__) + ": " + str(self.__dict__)
diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py
new file mode 100644
index 00000000..c9e06724
--- /dev/null
+++ b/tests/test_event_processor.py
@@ -0,0 +1,368 @@
+# Copyright 2019, Optimizely
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import mock
+import time
+from datetime import timedelta
+from six.moves import queue
+
+from . import base
+from optimizely.logger import SimpleLogger
+from optimizely.event.entity.visitor import Visitor
+from optimizely.event.entity.decision import Decision
+from optimizely.event.user_event_factory import UserEventFactory
+from optimizely.event.event_processor import BatchEventProcessor
+
+
+class CanonicalEvent(object):
+
+  def __init__(self, experiment_id, variation_id, event_name, visitor_id, attributes, tags):
+    self._experiment_id = experiment_id
+    self._variation_id = variation_id
+    self._event_name = event_name
+    self._visitor_id = visitor_id
+    self._attributes = attributes or {}
+    self._tags = tags or {}
+
+  def __eq__(self, other):
+    if other is None:
+      return False
+
+    return (self._experiment_id == other._experiment_id and
+            self._variation_id == other._variation_id and
+            self._event_name == other._event_name and
+            self._visitor_id == other._visitor_id and
+            self._attributes == other._attributes and
+            self._tags == other._tags)
+
+
+class TestEventDispatcher(object):
+
+  IMPRESSION_EVENT_NAME = 'campaign_activated'
+
+  def __init__(self, countdown_event=None):
+    self.countdown_event = countdown_event
+    self.expected_events = list()
+    self.actual_events = list()
+
+  def compare_events(self):
+    if len(self.expected_events) != len(self.actual_events):
+      return False
+
+    for index, event in enumerate(self.expected_events):
+      expected_event = event
+      actual_event = self.actual_events[index]
+
+      if not expected_event == actual_event:
+        return False
+
+    return True
+
+  def dispatch_event(self, actual_log_event):
+    visitors = []
+    log_event_params = json.loads(actual_log_event.params)
+
+    if 'visitors' in log_event_params:
+      for visitor in log_event_params['visitors']:
+        visitor_instance = Visitor(**visitor)
+        visitors.append(visitor_instance)
+
+    if len(visitors) == 0:
+      return
+
+    for visitor in visitors:
+      for snapshot in visitor.snapshots:
+        decisions = snapshot.get('decisions') or [Decision(None, None, None)]
+        for decision in decisions:
+          for event in snapshot.get('events'):
+            attributes = visitor.attributes
+
+            self.actual_events.append(CanonicalEvent(decision.experiment_id, decision.variation_id,
+                                                     event.get('key'), visitor.visitor_id, attributes,
+                                                     event.get('event_tags')))
+
+  def expect_impression(self, experiment_id, variation_id, user_id, attributes=None):
+    self._expect(experiment_id, variation_id, self.IMPRESSION_EVENT_NAME, user_id, attributes, None)
+
+  def expect_conversion(self, event_name, user_id, attributes=None, event_tags=None):
+    self._expect(None, None, event_name, user_id, attributes, event_tags)
+
+  def _expect(self, experiment_id, variation_id, event_name, visitor_id, attributes, tags):
+    expected_event = CanonicalEvent(experiment_id, variation_id, event_name, visitor_id, attributes, tags)
+    self.expected_events.append(expected_event)
+
+
+class BatchEventProcessorTest(base.BaseTest):
+
+  DEFAULT_QUEUE_CAPACITY = 1000
+  MAX_BATCH_SIZE = 10
+  MAX_DURATION_MS = 1000
+  MAX_TIMEOUT_INTERVAL_MS = 5000
+
+  def setUp(self, *args, **kwargs):
+    base.BaseTest.setUp(self, 'config_dict_with_multiple_experiments')
+    self.test_user_id = 'test_user'
+    self.event_name = 'test_event'
+    self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY)
+    self.optimizely.logger = SimpleLogger()
+
+  def tearDown(self):
+    self._event_processor.close()
+
+  def _build_conversion_event(self, event_name, project_config=None):
+    config = project_config or self.project_config
+    return UserEventFactory.create_conversion_event(config, event_name, self.test_user_id, {}, {})
+
+  def _set_event_processor(self, event_dispatcher, logger):
+    self._event_processor = BatchEventProcessor(event_dispatcher,
+                                                logger,
+                                                True,
+                                                self.event_queue,
+                                                self.MAX_BATCH_SIZE,
+                                                self.MAX_DURATION_MS,
+                                                self.MAX_TIMEOUT_INTERVAL_MS)
+
+  def test_drain_on_close(self):
+    event_dispatcher = TestEventDispatcher()
+
+    with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
+      self._set_event_processor(event_dispatcher, mock_config_logging)
+
+    user_event = self._build_conversion_event(self.event_name)
+    self._event_processor.process(user_event)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    time.sleep(5)
+
+    self.assertStrictTrue(event_dispatcher.compare_events())
+    self.assertEqual(0, self._event_processor.event_queue.qsize())
+
+  def test_flush_on_max_timeout(self):
+    event_dispatcher = TestEventDispatcher()
+
+    with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
+      self._set_event_processor(event_dispatcher, mock_config_logging)
+
+    user_event = self._build_conversion_event(self.event_name)
+    self._event_processor.process(user_event)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    time.sleep(1.5)
+
+    self.assertStrictTrue(event_dispatcher.compare_events())
+    self.assertEqual(0, self._event_processor.event_queue.qsize())
+
+  def test_flush_max_batch_size(self):
+    event_dispatcher = TestEventDispatcher()
+
+    with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
+      self._set_event_processor(event_dispatcher, mock_config_logging)
+
+    for i in range(0, self.MAX_BATCH_SIZE):
+      user_event = self._build_conversion_event(self.event_name)
+      self._event_processor.process(user_event)
+      event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    time.sleep(1)
+
+    self.assertStrictTrue(event_dispatcher.compare_events())
+    self.assertEqual(0, self._event_processor.event_queue.qsize())
+
+  def test_flush(self):
+    event_dispatcher = TestEventDispatcher()
+
+    with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
+      self._set_event_processor(event_dispatcher, mock_config_logging)
+
+    user_event = self._build_conversion_event(self.event_name)
+    self._event_processor.process(user_event)
+    self._event_processor.flush()
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    self._event_processor.process(user_event)
+    self._event_processor.flush()
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    time.sleep(1.5)
+
+    self.assertStrictTrue(event_dispatcher.compare_events())
+    self.assertEqual(0, self._event_processor.event_queue.qsize())
+
+  def test_flush_on_mismatch_revision(self):
+    event_dispatcher = TestEventDispatcher()
+
+    with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
+      self._set_event_processor(event_dispatcher, mock_config_logging)
+
+    self.project_config.revision = 1
+    self.project_config.project_id = 'X'
+
+    user_event_1 = self._build_conversion_event(self.event_name, self.project_config)
+    self._event_processor.process(user_event_1)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    self.project_config.revision = 2
+    self.project_config.project_id = 'X'
+
+    user_event_2 = self._build_conversion_event(self.event_name, self.project_config)
+    self._event_processor.process(user_event_2)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    time.sleep(1.5)
+
+    self.assertStrictTrue(event_dispatcher.compare_events())
+    self.assertEqual(0, self._event_processor.event_queue.qsize())
+
+  def test_flush_on_mismatch_project_id(self):
+    event_dispatcher = TestEventDispatcher()
+
+    with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
+      self._set_event_processor(event_dispatcher, mock_config_logging)
+
+    self.project_config.revision = 1
+    self.project_config.project_id = 'X'
+
+    user_event_1 = self._build_conversion_event(self.event_name, self.project_config)
+    self._event_processor.process(user_event_1)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    self.project_config.revision = 1
+    self.project_config.project_id = 'Y'
+
+    user_event_2 = self._build_conversion_event(self.event_name, self.project_config)
+    self._event_processor.process(user_event_2)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    time.sleep(1.5)
+
+    self.assertStrictTrue(event_dispatcher.compare_events())
+    self.assertEqual(0, self._event_processor.event_queue.qsize())
+
+  def test_stop_and_start(self):
+    event_dispatcher = TestEventDispatcher()
+
+    with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
+      self._set_event_processor(event_dispatcher, mock_config_logging)
+
+    user_event = self._build_conversion_event(self.event_name, self.project_config)
+    self._event_processor.process(user_event)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    time.sleep(1.5)
+
+    self.assertStrictTrue(event_dispatcher.compare_events())
+    self._event_processor.close()
+
+    self._event_processor.process(user_event)
+    event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
+
+    self._event_processor.start()
+    self.assertStrictTrue(self._event_processor.is_started)
+
+    self._event_processor.close()
+    self.assertStrictFalse(self._event_processor.is_started)
+
+    self.assertEqual(0, self._event_processor.event_queue.qsize())
+
+  def test_init__negative_batchsize(self):
+    event_dispatcher = TestEventDispatcher()
+
+    self._event_processor = BatchEventProcessor(event_dispatcher,
+                                                self.optimizely.logger,
+                                                True,
+                                                self.event_queue,
+                                                -5,
+                                                self.MAX_DURATION_MS,
+                                                self.MAX_TIMEOUT_INTERVAL_MS)
+
+    # default batch size is 10.
+    self.assertEqual(self._event_processor.batch_size, 10)
+
+  def test_init__NaN_batchsize(self):
+    event_dispatcher = TestEventDispatcher()
+
+    self._event_processor = BatchEventProcessor(event_dispatcher,
+                                                self.optimizely.logger,
+                                                True,
+                                                self.event_queue,
+                                                'batch_size',
+                                                self.MAX_DURATION_MS,
+                                                self.MAX_TIMEOUT_INTERVAL_MS)
+
+    # default batch size is 10.
+    self.assertEqual(self._event_processor.batch_size, 10)
+
+  def test_init__negative_flush_interval(self):
+    event_dispatcher = TestEventDispatcher()
+
+    self._event_processor = BatchEventProcessor(event_dispatcher,
+                                                self.optimizely.logger,
+                                                True,
+                                                self.event_queue,
+                                                self.MAX_BATCH_SIZE,
+                                                -100,
+                                                self.MAX_TIMEOUT_INTERVAL_MS)
+
+    # default flush interval is 30s.
+    self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30))
+
+  def test_init__NaN_flush_interval(self):
+    event_dispatcher = TestEventDispatcher()
+
+    self._event_processor = BatchEventProcessor(event_dispatcher,
+                                                self.optimizely.logger,
+                                                True,
+                                                self.event_queue,
+                                                self.MAX_BATCH_SIZE,
+                                                True,
+                                                self.MAX_TIMEOUT_INTERVAL_MS)
+
+    # default flush interval is 30s.
+    self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30))
+
+  def test_init__negative_timeout_interval(self):
+    event_dispatcher = TestEventDispatcher()
+
+    self._event_processor = BatchEventProcessor(event_dispatcher,
+                                                self.optimizely.logger,
+                                                True,
+                                                self.event_queue,
+                                                self.MAX_BATCH_SIZE,
+                                                self.MAX_DURATION_MS,
+                                                -100)
+
+    # default timeout interval is 5s.
+    self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5))
+
+  def test_init__NaN_timeout_interval(self):
+    event_dispatcher = TestEventDispatcher()
+
+    self._event_processor = BatchEventProcessor(event_dispatcher,
+                                                self.optimizely.logger,
+                                                True,
+                                                self.event_queue,
+                                                self.MAX_BATCH_SIZE,
+                                                self.MAX_DURATION_MS,
+                                                False)
+
+    # default timeout interval is 5s.
+    self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5))
diff --git a/tox.ini b/tox.ini
index 7fb571f6..0d134f28 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,6 +4,7 @@
 # E121 - continuation line indentation is not a multiple of four
 # E127 - continuation line over-indented for visual indent
 # E722 - do not use bare 'except'
-ignore = E111,E114,E121,E127, E722
+# W504 - line break after binary operator
+ignore = E111,E114,E121,E127,E722,W504
 exclude = optimizely/lib/pymmh3.py,*virtualenv*
 max-line-length = 120
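One possible integration note, not part of this diff: the consumer thread is a daemon, so an abrupt interpreter exit can drop buffered events. Registering `close` as an exit hook is a simple guard:

```python
import atexit

from optimizely.event.event_processor import BatchEventProcessor
from optimizely.event_dispatcher import EventDispatcher
from optimizely.logger import SimpleLogger

processor = BatchEventProcessor(EventDispatcher, SimpleLogger(), default_start=True)
atexit.register(processor.close)
```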