From 8ae5cb547fa72e427f34ad75da333e0e67d49650 Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Fri, 6 Mar 2020 22:53:18 +0100 Subject: [PATCH] first fully working version of DF --- worker/Dockerfile | 8 ++-- worker/fiji-worker.py | 95 ++++++++++++++++++++----------------------- worker/run-worker.sh | 3 +- 3 files changed, 49 insertions(+), 57 deletions(-) diff --git a/worker/Dockerfile b/worker/Dockerfile index f269fd2..b9b9608 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -9,7 +9,7 @@ FROM fiji/fiji:fiji-openjdk-8 USER root -# Install S3FS +# Install S3FS RUN apt-get -y update && \ apt-get -y upgrade && \ @@ -40,7 +40,7 @@ RUN apt-get -y upgrade python2.7 RUN apt-get -y install python-pip RUN \ - pip install awscli + pip install awscli # Install boto3 @@ -54,8 +54,7 @@ RUN \ # SETUP NEW ENTRYPOINT -RUN mkdir -p /home/ubuntu/ -WORKDIR /home/ubuntu +WORKDIR /opt/fiji COPY fiji-worker.py . COPY instance-monitor.py . COPY run-worker.sh . @@ -63,4 +62,3 @@ RUN chmod 755 run-worker.sh ENTRYPOINT ["./run-worker.sh"] CMD [""] - diff --git a/worker/fiji-worker.py b/worker/fiji-worker.py index 8b6376e..f05a686 100644 --- a/worker/fiji-worker.py +++ b/worker/fiji-worker.py @@ -6,7 +6,7 @@ import os import re import subprocess -import sys +import sys import time import watchtower import string @@ -16,7 +16,6 @@ ################################# DATA_ROOT = '/home/ubuntu/bucket' -LOCAL_OUTPUT = '/home/ubuntu/local_output' QUEUE_URL = os.environ['SQS_QUEUE_URL'] AWS_BUCKET = os.environ['AWS_BUCKET'] LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] @@ -33,7 +32,7 @@ class JobQueue(): def __init__(self, queueURL): self.client = boto3.client('sqs') self.queueURL = queueURL - + def readMessage(self): response = self.client.receive_message(QueueUrl=self.queueURL, WaitTimeSeconds=20) if 'Messages' in response.keys(): @@ -63,7 +62,7 @@ def monitorAndLog(process,logger): break if output: print(output.strip()) - logger.info(output) + logger.info(output) def printandlog(text,logger): print(text) @@ -82,72 +81,69 @@ def stringify_metadata_dict(mdict): def runFIJI(message): #List the directories in the bucket- this prevents a strange s3fs error rootlist=os.listdir(DATA_ROOT) - for eachSubDir in rootlist: - subDirName=os.path.join(DATA_ROOT,eachSubDir) - if os.path.isdir(subDirName): - trashvar=os.system('ls '+subDirName) # Configure the logs logger = logging.getLogger(__name__) - + # Read the metadata string - + # Prepare paths and parameters - localOut = LOCAL_OUTPUT + localOut = 'output' remoteOut = message['output_file_location'] # Start loggging now that we have a job we care about metadataID = stringify_metadata_dict(message['Metadata']) - watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=metadataID,create_log_group=False) - logger.addHandler(watchtowerlogger) - + metadata_for_log_name=metadataID.replace('*','.') + watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=metadata_for_log_name,create_log_group=False) + logger.addHandler(watchtowerlogger) + # Build and run FIJI command - cmd = '../../opt/fiji/Fiji.app/ImageJ-linux64 --ij2 --headless --console --run "../../opt/fiji/Fiji.app/plugins/'+SCRIPT_NAME+'" "' - cmd += stringify_metadata_dict(message['shared_metadata']) + ', ' + metadataID+ '"' + cmd = ["Fiji.app/ImageJ-linux64","--ij2", "--headless", "--console", "--run",os.path.join("Fiji.app/plugins",SCRIPT_NAME)] + cmd.append(stringify_metadata_dict(message['shared_metadata']) + ', ' + metadataID) print('Running', cmd) logger.info(cmd) - - subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) monitorAndLog(subp,logger) - + # Get the outputs and move them to S3 - - # Figure out how many output files there were + + # Figure out how many output files there were - thanks https://stackoverflow.com/a/29769297 print('Checking output folder size') - cmd = "find "+localOut+" -type f | wc -l" - logger.info - subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out,err = subp.communicate() - if int(out)>=int(EXPECTED_NUMBER_FILES): + filenum = 0 + for _, _, filenames in os.walk(localOut): + filenum += len(filenames) + print(localOut,filenum) + if filenum>=int(EXPECTED_NUMBER_FILES): mvtries=0 while mvtries <3: - try: - printandlog('Move attempt #'+str(mvtries+1),logger) - cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive' - subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out,err = subp.communicate() - printandlog('== OUT \n'+out, logger) - if err == '': - break - else: - printandlog('== ERR \n'+err,logger) - mvtries+=1 - except: - printandlog('Move failed',logger) - printandlog('== ERR \n'+err,logger) - time.sleep(30) - mvtries+=1 + try: + printandlog('Move attempt #'+str(mvtries+1),logger) + cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive' + subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out,err = subp.communicate() + printandlog('== OUT \n'+out, logger) + if err == '': + break + else: + printandlog('== ERR \n'+err,logger) + mvtries+=1 + except: + printandlog('Move failed',logger) + printandlog('== ERR \n'+err,logger) + time.sleep(30) + mvtries+=1 if mvtries<3: - printandlog('SUCCESS',logger) - logger.removeHandler(watchtowerlogger) - return 'SUCCESS' + printandlog('SUCCESS',logger) + logger.removeHandler(watchtowerlogger) + return 'SUCCESS' else: - printandlog('OUTPUT PROBLEM. Giving up on '+metadataID,logger) - logger.removeHandler(watchtowerlogger) - return 'OUTPUT_PROBLEM' - + printandlog('OUTPUT PROBLEM. Only '+str(filenum)+' files detected. Giving up on '+metadataID,logger) + logger.removeHandler(watchtowerlogger) + return 'OUTPUT_PROBLEM' + ################################# # MAIN WORKER LOOP @@ -179,4 +175,3 @@ def main(): print('Worker started') main() print('Worker finished') - diff --git a/worker/run-worker.sh b/worker/run-worker.sh index 8a48031..66cf880 100644 --- a/worker/run-worker.sh +++ b/worker/run-worker.sh @@ -31,5 +31,4 @@ wget -P /opt/fiji/Fiji.app/plugins/ $SCRIPT_DOWNLOAD_URL python instance-monitor.py & # 5. RUN FIJI WORKER -python fiji-worker.py |& tee $k.out - +python fiji-worker.py |& tee $k.out \ No newline at end of file