Skip to content

Commit

Permalink
fix: Removed rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhargav Dodla committed Feb 5, 2025
1 parent a29b5ae commit 9a4bd61
Showing 1 changed file with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def allow(self) -> bool:
return False

def __enter__(self):
"""Ctimext manager entry: Wait until a token is available."""
"""Context manager entry: Wait until a token is available."""
while not self.allow():
time.sleep(0.01) # Small delay to prevent busy-waiting
return self
Expand Down Expand Up @@ -414,7 +414,7 @@ def on_failure(exc, semaphore, futures, future, lock):

write_concurrency = config.online_store.write_concurrency

rate_limiter = RateLimiter(rate=write_concurrency, interval=1)
# rate_limiter = RateLimiter(rate=write_concurrency, interval=1)
semaphore = threading.Semaphore(write_concurrency)
lock = threading.Lock()

Expand Down Expand Up @@ -451,26 +451,26 @@ def on_failure(exc, semaphore, futures, future, lock):
timestamp,
)
batch.add(insert_cql, params)
with rate_limiter:
semaphore.acquire()
future = session.execute_async(batch)
futures.append(future)
future.add_callbacks(
partial(
on_success,
semaphore=semaphore,
futures=futures,
future=future,
lock=lock,
),
partial(
on_failure,
semaphore=semaphore,
futures=futures,
future=future,
lock=lock,
),
)
# with rate_limiter:
semaphore.acquire()
future = session.execute_async(batch)
futures.append(future)
future.add_callbacks(
partial(
on_success,
semaphore=semaphore,
futures=futures,
future=future,
lock=lock,
),
partial(
on_failure,
semaphore=semaphore,
futures=futures,
future=future,
lock=lock,
),
)

# TODO: Make this efficient by leveraging continuous writes rather
# than blocking until all writes are done. We may need to rate limit
Expand Down

0 comments on commit 9a4bd61

Please sign in to comment.