 import os
 import platform
 import re
+import subprocess
 from pathlib import Path, PurePath, PurePosixPath
 from typing import Any, Dict, List, Optional, Tuple, Union
 
@@ -42,6 +43,83 @@ def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Unio
     return PurePosixPath(dest_path, rel_path)
 
 
+def download_gcs(gcs_path: str, local_path: str, is_dir: bool):
+    # check that output path exists
+    if not os.path.exists(Path(local_path).parent):
+        os.makedirs(Path(local_path).parent)
+
+    # build command
+    cmd = 'gcloud storage'
+    # rsync with checksums to make file transfer faster for larger files
+    cmd = cmd + ' rsync --checksums-only'
+    # check if directory
+    if is_dir:
+        cmd = cmd + ' -r'
+    cmd = cmd + ' ' + gcs_path + ' ' + local_path
+
+    print(cmd)
+    # run command
+    subprocess.run(cmd, shell=True)
+
+    # remove the placeholder file that upload_gcs creates for empty directories
+    if Path(Path(local_path) / 'gcs_temp.txt').exists():
+        Path(Path(local_path) / 'gcs_temp.txt').unlink()
+
+
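For reference, a usage sketch of download_gcs; the bucket and paths are hypothetical. The call expands to a single gcloud storage rsync invocation and removes the gcs_temp.txt placeholder if one was uploaded for an empty directory:

# Runs: gcloud storage rsync --checksums-only -r gs://my-bucket/output/pathway output/pathway
download_gcs(gcs_path='gs://my-bucket/output/pathway', local_path='output/pathway', is_dir=True)
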
+def upload_gcs(local_path: str, gcs_path: str, is_dir: bool):
+    # check if path exists in cloud storage
+    exists = len(subprocess.run(f'gcloud storage ls {gcs_path}', shell=True, capture_output=True, text=True).stdout)
+    # if path exists rsync
+    if exists > 0:
+        cmd = 'gcloud storage rsync --checksums-only'
+    # if directory is empty
+    elif exists == 0 and len(os.listdir(local_path)) == 0:
+        # create a temporary file because GCS will not recognize empty directories
+        Path(Path(local_path) / 'gcs_temp.txt').touch()
+        # copy path to cloud storage
+        cmd = 'gcloud storage cp -c'
+    # else copy path to cloud storage
+    else:
+        cmd = 'gcloud storage cp -c'
+    # check if directory
+    if is_dir:
+        cmd = cmd + ' -r'
+    cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path
+
+    print(cmd)
+    # run command
+    subprocess.run(cmd, shell=True)
+
+
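A matching upload sketch, again with a hypothetical bucket. Depending on whether the destination already exists and whether the local directory is empty, the helper issues either an rsync or a recursive cp, touching a gcs_temp.txt placeholder so Cloud Storage registers an otherwise empty directory:

# Runs either: gcloud storage rsync --checksums-only -r <abs path>/input gs://my-bucket/input
# or:          gcloud storage cp -c -r <abs path>/input gs://my-bucket/input
upload_gcs(local_path='input', gcs_path='gs://my-bucket/input', is_dir=True)
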
+def prepare_dsub_cmd(flags: dict):
+    # set constant flags
+    dsub_command = 'dsub'
+    flags['provider'] = 'google-cls-v2'
+    flags['regions'] = 'us-central1'
+    flags['user-project'] = os.getenv('GOOGLE_PROJECT')
+    flags['project'] = os.getenv('GOOGLE_PROJECT')
+    flags['network'] = 'network'
+    flags['subnetwork'] = 'subnetwork'
+    flags['service-account'] = subprocess.run(['gcloud', 'config', 'get-value', 'account'], capture_output=True, text=True).stdout.replace('\n', '')
+
+    # order flags according to flag_list
+    flag_list = ["provider", "regions", "zones", "location", "user-project", "project", "network", "subnetwork", "service-account", "image", "env",
+                 "logging", "input", "input-recursive", "mount", "output", "output-recursive", "command", "script"]
+    ordered_flags = {f: flags[f] for f in flag_list if f in flags.keys()}
+
+    # iteratively add flags to the command
+    for flag in ordered_flags.keys():
+        if isinstance(ordered_flags.get(flag), list):
+            for f in ordered_flags.get(flag):
+                dsub_command = dsub_command + " --" + flag + " " + f
+        else:
+            dsub_command = dsub_command + " --" + flag + " " + ordered_flags.get(flag)
+
+    # Wait for dsub job to complete
+    dsub_command = dsub_command + " --wait"
+    print(f"dsub command: {dsub_command}")
+    return dsub_command
+
+
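To illustrate how prepare_dsub_cmd assembles the command, a sketch with a minimal flags dict; the image, bucket, project, and service account values are hypothetical, with the project and account read from GOOGLE_PROJECT and the active gcloud configuration:

dsub_cmd = prepare_dsub_cmd({
    'image': 'gcr.io/my-project/spras',
    'env': 'SPRAS=True',
    'logging': 'gs://my-bucket/dsub/',
    'command': "'ls ${INPUT_0}'",
})
# dsub_cmd is one string resembling (wrapped here for readability):
# dsub --provider google-cls-v2 --regions us-central1 --user-project my-project --project my-project
#   --network network --subnetwork subnetwork --service-account user@my-project.iam.gserviceaccount.com
#   --image gcr.io/my-project/spras --env SPRAS=True --logging gs://my-bucket/dsub/
#   --command 'ls ${INPUT_0}' --wait
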
 # TODO consider a better default environment variable
 # TODO environment currently a single string (e.g. 'TMPDIR=/OmicsIntegrator1'), should it be a list?
 # run_container_singularity assumes a single string
@@ -65,6 +143,8 @@ def run_container(framework: str, container_suffix: str, command: List[str], vol
         return run_container_docker(container, command, volumes, working_dir, environment)
     elif normalized_framework == 'singularity':
         return run_container_singularity(container, command, volumes, working_dir, environment)
+    elif normalized_framework == 'dsub':
+        return run_container_dsub(container, command, volumes, working_dir, environment)
     else:
         raise ValueError(f'{framework} is not a recognized container framework. Choose "docker" or "singularity".')
 
@@ -223,6 +303,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[
                           options=singularity_options,
                           bind=bind_paths)
 
+
 # Because this is called independently for each file, the same local path can be mounted to multiple volumes
 def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PurePath]) -> Tuple[Tuple[PurePath, PurePath], str]:
     """
@@ -258,3 +339,72 @@ def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PureP
         src = parent
 
     return (src, dest), container_filename
+
+
+def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True') -> str:
+    """
+    Runs a command in the Google Cloud using dsub.
+    @param container: name of the container in the Google Cloud Container Registry
+    @param command: command to run
+    @param volumes: a list of volumes to mount where each item is a (source, destination) tuple
+    @param working_dir: the working directory in the container
+    @param environment: environment variables to set in the container
+    @return: location of the dsub logs in the workspace bucket
+    """
+    # Dictionary of flags for dsub command
+    flags = dict()
+
+    workspace_bucket = os.getenv('WORKSPACE_BUCKET')
+    # Add path in the workspace bucket and label for dsub command for each volume
+    dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)]
+
+    # Prepare command that will be run inside the container for dsub
+    container_command = list()
+    for item in command:
+        # Find if item is volume
+        to_replace = [(str(path[1]), "${" + path[3] + '}') for path in dsub_volumes if str(path[1]) in item]
+        # Replace volume path with dsub volume path
+        if len(to_replace) == 1:
+            # Get path that will be replaced
+            path = to_replace[0][0]
+            # Get dsub input variable that will replace path
+            env_variable = to_replace[0][1]
+            # Replace path with env_variable
+            container_path = item.replace(path, env_variable)
+            # Add / if there is no suffix
+            if container_path == env_variable:
+                container_path = container_path + '/'
+            container_command.append(container_path)
+        else:
+            container_command.append(item)
+
+    # Add a command to copy the volumes to the workspace buckets
+    container_command.append(('; cp -rf ' + f'/mnt/data/input/gs/{workspace_bucket}{working_dir}/*' + ' $OUTPUT').replace('gs://', ''))
+
+    # Make the command into a string
+    flags['command'] = ' '.join(container_command)
+    flags['command'] = "'" + flags['command'] + "'"
+
+    # Push volumes to WORKSPACE_BUCKET
+    for src, _dst, gcs_path, _env in dsub_volumes:
+        upload_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True)
+
+    # Prepare flags for dsub command
+    flags['image'] = container
+    flags['env'] = environment
+    flags['input-recursive'] = [vol[3] + '=' + vol[2] for vol in dsub_volumes]
+    flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir
+    flags['logging'] = workspace_bucket + '/dsub/'
+
+    # Create dsub command
+    dsub_command = prepare_dsub_cmd(flags)
+
+    # Run dsub as subprocess
+    subprocess.run(dsub_command, shell=True)
+
+    # Pull output volumes from WORKSPACE_BUCKET
+    for src, _dst, gcs_path, _env in dsub_volumes:
+        download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True)
+
+    # return location of dsub logs in WORKSPACE_BUCKET
+    return 'dsub logs: {logs}'.format(logs=flags['logging'])
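
End to end, the new backend is reached through run_container with framework 'dsub'. A sketch with made-up paths and container suffix, assuming WORKSPACE_BUCKET and GOOGLE_PROJECT are set and gcloud is authenticated:

(src, dest), container_path = prepare_volume('input/network.txt', '/spras')
logs = run_container(framework='dsub',
                     container_suffix='pathlinker',
                     command=['python', 'run.py', container_path],
                     volumes=[(src, dest)],
                     working_dir='/spras')
print(logs)  # e.g. 'dsub logs: gs://<WORKSPACE_BUCKET>/dsub/'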