Skip to content

Commit d578328

Browse files
author
Bhargav Dodla
committed
fix: Using sliding window rate limiter
1 parent 5710727 commit d578328

File tree

1 file changed

+27
-1
lines changed

1 file changed

+27
-1
lines changed

sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,24 @@ def __exit__(self, exc_type, exc_val, exc_tb):
242242
pass
243243

244244

245+
class SlidingWindowRateLimiter:
246+
def __init__(self, max_calls, period):
247+
self.max_calls = max_calls
248+
self.period = period
249+
self.timestamps = [0] * max_calls
250+
self.index = 0
251+
252+
def acquire(self):
253+
now = time.time()
254+
window_start = now - self.period
255+
256+
if self.timestamps[self.index] <= window_start:
257+
self.timestamps[self.index] = now
258+
self.index = (self.index + 1) % self.max_calls
259+
return True
260+
return False
261+
262+
245263
class CassandraOnlineStore(OnlineStore):
246264
"""
247265
Cassandra/Astra DB online store implementation for Feast.
@@ -410,7 +428,8 @@ def on_failure(exc, concurrent_queue):
410428
raise Exception("Error writing a batch") from exc
411429

412430
write_concurrency = config.online_store.write_concurrency
413-
concurrent_queue: queue.Queue = queue.Queue(maxsize=write_concurrency)
431+
rate_limiter = SlidingWindowRateLimiter(write_concurrency, 1)
432+
concurrent_queue: queue.Queue = queue.Queue(maxsize=write_concurrency / 5)
414433

415434
project = config.project
416435
ttl = (
@@ -444,6 +463,12 @@ def on_failure(exc, concurrent_queue):
444463
timestamp,
445464
)
446465
batch.add(insert_cql, params)
466+
467+
# Wait until the rate limiter allows
468+
if not rate_limiter.acquire():
469+
while not rate_limiter.acquire():
470+
time.sleep(0.001)
471+
447472
future = session.execute_async(batch)
448473
concurrent_queue.put(future)
449474
future.add_callbacks(
@@ -460,6 +485,7 @@ def on_failure(exc, concurrent_queue):
460485
# this happens N-1 times, will be corrected outside:
461486
if progress:
462487
progress(1)
488+
# Wait for all tasks to complete
463489
while not concurrent_queue.empty():
464490
time.sleep(0.001)
465491

0 commit comments

Comments
 (0)