From f78f1f0c40c0363dd2582c036dd3cfa45b4d9ed7 Mon Sep 17 00:00:00 2001
From: bethac07
Date: Fri, 20 Jan 2023 18:12:32 -0500
Subject: [PATCH] Working Distributed-HelloWorld

---
 config.py                | 15 ++++++++-------
 files/exampleJob.json    |  9 +++------
 run.py                   |  7 ++++++-
 worker/Dockerfile        |  2 +-
 worker/Makefile          |  6 +++---
 worker/generic-worker.py | 29 +++++++++++++++++++++--------
 6 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/config.py b/config.py
index 7d49abc..2bb0e64 100644
--- a/config.py
+++ b/config.py
@@ -1,9 +1,9 @@
 # Constants (User configurable)
 
-APP_NAME = 'DistributedSomething'   # Used to generate derivative names unique to the application.
+APP_NAME = 'DistributedHelloWorld'  # Used to generate derivative names unique to the application.
 
 # DOCKER REGISTRY INFORMATION:
-DOCKERHUB_TAG = 'user/distributed-something:sometag'
+DOCKERHUB_TAG = 'distributedscience/distributed-helloworld:latest'
 
 # AWS GENERAL SETTINGS:
 AWS_REGION = 'us-east-1'
@@ -13,16 +13,16 @@
 
 # EC2 AND ECS INFORMATION:
 ECS_CLUSTER = 'default'
-CLUSTER_MACHINES = 3
+CLUSTER_MACHINES = 2
 TASKS_PER_MACHINE = 1
-MACHINE_TYPE = ['m4.xlarge']
+MACHINE_TYPE = ['t2.micro']
 MACHINE_PRICE = 0.10
-EBS_VOL_SIZE = 30        # In GB. Minimum allowed is 22.
+EBS_VOL_SIZE = 22        # In GB. Minimum allowed is 22.
 
 # DOCKER INSTANCE RUNNING ENVIRONMENT:
-DOCKER_CORES = 4         # Number of software processes to run inside a docker container
+DOCKER_CORES = 1         # Number of software processes to run inside a docker container
 CPU_SHARES = DOCKER_CORES * 1024  # ECS computing units assigned to each docker container (1024 units = 1 core)
-MEMORY = 15000           # Memory assigned to the docker container in MB
+MEMORY = 800             # Memory assigned to the docker container in MB
 SECONDS_TO_START = 3*60  # Wait before the next process is initiated to avoid memory collisions
 
 # SQS QUEUE INFORMATION:
@@ -40,3 +40,4 @@
 NECESSARY_STRING = ''    # Is there any string that should be in the file name to "count"?
 
 # PUT ANYTHING SPECIFIC TO YOUR PROGRAM DOWN HERE
+MY_NAME = 'FirstName LastName'
\ No newline at end of file
diff --git a/files/exampleJob.json b/files/exampleJob.json
index 25552e8..fc92e6b 100644
--- a/files/exampleJob.json
+++ b/files/exampleJob.json
@@ -2,14 +2,11 @@
     "_comment0": "All keys that contain '_comment' will be omitted from the job specification dictionary sent to SQS",
     "_comment1": "The top part should be variables common to all jobs",
     "_comment2": "These could be things like a master input location, run parameters, etc",
-    "input_location": "Somewhere/on/your/bucket",
-    "job_flag_1": "True",
+    "favorite_color": "Blue",
 
     "_comment3": "The following groups are individual tasks, and each will be run in parallel",
     "groups": [
-        {"job_flag_2": "A", "job_flag_3": "foo"},
-        {"job_flag_2": "A", "job_flag_3": "bar"},
-        {"job_flag_2": "B", "job_flag_3": "foo"},
-        {"job_flag_2": "B", "job_flag_3": "bar"}
+        {"ice_cream": "chocolate", "pizza": "pepperoni"},
+        {"ice_cream": "cookie dough", "pizza": "mushroom"}
     ]
 }
diff --git a/run.py b/run.py
index 35c2334..77148fa 100644
--- a/run.py
+++ b/run.py
@@ -1,6 +1,7 @@
 from __future__ import print_function
 import os, sys
 import boto3
+import configparser
 import datetime
 import json
 import time
@@ -145,7 +146,11 @@ def generate_task_definition(AWS_PROFILE):
             {
                 "name": "NECESSARY_STRING",
                 "value": NECESSARY_STRING
-            }
+            },
+            {
+                "name": "MY_NAME",
+                "value": MY_NAME
+            },
         ]
 
     return task_definition, taskRoleArn
diff --git a/worker/Dockerfile b/worker/Dockerfile
index 80708f4..c372f80 100644
--- a/worker/Dockerfile
+++ b/worker/Dockerfile
@@ -6,7 +6,7 @@
 #
 
 
-FROM someuser/somedocker:sometag
+FROM ubuntu:18.04
 
 # Install S3FS
 USER root
diff --git a/worker/Makefile b/worker/Makefile
index 69a251b..45ee625 100644
--- a/worker/Makefile
+++ b/worker/Makefile
@@ -1,6 +1,6 @@
-user = user
-project = distributed-something
-tag = sometag
+user = distributedscience
+project = distributed-helloworld
+tag = latest
 
 .DEFAULT_GOAL: build
 build:
diff --git a/worker/generic-worker.py b/worker/generic-worker.py
index 1f00228..54b4dff 100644
--- a/worker/generic-worker.py
+++ b/worker/generic-worker.py
@@ -44,6 +44,7 @@
     DOWNLOAD_FILES = False
 else:
     DOWNLOAD_FILES = os.environ['DOWNLOAD_FILES']
+MY_NAME = os.environ['MY_NAME']
 
 localIn = '/home/ubuntu/local_input'
 
@@ -110,18 +111,19 @@ def runSomething(message):
 
     # Parse your message somehow to pull out a name variable that's going to make sense to you when you want to look at the logs later
     # What's commented out below will work, otherwise, create your own
-    #group_to_run = message["group"]
-    #groupkeys = list(group_to_run.keys())
-    #groupkeys.sort()
-    #metadataID = '-'.join(groupkeys)
+    group_to_run = message["group"]
+    groupkeys = list(group_to_run.keys())
+    groupkeys.sort()
+    metadataID = '-'.join(groupkeys)
 
     # Add a handler with
-    # watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=str(metadataID),create_log_group=False)
-    # logger.addHandler(watchtowerlogger)
+    watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=str(metadataID),create_log_group=False)
+    logger.addHandler(watchtowerlogger)
 
     # See if this is a message you've already handled, if you've so chosen
     # First, build a variable called remoteOut that equals your unique prefix of where your output should be
     # Then check if there are too many files
+    remoteOut = metadataID
 
     if CHECK_IF_DONE_BOOL.upper() == 'TRUE':
         try:
@@ -143,14 +145,25 @@ def runSomething(message):
     # ie cmd = my-program --my-flag-1 True --my-flag-2 VARIABLE
     # you should assign the variable "localOut" to the output location where you expect your program to put files
 
+    localOut = metadataID
+    local_file_name = os.path.join(localOut,'HelloWorld.txt')
+    if not os.path.exists(localOut):
+        os.makedirs(localOut,exist_ok=True)
+
+    cmd = f'printf "Hi, my name is {MY_NAME}, and my favorite {groupkeys[0]} is {group_to_run[groupkeys[0]]}, and my favorite {groupkeys[1]} is {group_to_run[groupkeys[1]]}" > {local_file_name}'
+    print('Running', cmd)
 
     logger.info(cmd)
-    subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    #typically, changes to the subprocess command aren't needed at all
+    subp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     monitorAndLog(subp,logger)
 
     # Figure out a done condition - a number of files being created, a particular file being created, an exit code, etc.
+
+    done = True
+
     # If done, get the outputs and move them to S3
-    if [ENTER DONE CONDITION HERE]:
+    if done:
         time.sleep(30)
         mvtries=0
         while mvtries <3:
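
A quick way to sanity-check the patched worker logic without building the Docker image or starting a cluster: the short Python sketch below mirrors what the new runSomething() body does for a single job group from files/exampleJob.json. It is an illustrative snippet, not part of the patch; the MY_NAME value is the same placeholder used in config.py, the file name "local_preview.py" is hypothetical, and the printf redirection assumes a Unix-like shell.

    # local_preview.py -- hypothetical helper, not part of this patch
    import os
    import subprocess

    MY_NAME = 'FirstName LastName'  # placeholder, as in config.py
    # one entry from the "groups" list in files/exampleJob.json
    group_to_run = {"ice_cream": "chocolate", "pizza": "pepperoni"}

    groupkeys = sorted(group_to_run.keys())  # ['ice_cream', 'pizza']
    metadataID = '-'.join(groupkeys)         # 'ice_cream-pizza', used as the output folder

    localOut = metadataID
    os.makedirs(localOut, exist_ok=True)
    local_file_name = os.path.join(localOut, 'HelloWorld.txt')

    cmd = (f'printf "Hi, my name is {MY_NAME}, '
           f'and my favorite {groupkeys[0]} is {group_to_run[groupkeys[0]]}, '
           f'and my favorite {groupkeys[1]} is {group_to_run[groupkeys[1]]}" > {local_file_name}')

    # shell=True so the > redirection is honored, just like the patched worker
    subprocess.run(cmd, shell=True, check=True)

    with open(local_file_name) as f:
        print(f.read())

Expected output is a single line such as "Hi, my name is FirstName LastName, and my favorite ice_cream is chocolate, and my favorite pizza is pepperoni"; since the patch also sets remoteOut = metadataID, each worker should end up copying a file like that to <metadataID>/HelloWorld.txt in your output location.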