Batch event processor #202

Merged

32 commits
901d8f1
fix: remove unsupported package.
Jul 25, 2019
39113c3
Update .travis.yml
mariamjamal94 Jul 25, 2019
94e5df0
Update .travis.yml
mariamjamal94 Jul 25, 2019
8687672
Update .travis.yml
mariamjamal94 Jul 25, 2019
520f62c
Update .travis.yml
mariamjamal94 Jul 25, 2019
feb445b
Merge branch 'master' of https://github.com/Mariamjamal32/python-sdk
Jul 29, 2019
ae18085
Merge branch 'master' into mjamal/event_factory
Jul 29, 2019
90e694b
Merge branch 'mjamal/event_factory' of https://github.com/Mariamjamal…
Jul 29, 2019
d73e8cb
fix: resolve issues in _create_visitor class method
Jul 29, 2019
501b9c5
Merge branch 'master' of git://github.com/optimizely/python-sdk
Aug 1, 2019
018fb2d
Merge branch 'master' into mjamal/event_factory
Aug 1, 2019
e81e9a5
feat: add event_processor interface to support batch event processing.
Aug 1, 2019
ad59496
Merge remote-tracking branch 'remotes/upstream/sohail/pr-193' into ba…
Aug 1, 2019
fed4697
Merge branch 'sohail/pr-193' into batch_event_processor
mnoman09 Aug 1, 2019
f36754e
sync batch_event_processor with pr-193.
Aug 1, 2019
9f5f80d
Merge branch 'batch_event_processor' of https://github.com/Mariamjama…
Aug 1, 2019
32494b9
Update event_processor.py
mariamjamal94 Aug 2, 2019
373ef41
update: cater review comments.
Aug 2, 2019
0125d9f
update: review suggestions addressed.
Aug 7, 2019
59c5416
fix: address review comment to add abstract method from ABC in event_…
Aug 8, 2019
5c0632c
update event/event_processor.py
Aug 9, 2019
dce8efb
fix: remove stop and add close method in BatchEventProcessor.
Aug 9, 2019
6aca4ff
fix: reset indentation.
Aug 9, 2019
2d8d947
update BatchEventProcessor.
Aug 9, 2019
9171484
fix: remove stop from test_event_processor.
Aug 9, 2019
d52be4f
fix: logger level.
Aug 15, 2019
f55d6cf
update: fallback to defaults for invalid and negative values BatchEve…
Aug 16, 2019
3e55e2e
fix: linter error.
Aug 16, 2019
3154499
update: review suggestion catered.
Aug 16, 2019
06b5c25
update: import convention.
Aug 16, 2019
aab5ab4
update: remove uncertain log tests.
Aug 16, 2019
f4c23b1
update: handle 0ms flush_interval condition.
Aug 16, 2019
25 changes: 25 additions & 0 deletions optimizely/closeable.py
@@ -0,0 +1,25 @@
# Copyright 2019 Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc

ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class Closeable(ABC):
  """ Class encapsulating closing functionality. Override with your own implementation
  of the close method. """

  @abc.abstractmethod
  def close(self):
    pass
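
For context, a minimal sketch of how a component might implement this interface; the class and attribute names below are illustrative only and are not part of this PR:

from optimizely.closeable import Closeable


class PollingService(Closeable):
  """ Hypothetical service owning a background thread that must be released on close. """

  def __init__(self, worker_thread):
    self.worker_thread = worker_thread

  def close(self):
    # Release the owned resource; BatchEventProcessor in this PR follows the same
    # pattern by draining its queue and joining its consumer thread.
    self.worker_thread.join()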
522 changes: 261 additions & 261 deletions optimizely/config_manager.py

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions optimizely/event/entity/user_event.py
@@ -27,3 +27,6 @@ def _get_time(self):

  def _get_uuid(self):
    return str(uuid.uuid4())

  def __str__(self):
    return str(self.__class__) + ": " + str(self.__dict__)
3 changes: 3 additions & 0 deletions optimizely/event/entity/visitor.py
@@ -17,3 +17,6 @@ def __init__(self, snapshots, attributes, visitor_id):
    self.snapshots = snapshots
    self.attributes = attributes
    self.visitor_id = visitor_id

  def __str__(self):
    return str(self.__class__) + ": " + str(self.__dict__)
220 changes: 220 additions & 0 deletions optimizely/event/event_processor.py
@@ -0,0 +1,220 @@
# Copyright 2019 Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import threading
import time

from datetime import timedelta
from six.moves import queue

from .entity.user_event import UserEvent
from .event_factory import EventFactory
from optimizely import logger as _logging
from optimizely.closeable import Closeable
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
from optimizely.helpers import validator

ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class EventProcessor(ABC):
  """ Class encapsulating event_processor functionality. Override with your own processor
  providing a process method. """

  @abc.abstractmethod
  def process(self, user_event):
    pass


class BatchEventProcessor(EventProcessor, Closeable):
  """
  BatchEventProcessor is a batched implementation of the EventProcessor.
  The BatchEventProcessor maintains a single consumer thread that pulls events off of
  the blocking queue and buffers them for either a configured batch size or for a
  maximum duration before the resulting LogEvent is sent to the EventDispatcher.
  """

  _DEFAULT_QUEUE_CAPACITY = 1000
  _DEFAULT_BATCH_SIZE = 10
  _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30)
  _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5)
  _SHUTDOWN_SIGNAL = object()
  _FLUSH_SIGNAL = object()
  LOCK = threading.Lock()

  def __init__(self,
               event_dispatcher,
               logger,
               default_start=False,
               event_queue=None,
               batch_size=None,
               flush_interval=None,
               timeout_interval=None):
    self.event_dispatcher = event_dispatcher or default_event_dispatcher
    self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
    self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY)
    self.batch_size = batch_size if self._validate_instantiation_props(batch_size) else self._DEFAULT_BATCH_SIZE
    self.flush_interval = timedelta(milliseconds=flush_interval) if self._validate_instantiation_props(flush_interval) \
      else self._DEFAULT_FLUSH_INTERVAL
    self.timeout_interval = timedelta(milliseconds=timeout_interval) \
      if self._validate_instantiation_props(timeout_interval) else self._DEFAULT_TIMEOUT_INTERVAL
    self._disposed = False
    self._is_started = False
    self._current_batch = list()

    if default_start is True:
      self.start()

  @property
  def is_started(self):
    return self._is_started

  @property
  def disposed(self):
    return self._disposed

  def _validate_instantiation_props(self, prop):
    if prop is None or prop < 0 or not validator.is_finite_number(prop):
      return False

    return True

  def _get_time_in_ms(self, _time=None):
    if _time == 0:
      return 0

    return int(round((_time or time.time()) * 1000))

  def start(self):
    if self.is_started and not self.disposed:
      self.logger.warning('Service already started')
      return

    self.flushing_interval_deadline = self._get_time_in_ms() + self._get_time_in_ms(self.flush_interval.total_seconds())
    self.executor = threading.Thread(target=self._run)
    self.executor.setDaemon(True)
    self.executor.start()

    self._is_started = True

  def _run(self):
    """ Scheduler method that periodically flushes events queue. """
    try:
      while True:
        if self._get_time_in_ms() > self.flushing_interval_deadline:
          self._flush_queue()

        try:
          item = self.event_queue.get(True, 0.05)

        except queue.Empty:
          self.logger.debug('Empty queue, sleeping for 50ms.')
          time.sleep(0.05)
          continue

        if item == self._SHUTDOWN_SIGNAL:
          self.logger.debug('Received shutdown signal.')
          break

        if item == self._FLUSH_SIGNAL:
          self.logger.debug('Received flush signal.')
          self._flush_queue()
          continue

        if isinstance(item, UserEvent):
          self._add_to_batch(item)

    except Exception as exception:
      self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception))

    finally:
      self.logger.info('Exiting processing loop. Attempting to flush pending events.')
      self._flush_queue()

  def flush(self):
    """ Adds flush signal to event_queue. """

    self.event_queue.put(self._FLUSH_SIGNAL)

  def _flush_queue(self):
    """ Flushes event_queue by dispatching events. """

    if len(self._current_batch) == 0:
      return

    with self.LOCK:
      to_process_batch = list(self._current_batch)
      self._current_batch = list()

    log_event = EventFactory.create_log_event(to_process_batch, self.logger)

    try:
      self.event_dispatcher.dispatch_event(log_event)
    except Exception as e:
      self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e))

  def process(self, user_event):
    if not isinstance(user_event, UserEvent):
      self.logger.error('Provided event is in an invalid format.')
      return

    self.logger.debug('Received user_event: ' + str(user_event))

    try:
      self.event_queue.put_nowait(user_event)
    except queue.Full:
      self.logger.debug('Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())))

  def _add_to_batch(self, user_event):
    if self._should_split(user_event):
      self._flush_queue()
      self._current_batch = list()

    # Reset the deadline if starting a new batch.
    if len(self._current_batch) == 0:
      self.flushing_interval_deadline = self._get_time_in_ms() + \
        self._get_time_in_ms(self.flush_interval.total_seconds())

    with self.LOCK:
      self._current_batch.append(user_event)
      if len(self._current_batch) >= self.batch_size:
        self._flush_queue()

  def _should_split(self, user_event):
    if len(self._current_batch) == 0:
      return False

    current_context = self._current_batch[-1].event_context
    new_context = user_event.event_context

    if current_context.revision != new_context.revision:
      return True

    if current_context.project_id != new_context.project_id:
      return True

    return False

  def close(self):
    """ Stops and disposes batch event processor. """
    self.logger.info('Start close.')

    self.event_queue.put(self._SHUTDOWN_SIGNAL)
    self.executor.join(self.timeout_interval.total_seconds())

    if self.executor.isAlive():
      self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.')

    self.logger.warning('Stopping Scheduler.')
    self._is_started = False
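
For reviewers, a rough usage sketch based on the constructor and close() shown above (interval values are in milliseconds; the logger and dispatcher below are the SDK's existing helpers, and building the UserEvent itself is out of scope here):

from optimizely import logger as optimizely_logger
from optimizely.event.event_processor import BatchEventProcessor
from optimizely.event_dispatcher import EventDispatcher

batch_processor = BatchEventProcessor(
  EventDispatcher,                  # falls back to the default dispatcher when None
  optimizely_logger.SimpleLogger(),
  default_start=True,               # start the consumer thread immediately
  batch_size=10,                    # dispatch once 10 events are buffered...
  flush_interval=30000,             # ...or after 30s, whichever comes first
  timeout_interval=5000             # wait up to 5s for the thread when closing
)

# Each UserEvent handed to process() is queued and batched by the consumer thread.
# batch_processor.process(user_event)

# Drain any buffered events and stop the consumer thread.
batch_processor.close()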
3 changes: 3 additions & 0 deletions optimizely/event/log_event.py
@@ -20,3 +20,6 @@ def __init__(self, url, params, http_verb=None, headers=None):
    self.params = params
    self.http_verb = http_verb or 'GET'
    self.headers = headers

  def __str__(self):
    return str(self.__class__) + ": " + str(self.__dict__)