Skip to content

Commit 1d858b7

Browse files
authored
feat(eventProcessor): Adds EventProcessor and BatchEventProcessor
Summary ------- - Introduces an EventProcessor interface. - Introduces a BatchEventProcessor Buffering events within a queue before dispatching is an optimization that should prevent SDK implementations from exhausting resources while increasing throughput. This implementation relies on a BlockingCollection to buffer events received from one-to-many producers. A single consumer thread continuously polls from this queue to build a batch before emitting the batched LogEvent. Test plan --------- - Added unit tests.
1 parent d7201cb commit 1d858b7

File tree

8 files changed

+885
-262
lines changed

8 files changed

+885
-262
lines changed

optimizely/closeable.py

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Copyright 2019 Optimizely
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
import abc
15+
16+
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
17+
18+
19+
class Closeable(object):
20+
""" Class encapsulating closing functionality. Override with your own implementation
21+
for close method. """
22+
23+
@abc.abstractmethod
24+
def close(self):
25+
pass

optimizely/config_manager.py

+261-261
Large diffs are not rendered by default.

optimizely/event/entity/user_event.py

+3
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,6 @@ def _get_time(self):
2727

2828
def _get_uuid(self):
2929
return str(uuid.uuid4())
30+
31+
def __str__(self):
32+
return str(self.__class__) + ": " + str(self.__dict__)

optimizely/event/entity/visitor.py

+3
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@ def __init__(self, snapshots, attributes, visitor_id):
1717
self.snapshots = snapshots
1818
self.attributes = attributes
1919
self.visitor_id = visitor_id
20+
21+
def __str__(self):
22+
return str(self.__class__) + ": " + str(self.__dict__)

optimizely/event/event_processor.py

+220
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
# Copyright 2019 Optimizely
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
import abc
15+
import threading
16+
import time
17+
18+
from datetime import timedelta
19+
from six.moves import queue
20+
21+
from .entity.user_event import UserEvent
22+
from .event_factory import EventFactory
23+
from optimizely import logger as _logging
24+
from optimizely.closeable import Closeable
25+
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
26+
from optimizely.helpers import validator
27+
28+
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
29+
30+
31+
class EventProcessor(ABC):
32+
""" Class encapsulating event_processor functionality. Override with your own processor
33+
providing process method. """
34+
35+
@abc.abstractmethod
36+
def process(user_event):
37+
pass
38+
39+
40+
class BatchEventProcessor(EventProcessor, Closeable):
41+
"""
42+
BatchEventProcessor is a batched implementation of the EventProcessor.
43+
The BatchEventProcessor maintains a single consumer thread that pulls events off of
44+
the blocking queue and buffers them for either a configured batch size or for a
45+
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
46+
"""
47+
48+
_DEFAULT_QUEUE_CAPACITY = 1000
49+
_DEFAULT_BATCH_SIZE = 10
50+
_DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30)
51+
_DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5)
52+
_SHUTDOWN_SIGNAL = object()
53+
_FLUSH_SIGNAL = object()
54+
LOCK = threading.Lock()
55+
56+
def __init__(self,
57+
event_dispatcher,
58+
logger,
59+
default_start=False,
60+
event_queue=None,
61+
batch_size=None,
62+
flush_interval=None,
63+
timeout_interval=None):
64+
self.event_dispatcher = event_dispatcher or default_event_dispatcher
65+
self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
66+
self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY)
67+
self.batch_size = batch_size if self._validate_intantiation_props(batch_size) else self._DEFAULT_BATCH_SIZE
68+
self.flush_interval = timedelta(milliseconds=flush_interval) if self._validate_intantiation_props(flush_interval) \
69+
else self._DEFAULT_FLUSH_INTERVAL
70+
self.timeout_interval = timedelta(milliseconds=timeout_interval) \
71+
if self._validate_intantiation_props(timeout_interval) else self._DEFAULT_TIMEOUT_INTERVAL
72+
self._disposed = False
73+
self._is_started = False
74+
self._current_batch = list()
75+
76+
if default_start is True:
77+
self.start()
78+
79+
@property
80+
def is_started(self):
81+
return self._is_started
82+
83+
@property
84+
def disposed(self):
85+
return self._disposed
86+
87+
def _validate_intantiation_props(self, prop):
88+
if prop is None or prop < 0 or not validator.is_finite_number(prop):
89+
return False
90+
91+
return True
92+
93+
def _get_time_in_ms(self, _time=None):
94+
if _time == 0:
95+
return 0
96+
97+
return int(round((_time or time.time()) * 1000))
98+
99+
def start(self):
100+
if self.is_started and not self.disposed:
101+
self.logger.warning('Service already started')
102+
return
103+
104+
self.flushing_interval_deadline = self._get_time_in_ms() + self._get_time_in_ms(self.flush_interval.total_seconds())
105+
self.executor = threading.Thread(target=self._run)
106+
self.executor.setDaemon(True)
107+
self.executor.start()
108+
109+
self._is_started = True
110+
111+
def _run(self):
112+
""" Scheduler method that periodically flushes events queue. """
113+
try:
114+
while True:
115+
if self._get_time_in_ms() > self.flushing_interval_deadline:
116+
self._flush_queue()
117+
118+
try:
119+
item = self.event_queue.get(True, 0.05)
120+
121+
except queue.Empty:
122+
self.logger.debug('Empty queue, sleeping for 50ms.')
123+
time.sleep(0.05)
124+
continue
125+
126+
if item == self._SHUTDOWN_SIGNAL:
127+
self.logger.debug('Received shutdown signal.')
128+
break
129+
130+
if item == self._FLUSH_SIGNAL:
131+
self.logger.debug('Received flush signal.')
132+
self._flush_queue()
133+
continue
134+
135+
if isinstance(item, UserEvent):
136+
self._add_to_batch(item)
137+
138+
except Exception, exception:
139+
self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception))
140+
141+
finally:
142+
self.logger.info('Exiting processing loop. Attempting to flush pending events.')
143+
self._flush_queue()
144+
145+
def flush(self):
146+
""" Adds flush signal to event_queue. """
147+
148+
self.event_queue.put(self._FLUSH_SIGNAL)
149+
150+
def _flush_queue(self):
151+
""" Flushes event_queue by dispatching events. """
152+
153+
if len(self._current_batch) == 0:
154+
return
155+
156+
with self.LOCK:
157+
to_process_batch = list(self._current_batch)
158+
self._current_batch = list()
159+
160+
log_event = EventFactory.create_log_event(to_process_batch, self.logger)
161+
162+
try:
163+
self.event_dispatcher.dispatch_event(log_event)
164+
except Exception, e:
165+
self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e))
166+
167+
def process(self, user_event):
168+
if not isinstance(user_event, UserEvent):
169+
self.logger.error('Provided event is in an invalid format.')
170+
return
171+
172+
self.logger.debug('Received user_event: ' + str(user_event))
173+
174+
try:
175+
self.event_queue.put_nowait(user_event)
176+
except queue.Full:
177+
self.logger.debug('Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())))
178+
179+
def _add_to_batch(self, user_event):
180+
if self._should_split(user_event):
181+
self._flush_queue()
182+
self._current_batch = list()
183+
184+
# Reset the deadline if starting a new batch.
185+
if len(self._current_batch) == 0:
186+
self.flushing_interval_deadline = self._get_time_in_ms() + \
187+
self._get_time_in_ms(self.flush_interval.total_seconds())
188+
189+
with self.LOCK:
190+
self._current_batch.append(user_event)
191+
if len(self._current_batch) >= self.batch_size:
192+
self._flush_queue()
193+
194+
def _should_split(self, user_event):
195+
if len(self._current_batch) == 0:
196+
return False
197+
198+
current_context = self._current_batch[-1].event_context
199+
new_context = user_event.event_context
200+
201+
if current_context.revision != new_context.revision:
202+
return True
203+
204+
if current_context.project_id != new_context.project_id:
205+
return True
206+
207+
return False
208+
209+
def close(self):
210+
""" Stops and disposes batch event processor. """
211+
self.logger.info('Start close.')
212+
213+
self.event_queue.put(self._SHUTDOWN_SIGNAL)
214+
self.executor.join(self.timeout_interval.total_seconds())
215+
216+
if self.executor.isAlive():
217+
self.logger.error('Timeout exceeded while attempting to close for ' + self.timeout_interval + ' ms.')
218+
219+
self.logger.warning('Stopping Scheduler.')
220+
self._is_started = False

optimizely/event/log_event.py

+3
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ def __init__(self, url, params, http_verb=None, headers=None):
2020
self.params = params
2121
self.http_verb = http_verb or 'GET'
2222
self.headers = headers
23+
24+
def __str__(self):
25+
return str(self.__class__) + ": " + str(self.__dict__)

0 commit comments

Comments
 (0)