1818import uuid
1919import os
2020import subprocess
21+ import boto3
22+ import glob
2123
2224# Custom form making
2325from wtforms .validators import Required
@@ -123,22 +125,39 @@ def workflow_running(pipeline_path, yaml_file):
123125 line = proc .stdout .readline ()
124126 if not line :
125127 break
126- print (str (line ))
127128 current_task .update_state (state = 'PROGRESS' , meta = {'msg' : str (line )})
128129 return 999
129130
@app.route("/workflow_progress")
def workflow_progress():
    """Poll a Celery workflow job and report its state as a JSON string.

    Reads ``jobid`` from the request values. On ``SUCCESS`` it uploads every
    file found in the session-tracked output folders to the fixed S3 result
    bucket under the session's ``logID`` prefix.

    Returns:
        A JSON string with ``state`` and ``msg`` keys for PROGRESS / SUCCESS /
        FAILURE states, or ``'{}'`` when no jobid is supplied or the state is
        not one of those three.
    """
    jobid = request.values.get('jobid')
    print(jobid)
    if jobid:
        job = AsyncResult(jobid, app=celery)
        print(job.state)
        if job.state == 'PROGRESS':
            return json.dumps(dict(state=job.state, msg=job.result['msg'],))

        elif job.state == 'SUCCESS':
            ## S3 Upload process START
            # BUG FIX: _reform_yamlFile stores keys 'output1'..'outputN'
            # (1-based), so iterate 1..N; the old range(N) read the
            # nonexistent 'output0' (None -> glob crash) and skipped the
            # last output folder.
            # BUG FIX: default output_count to 0 — int(None) raised
            # TypeError when the session key was missing.
            output_counter = int(session.get('output_count', 0))
            output_folder_list = [session.get('output' + str(i), None)
                                  for i in range(1, output_counter + 1)]
            logID = session.get('logID', None)
            bucket_name = 'openkbc-ms-result-bucket'  # fixed bucket

            # NOTE(review): every poll that observes SUCCESS re-runs this
            # upload; presumably the client stops polling after "done" —
            # confirm, or move the upload into the task itself.
            s3 = boto3.client('s3')  # Client set, S3
            for path in output_folder_list:
                if path is None:
                    continue  # defensive: skip missing session entries
                filelist = glob.glob(path + "/*")  # search all files
                for fname in filelist:  # get name
                    with open(fname, "rb") as f:
                        # upload to s3 under <logID>/<basename>
                        s3.upload_fileobj(f, bucket_name, logID + "/" + os.path.basename(fname))
            ## S3 Upload process END

            return json.dumps(dict(state=job.state, msg="done",))

        elif job.state == 'FAILURE':
            # BUG FIX: corrected misspelled message "failture" -> "failure"
            return json.dumps(dict(state=job.state, msg="failure",))  ## return somewhere to exit
    return '{}'
143162
144163@app .route ("/status" )
@@ -190,13 +209,26 @@ def _reform_yamlFile(selected_pipeline, data_dict):
190209 f = open (yamlFileName , "w" ) # write file with unique name
191210
192211 nested_items = [] # List for handing nested items
212+ output_count = 0 # Output key count(Tracking purpose)
193213 for key , value in data_dict .items ():
194214 if key .find ('--' )> - 1 : # Nested key has '--'
195215 subkeys = key .split ('--' )# 2 layers keys
196216 nested_items .append ([subkeys [0 ],subkeys [1 ],value ]) #make list
197217 else :
198- f .write (key + ": " + value + "\n " )
218+ ## Tracking output path and user ID
219+ if key .find ("Output" ) > - 1 or key .find ("output" ) > - 1 : ## key has 'output' string
220+ output_count += 1
221+ session ['output' + str (output_count )]= value # set session for output folder (Tracking purpose)
222+ session ['output_count' ] = output_count # set session for output counter (Tracking purpose)
223+
224+ if key .find ('logID' ) > - 1 : # Find log ID
225+ session ['logID' ] = value # set session for ID
226+ ## Tracking output path and user ID
199227
228+ f .write (key + ": " + value + "\n " ) ## Write new form of yaml
229+
230+ ### Add error handling here
231+ ### Add error handling here
200232 key1_unique = list (set ([x [0 ] for x in nested_items ])) # make a list of root key
201233 for x in key1_unique :
202234 f .write (x + ":" + "\n " ) # first line of nested key (root key)
@@ -206,6 +238,10 @@ def _reform_yamlFile(selected_pipeline, data_dict):
206238
207239 f .close ()
208240 return yamlFileName
209-
241+
def get_filenames(path):
    """Return the paths of all entries directly inside *path*.

    Thin wrapper around :func:`glob.glob` with a ``"/*"`` pattern, so it
    matches files and subdirectories alike (glob excludes dot-files).
    """
    pattern = f"{path}/*"
    return glob.glob(pattern)
245+
# Script entry point: start the Flask development server, listening on
# all network interfaces (default port).
if __name__ == '__main__':
    app.run(host='0.0.0.0')
0 commit comments