19
19
"""
20
20
21
21
import logging
22
+ import queue
22
23
import threading
23
24
import time
24
25
from datetime import datetime
@@ -399,24 +400,17 @@ def online_write_batch(
399
400
display progress.
400
401
"""
401
402
402
- def on_success (result , semaphore , futures , future , lock ):
403
- semaphore .release ()
404
- with lock :
405
- futures .remove (future )
403
+ def on_success (result , concurrent_queue ):
404
+ concurrent_queue .get_nowait ()
406
405
407
- def on_failure (exc , semaphore , futures , future , lock ):
408
- semaphore .release ()
409
- with lock :
410
- futures .remove (future )
406
+ def on_failure (exc , concurrent_queue ):
407
+ concurrent_queue .get_nowait ()
411
408
logger .exception (f"Error writing a batch: { exc } " )
412
409
print (f"Error writing a batch: { exc } " )
413
410
raise Exception ("Error writing a batch" ) from exc
414
411
415
412
write_concurrency = config .online_store .write_concurrency
416
-
417
- # rate_limiter = RateLimiter(rate=write_concurrency, interval=1)
418
- semaphore = threading .Semaphore (write_concurrency )
419
- lock = threading .Lock ()
413
+ concurrent_queue : queue .Queue = queue .Queue (maxsize = write_concurrency )
420
414
421
415
project = config .project
422
416
ttl = (
@@ -428,7 +422,6 @@ def on_failure(exc, semaphore, futures, future, lock):
428
422
keyspace : str = self ._keyspace
429
423
fqtable = CassandraOnlineStore ._fq_table_name (keyspace , project , table )
430
424
431
- futures = []
432
425
insert_cql = self ._get_cql_statement (
433
426
config ,
434
427
"insert4" ,
@@ -451,58 +444,29 @@ def on_failure(exc, semaphore, futures, future, lock):
451
444
timestamp ,
452
445
)
453
446
batch .add (insert_cql , params )
454
- # with rate_limiter:
455
- semaphore .acquire ()
456
447
future = session .execute_async (batch )
457
- futures . append (future )
448
+ concurrent_queue . put (future )
458
449
future .add_callbacks (
459
450
partial (
460
451
on_success ,
461
- semaphore = semaphore ,
462
- futures = futures ,
463
- future = future ,
464
- lock = lock ,
452
+ concurrent_queue = concurrent_queue ,
465
453
),
466
454
partial (
467
455
on_failure ,
468
- semaphore = semaphore ,
469
- futures = futures ,
470
- future = future ,
471
- lock = lock ,
456
+ concurrent_queue = concurrent_queue ,
472
457
),
473
458
)
474
459
475
- # TODO: Make this efficient by leveraging continuous writes rather
476
- # than blocking until all writes are done. We may need to rate limit
477
- # the writes to reduce the impact on read performance.
478
- # if len(futures) >= write_concurrency:
479
- # # Raises exception if at least one of the batch fails
480
- # self._wait_for_futures(futures)
481
- # futures.clear()
482
-
483
460
# this happens N-1 times, will be corrected outside:
484
461
if progress :
485
462
progress (1 )
486
- while futures :
463
+ while not concurrent_queue . empty () :
487
464
time .sleep (0.001 )
488
- # if len(futures) > 0:
489
- # self._wait_for_futures(futures)
490
- # futures.clear()
491
465
492
466
# correction for the last missing call to `progress`:
493
467
if progress :
494
468
progress (1 )
495
469
496
- def _wait_for_futures (self , futures ):
497
- try :
498
- for future in futures :
499
- future .result ()
500
- futures = []
501
- except Exception as exc :
502
- logger .error (f"Error writing a batch: { exc } " )
503
- print (f"Error writing a batch: { exc } " )
504
- raise Exception ("Error writing a batch" ) from exc
505
-
506
470
def online_read (
507
471
self ,
508
472
config : RepoConfig ,
0 commit comments