From 9a4bd61007f3f418e4a3346549881efb10a662cb Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Tue, 4 Feb 2025 18:48:05 -0800 Subject: [PATCH] fix: Removed rate limiting --- .../cassandra_online_store.py | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index cf69dabd9af..d9c8476487d 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -231,7 +231,7 @@ def allow(self) -> bool: return False def __enter__(self): - """Ctimext manager entry: Wait until a token is available.""" + """Context manager entry: Wait until a token is available.""" while not self.allow(): time.sleep(0.01) # Small delay to prevent busy-waiting return self @@ -414,7 +414,7 @@ def on_failure(exc, semaphore, futures, future, lock): write_concurrency = config.online_store.write_concurrency - rate_limiter = RateLimiter(rate=write_concurrency, interval=1) + # rate_limiter = RateLimiter(rate=write_concurrency, interval=1) semaphore = threading.Semaphore(write_concurrency) lock = threading.Lock() @@ -451,26 +451,26 @@ def on_failure(exc, semaphore, futures, future, lock): timestamp, ) batch.add(insert_cql, params) - with rate_limiter: - semaphore.acquire() - future = session.execute_async(batch) - futures.append(future) - future.add_callbacks( - partial( - on_success, - semaphore=semaphore, - futures=futures, - future=future, - lock=lock, - ), - partial( - on_failure, - semaphore=semaphore, - futures=futures, - future=future, - lock=lock, - ), - ) + # with rate_limiter: + semaphore.acquire() + future = session.execute_async(batch) + futures.append(future) + future.add_callbacks( + partial( + on_success, + semaphore=semaphore, + futures=futures, + future=future, + lock=lock, + ), + partial( + on_failure, + semaphore=semaphore, + futures=futures, + future=future, + lock=lock, + ), + ) # TODO: Make this efficient by leveraging continuous writes rather # than blocking until all writes are done. We may need to rate limit