Skip to content

Commit

Permalink
fix: Adjusted concurrent queue size to match write concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhargav Dodla committed Feb 5, 2025
1 parent d578328 commit 6c42453
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def _map_by_partition(
)
end_time = time.time()
print(
f"INFO!!! Processed batch with size {pdf_row_count} in {int((end_time - start_time) * 1000)} milliseconds"
f"INFO: Processed batch with size {pdf_row_count} in {int((end_time - start_time) * 1000)} milliseconds"
)
yield pd.DataFrame(
[pd.Series(range(1, 2))]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ def online_write_batch(
rows is written to the online store. Can be used to
display progress.
"""
current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(f"{current_time_in_ms} Starting online_write_batch method...")

def on_success(result, concurrent_queue):
concurrent_queue.get_nowait()
Expand All @@ -429,7 +431,9 @@ def on_failure(exc, concurrent_queue):

write_concurrency = config.online_store.write_concurrency
rate_limiter = SlidingWindowRateLimiter(write_concurrency, 1)
concurrent_queue: queue.Queue = queue.Queue(maxsize=write_concurrency / 5)
concurrent_queue: queue.Queue = queue.Queue(maxsize=write_concurrency)
current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(f"{current_time_in_ms} Using write concurrency: {write_concurrency}")

project = config.project
ttl = (
Expand All @@ -438,6 +442,11 @@ def on_failure(exc, concurrent_queue):
or 0
)
session: Session = self._get_session(config)

current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(
f"{current_time_in_ms} {session.session_id} Starting online_write_batch session created..."
)
keyspace: str = self._keyspace
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)

Expand Down Expand Up @@ -466,8 +475,16 @@ def on_failure(exc, concurrent_queue):

# Wait until the rate limiter allows
if not rate_limiter.acquire():
current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[
:-3
]
print(f"{current_time_in_ms} Rate limit exceeded, waiting...")
while not rate_limiter.acquire():
time.sleep(0.001)
current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[
:-3
]
print(f"{current_time_in_ms} Rate limit released, continuing...")

future = session.execute_async(batch)
concurrent_queue.put(future)
Expand All @@ -486,12 +503,22 @@ def on_failure(exc, concurrent_queue):
if progress:
progress(1)
# Wait for all tasks to complete
current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(
f"{current_time_in_ms} {session.session_id} waiting for queue to be empty"
)
while not concurrent_queue.empty():
time.sleep(0.001)
current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(f"{current_time_in_ms} {session.session_id} queue is empty")

# correction for the last missing call to `progress`:
if progress:
progress(1)
current_time_in_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(
f"{current_time_in_ms} {session.session_id} Done online_write_batch method..."
)

def online_read(
self,
Expand Down

0 comments on commit 6c42453

Please sign in to comment.