@@ -82,6 +82,12 @@ def process_data(infile, outfile):
82
82
from cgatcore .pipeline .files import get_temp_filename , get_temp_dir
83
83
from cgatcore .pipeline .parameters import substitute_parameters , get_params
84
84
from cgatcore .pipeline .cluster import get_queue_manager , JobInfo
85
+ from cgatcore .pipeline .executors import SGEExecutor , SlurmExecutor , TorqueExecutor , LocalExecutor
86
+ try :
87
+ from cgatcore .pipeline .kubernetes import KubernetesExecutor
88
+ except ImportError :
89
+ KubernetesExecutor = None # Fallback if Kubernetes is not available
90
+
85
91
86
92
# talking to a cluster
87
93
try :
@@ -424,6 +430,50 @@ def interpolate_statement(statement, kwargs):
424
430
return statement
425
431
426
432
433
def get_executor(options=None):
    """
    Return an executor instance based on the specified queue manager in options.

    Parameters:
    - options (dict): Dictionary containing execution options,
      including "cluster_queue_manager".

    Returns:
    - Executor instance appropriate for the specified queue manager.
    """
    if options is None:
        options = get_params()

    # Run locally when testing, or when to_cluster is explicitly set to
    # False (it defaults to True if not specified).
    if options.get("testing", False) or not options.get("to_cluster", True):
        return LocalExecutor(**options)

    queue_manager = options.get("cluster_queue_manager", None)

    # Kubernetes needs no local submission binary, only the optional import.
    if queue_manager == "kubernetes" and KubernetesExecutor is not None:
        return KubernetesExecutor(**options)

    # Each cluster backend is usable only if its submission command is on PATH:
    # SGE/Torque submit via qsub, Slurm via sbatch.
    cluster_backends = {
        "sge": ("qsub", SGEExecutor),
        "slurm": ("sbatch", SlurmExecutor),
        "torque": ("qsub", TorqueExecutor),
    }
    backend = cluster_backends.get(queue_manager)
    if backend is not None:
        submit_command, executor_class = backend
        if shutil.which(submit_command) is not None:
            return executor_class(**options)

    # Fallback to LocalExecutor, not sure if this should raise an error though, feels like it should
    return LocalExecutor(**options)
476
+
427
477
def join_statements (statements , infile , outfile = None ):
428
478
'''join a chain of statements into a single statement.
429
479
@@ -1318,32 +1368,6 @@ class LocalArrayExecutor(LocalExecutor):
1318
1368
pass
1319
1369
1320
1370
1321
- def make_runner (** kwargs ):
1322
- """factory function returning an object capable of executing
1323
- a list of command line statements.
1324
- """
1325
-
1326
- run_as_array = "job_array" in kwargs and kwargs ["job_array" ] is not None
1327
-
1328
- # run on cluster if:
1329
- # * to_cluster is not defined or set to True
1330
- # * command line option without_cluster is set to False
1331
- # * an SGE session is present
1332
- run_on_cluster = will_run_on_cluster (kwargs )
1333
- if run_on_cluster :
1334
- if run_as_array :
1335
- runner = GridArrayExecutor (** kwargs )
1336
- else :
1337
- runner = GridExecutor (** kwargs )
1338
- else :
1339
- if run_as_array :
1340
- runner = LocalArrayExecutor (** kwargs )
1341
- else :
1342
- runner = LocalExecutor (** kwargs )
1343
-
1344
- return runner
1345
-
1346
-
1347
1371
def run (statement , ** kwargs ):
1348
1372
"""run a command line statement.
1349
1373
@@ -1442,7 +1466,7 @@ def run(statement, **kwargs):
1442
1466
"""
1443
1467
logger = get_logger ()
1444
1468
1445
- # combine options using priority
1469
+ # Combine options using priority
1446
1470
options = dict (list (get_params ().items ()))
1447
1471
caller_options = get_caller_locals ()
1448
1472
options .update (list (caller_options .items ()))
@@ -1451,7 +1475,7 @@ def run(statement, **kwargs):
1451
1475
del options ["self" ]
1452
1476
options .update (list (kwargs .items ()))
1453
1477
1454
- # inject params named tuple from TaskLibrary functions into option
1478
+ # Inject params named tuple from TaskLibrary functions into option
1455
1479
# dict. This allows overriding options set in the code with options set
1456
1480
# in a .yml file
1457
1481
if "params" in options :
@@ -1460,7 +1484,7 @@ def run(statement, **kwargs):
1460
1484
except AttributeError :
1461
1485
pass
1462
1486
1463
- # insert parameters supplied through simplified interface such
1487
+ # Insert parameters supplied through simplified interface such
1464
1488
# as job_memory, job_options, job_queue
1465
1489
options ['cluster' ]['options' ] = options .get (
1466
1490
'job_options' , options ['cluster' ]['options' ])
@@ -1483,34 +1507,33 @@ def run(statement, **kwargs):
1483
1507
1484
1508
options ["task_name" ] = calling_module + "." + get_calling_function ()
1485
1509
1486
- # build statements using parameter interpolation
1510
+ # Build statements using parameter interpolation
1487
1511
if isinstance (statement , list ):
1488
- statement_list = []
1489
- for stmt in statement :
1490
- statement_list .append (interpolate_statement (stmt , options ))
1512
+ statement_list = [interpolate_statement (stmt , options ) for stmt in statement ]
1491
1513
else :
1492
1514
statement_list = [interpolate_statement (statement , options )]
1493
1515
1494
1516
if len (statement_list ) == 0 :
1495
- logger .warn ("no statements found - no execution" )
1517
+ logger .warn ("No statements found - no execution" )
1496
1518
return []
1497
1519
1498
1520
if options .get ("dryrun" , False ):
1499
1521
for statement in statement_list :
1500
- logger .info ("dry -run: {}" .format (statement ))
1522
+ logger .info ("Dry -run: {}" .format (statement ))
1501
1523
return []
1502
1524
1503
- # execute statement list
1504
- runner = make_runner (** options )
1505
- with runner as r :
1506
- benchmark_data = r .run (statement_list )
1525
+ # Use get_executor to get the appropriate executor
1526
+ executor = get_executor (options ) # Updated to use get_executor
1527
+
1528
+ # Execute statement list within the context of the executor
1529
+ with executor as e :
1530
+ benchmark_data = e .run (statement_list )
1507
1531
1508
- # log benchmark_data
1532
+ # Log benchmark data
1509
1533
for data in benchmark_data :
1510
1534
logger .info (json .dumps (data ))
1511
1535
1512
- BenchmarkData = collections .namedtuple (
1513
- 'BenchmarkData' , sorted (benchmark_data [0 ]))
1536
+ BenchmarkData = collections .namedtuple ('BenchmarkData' , sorted (benchmark_data [0 ]))
1514
1537
return [BenchmarkData (** d ) for d in benchmark_data ]
1515
1538
1516
1539
0 commit comments