Working Distributed-HelloWorld #1

Merged 1 commit on Jan 23, 2023
15 changes: 8 additions & 7 deletions config.py
@@ -1,9 +1,9 @@
 # Constants (User configurable)

-APP_NAME = 'DistributedSomething' # Used to generate derivative names unique to the application.
+APP_NAME = 'DistributedHelloWorld' # Used to generate derivative names unique to the application.

 # DOCKER REGISTRY INFORMATION:
-DOCKERHUB_TAG = 'user/distributed-something:sometag'
+DOCKERHUB_TAG = 'distributedscience/distributed-helloworld:latest'

 # AWS GENERAL SETTINGS:
 AWS_REGION = 'us-east-1'
@@ -13,16 +13,16 @@

 # EC2 AND ECS INFORMATION:
 ECS_CLUSTER = 'default'
-CLUSTER_MACHINES = 3
+CLUSTER_MACHINES = 2
 TASKS_PER_MACHINE = 1
-MACHINE_TYPE = ['m4.xlarge']
+MACHINE_TYPE = ['t2.micro']
 MACHINE_PRICE = 0.10
-EBS_VOL_SIZE = 30 # In GB. Minimum allowed is 22.
+EBS_VOL_SIZE = 22 # In GB. Minimum allowed is 22.

 # DOCKER INSTANCE RUNNING ENVIRONMENT:
-DOCKER_CORES = 4 # Number of software processes to run inside a docker container
+DOCKER_CORES = 1 # Number of software processes to run inside a docker container
 CPU_SHARES = DOCKER_CORES * 1024 # ECS computing units assigned to each docker container (1024 units = 1 core)
-MEMORY = 15000 # Memory assigned to the docker container in MB
+MEMORY = 800 # Memory assigned to the docker container in MB
 SECONDS_TO_START = 3*60 # Wait before the next process is initiated to avoid memory collisions

 # SQS QUEUE INFORMATION:
@@ -40,3 +40,4 @@
 NECESSARY_STRING = '' # Is there any string that should be in the file name to "count"?

 # PUT ANYTHING SPECIFIC TO YOUR PROGRAM DOWN HERE
+MY_NAME = 'FirstName LastName'
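For context on the resized values: a t2.micro instance provides 1 vCPU and 1 GiB of RAM, so DOCKER_CORES = 1 (hence CPU_SHARES = 1024) and MEMORY = 800 are sized to fit on the machine with some headroom. A minimal, hypothetical sanity check, not part of this PR (INSTANCE_SPECS and check_fit are invented names), could confirm the fit before launching a cluster:

# Hypothetical helper: verify the docker settings fit the chosen EC2 instance type.
INSTANCE_SPECS = {'t2.micro': {'vcpus': 1, 'memory_mb': 1024}}  # assumed lookup table

def check_fit(machine_type, docker_cores, memory_mb, tasks_per_machine=1):
    specs = INSTANCE_SPECS[machine_type]
    assert docker_cores * tasks_per_machine <= specs['vcpus'], 'too many cores requested'
    assert memory_mb * tasks_per_machine <= specs['memory_mb'], 'too much memory requested'

check_fit('t2.micro', 1, 800, 1)  # DOCKER_CORES=1, MEMORY=800 passes with room for the OS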
9 changes: 3 additions & 6 deletions files/exampleJob.json
@@ -2,14 +2,11 @@
   "_comment0": "All keys that contain '_comment' will be omitted from the job specification dictionary sent to SQS",
   "_comment1": "The top part should be variables common to all jobs",
   "_comment2": "These could be things like a master input location, run parameters, etc",
-  "input_location": "Somewhere/on/your/bucket",
-  "job_flag_1": "True",
+  "favorite_color": "Blue",
   "_comment3": "The following groups are individual tasks, and each will be run in parallel",
   "groups": [
-    {"job_flag_2": "A", "job_flag_3": "foo"},
-    {"job_flag_2": "A", "job_flag_3": "bar"},
-    {"job_flag_2": "B", "job_flag_3": "foo"},
-    {"job_flag_2": "B", "job_flag_3": "bar"}
+    {"ice_cream": "chocolate", "pizza": "pepperoni"},
+    {"ice_cream": "cookie dough", "pizza": "mushroom"}
   ]
 }

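As a rough sketch of how a job file like this is typically consumed (this is not the repository's submit code; fan_out and the queue URL are placeholders): the keys above the groups list are shared by every task, '_comment' keys are dropped, and each entry in groups becomes one SQS message that the worker later reads back as message["group"].

import json
import boto3

def fan_out(job_path, queue_url):
    # Load the job file, drop the '_comment' keys, and keep the shared settings.
    with open(job_path) as f:
        job = json.load(f)
    shared = {k: v for k, v in job.items() if '_comment' not in k and k != 'groups'}
    sqs = boto3.client('sqs')
    # One message per group; the worker reads it back as message["group"].
    for group in job['groups']:
        message = dict(shared, group=group)
        sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))

# Example call (queue URL is a placeholder):
# fan_out('files/exampleJob.json', 'https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue')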
7 changes: 6 additions & 1 deletion run.py
@@ -1,6 +1,7 @@
 from __future__ import print_function
 import os, sys
 import boto3
+import configparser
 import datetime
 import json
 import time
@@ -145,7 +146,11 @@ def generate_task_definition(AWS_PROFILE):
         {
             "name": "NECESSARY_STRING",
             "value": NECESSARY_STRING
-        }
+        },
+        {
+            "name": "MY_NAME",
+            "value": MY_NAME
+        },
     ]
     return task_definition, taskRoleArn

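For orientation (a sketch under assumptions, not the code in run.py): each name/value pair in this environment list lands in the containerDefinitions of the ECS task definition that gets registered, which is how MY_NAME reaches os.environ inside the worker container. Roughly:

import boto3

# Illustrative only: the family and container name below are invented, while the image,
# cpu, and memory values mirror DOCKERHUB_TAG, CPU_SHARES, and MEMORY from config.py.
ecs = boto3.client('ecs')
ecs.register_task_definition(
    family='DistributedHelloWorldTask',
    containerDefinitions=[{
        'name': 'worker',
        'image': 'distributedscience/distributed-helloworld:latest',
        'cpu': 1024,
        'memory': 800,
        'environment': [
            {'name': 'NECESSARY_STRING', 'value': ''},
            {'name': 'MY_NAME', 'value': 'FirstName LastName'},
        ],
    }],
)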
2 changes: 1 addition & 1 deletion worker/Dockerfile
@@ -6,7 +6,7 @@
 #


-FROM someuser/somedocker:sometag
+FROM ubuntu:18.04

 # Install S3FS
 USER root
6 changes: 3 additions & 3 deletions worker/Makefile
@@ -1,6 +1,6 @@
-user = user
-project = distributed-something
-tag = sometag
+user = distributedscience
+project = distributed-helloworld
+tag = latest

 .DEFAULT_GOAL: build
 build:
29 changes: 21 additions & 8 deletions worker/generic-worker.py
@@ -44,6 +44,7 @@
     DOWNLOAD_FILES = False
 else:
     DOWNLOAD_FILES = os.environ['DOWNLOAD_FILES']
+MY_NAME = os.environ['MY_NAME']

 localIn = '/home/ubuntu/local_input'

@@ -110,18 +111,19 @@ def runSomething(message):

     # Parse your message somehow to pull out a name variable that's going to make sense to you when you want to look at the logs later
     # What's commented out below will work, otherwise, create your own
-    #group_to_run = message["group"]
-    #groupkeys = list(group_to_run.keys())
-    #groupkeys.sort()
-    #metadataID = '-'.join(groupkeys)
+    group_to_run = message["group"]
+    groupkeys = list(group_to_run.keys())
+    groupkeys.sort()
+    metadataID = '-'.join(groupkeys)

     # Add a handler with
-    # watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=str(metadataID),create_log_group=False)
-    # logger.addHandler(watchtowerlogger)
+    watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=str(metadataID),create_log_group=False)
+    logger.addHandler(watchtowerlogger)

     # See if this is a message you've already handled, if you've so chosen
     # First, build a variable called remoteOut that equals your unique prefix of where your output should be
     # Then check if there are too many files
+    remoteOut = metadataID

     if CHECK_IF_DONE_BOOL.upper() == 'TRUE':
         try:
@@ -143,14 +145,25 @@
     # ie cmd = my-program --my-flag-1 True --my-flag-2 VARIABLE
     # you should assign the variable "localOut" to the output location where you expect your program to put files

+    localOut = metadataID
+    local_file_name = os.path.join(localOut,'HelloWorld.txt')
+    if not os.path.exists(localOut):
+        os.makedirs(localOut,exist_ok=True)
+
+    cmd = f'printf "Hi, my name is {MY_NAME}, and my favorite {groupkeys[0]} is {group_to_run[groupkeys[0]]}, and my favorite {groupkeys[1]} is {group_to_run[groupkeys[1]]}" > {local_file_name}'
+
     print('Running', cmd)
     logger.info(cmd)
-    subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    #typically, changes to the subprocess command aren't needed at all
+    subp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     monitorAndLog(subp,logger)

     # Figure out a done condition - a number of files being created, a particular file being created, an exit code, etc.
+
+    done = True
+
     # If done, get the outputs and move them to S3
-    if [ENTER DONE CONDITION HERE]:
+    if done:
         time.sleep(30)
         mvtries=0
         while mvtries <3:
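The truncated tail of runSomething handles moving outputs to S3 once done is True. A minimal sketch of that step under assumptions (the bucket name and the upload_output helper are not from the repository): list everything in localOut and copy it under the remoteOut prefix.

import os
import boto3

def upload_output(localOut, remoteOut, bucket):
    # Copy every file the job produced (here, HelloWorld.txt) under the remoteOut prefix.
    s3 = boto3.client('s3')
    for filename in os.listdir(localOut):
        s3.upload_file(os.path.join(localOut, filename), bucket, f'{remoteOut}/{filename}')

# upload_output(localOut, remoteOut, 'my-output-bucket')  # bucket name is a placeholder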