Skip to content

Commit d574ecf

Browse files
committed
Update DQ creation in sync with DS
1 parent 6c01326 commit d574ecf

File tree

2 files changed

+43
-23
lines changed

2 files changed

+43
-23
lines changed

config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
# SQS QUEUE INFORMATION:
2929
SQS_QUEUE_NAME = APP_NAME + 'Queue'
3030
SQS_MESSAGE_VISIBILITY = 1*60 # Timeout (secs) for messages in flight (average time to be processed)
31-
SQS_DEAD_LETTER_QUEUE = 'arn:aws:sqs:some-region:111111100000:DeadMessages'
31+
SQS_DEAD_LETTER_QUEUE = 'user_DeadMessages'
3232

3333
# LOG GROUP INFORMATION:
3434
LOG_GROUP_NAME = APP_NAME

run.py

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from __future__ import print_function
21
import os, sys
32
import boto3
43
import configparser
@@ -11,6 +10,8 @@
1110
from email.mime.text import MIMEText
1211

1312
from config import *
13+
14+
1415
WAIT_TIME = 60
1516
MONITOR_TIME = 60
1617

@@ -47,22 +48,12 @@
4748
]
4849
}
4950

50-
SQS_DEFINITION = {
51-
"DelaySeconds": "0",
52-
"MaximumMessageSize": "262144",
53-
"MessageRetentionPeriod": "1209600",
54-
"ReceiveMessageWaitTimeSeconds": "0",
55-
"RedrivePolicy": "{\"deadLetterTargetArn\":\"" + SQS_DEAD_LETTER_QUEUE + "\",\"maxReceiveCount\":\"10\"}",
56-
"VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY)
57-
}
58-
5951

6052
#################################
6153
# AUXILIARY FUNCTIONS
6254
#################################
6355

6456
def generate_task_definition(AWS_PROFILE):
65-
taskRoleArn = False
6657
task_definition = TASK_DEFINITION.copy()
6758

6859
config = configparser.ConfigParser()
@@ -79,6 +70,7 @@ def generate_task_definition(AWS_PROFILE):
7970
print ("Using role for credentials", config[profile_name]['role_arn'])
8071
taskRoleArn = config[profile_name]['role_arn']
8172
else:
73+
taskRoleArn = False
8274
if config.has_option(profile_name, 'source_profile'):
8375
creds = configparser.ConfigParser()
8476
creds.read(f"{os.environ['HOME']}/.aws/credentials")
@@ -88,6 +80,11 @@ def generate_task_definition(AWS_PROFILE):
8880
elif config.has_option(profile_name, 'aws_access_key_id'):
8981
aws_access_key = config[profile_name]['aws_access_key_id']
9082
aws_secret_key = config[profile_name]['aws_secret_access_key']
83+
elif profile_name == 'default':
84+
creds = configparser.ConfigParser()
85+
creds.read(f"{os.environ['HOME']}/.aws/credentials")
86+
aws_access_key = creds['default']['aws_access_key_id']
87+
aws_secret_key = creds['default']['aws_secret_access_key']
9188
else:
9289
print ("Problem getting credentials")
9390
task_definition['containerDefinitions'][0]['environment'] += [
@@ -101,7 +98,7 @@ def generate_task_definition(AWS_PROFILE):
10198
}]
10299

103100
sqs = boto3.client('sqs')
104-
queue_name = get_queue_url(sqs)
101+
queue_name = get_queue_url(sqs, SQS_QUEUE_NAME)
105102
task_definition['containerDefinitions'][0]['environment'] += [
106103
{
107104
'name': 'APP_NAME',
@@ -188,18 +185,41 @@ def create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME):
188185
ecs.create_service(cluster=ECS_CLUSTER, serviceName=ECS_SERVICE_NAME, taskDefinition=ECS_TASK_NAME, desiredCount=0)
189186
print('Service created')
190187

191-
def get_queue_url(sqs):
188+
def get_queue_url(sqs, queue_name):
192189
result = sqs.list_queues()
190+
queue_url = None
193191
if 'QueueUrls' in result.keys():
194192
for u in result['QueueUrls']:
195-
if u.split('/')[-1] == SQS_QUEUE_NAME:
196-
return u
197-
return None
193+
if u.split('/')[-1] == queue_name:
194+
queue_url = u
195+
return queue_url
198196

199197
def get_or_create_queue(sqs):
200-
u = get_queue_url(sqs)
201-
if u is None:
198+
queue_url = get_queue_url(sqs, SQS_QUEUE_NAME)
199+
dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE)
200+
if dead_url is None:
201+
print("Creating DeadLetter queue")
202+
sqs.create_queue(QueueName=SQS_DEAD_LETTER_QUEUE)
203+
time.sleep(WAIT_TIME)
204+
dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE)
205+
else:
206+
print (f'DeadLetter queue {SQS_DEAD_LETTER_QUEUE} already exists.')
207+
if queue_url is None:
202208
print('Creating queue')
209+
response = sqs.get_queue_attributes(QueueUrl=dead_url, AttributeNames=["QueueArn"])
210+
dead_arn = response["Attributes"]["QueueArn"]
211+
212+
SQS_DEFINITION = {
213+
"DelaySeconds": "0",
214+
"MaximumMessageSize": "262144",
215+
"MessageRetentionPeriod": "1209600",
216+
"ReceiveMessageWaitTimeSeconds": "0",
217+
"RedrivePolicy": '{"deadLetterTargetArn":"'
218+
+ dead_arn
219+
+ '","maxReceiveCount":"10"}',
220+
"VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY),
221+
}
222+
203223
sqs.create_queue(QueueName=SQS_QUEUE_NAME, Attributes=SQS_DEFINITION)
204224
time.sleep(WAIT_TIME)
205225
else:
@@ -367,10 +387,10 @@ def setup():
367387
AWS_CREDENTIAL_FILE_NAME = os.environ['HOME'] + '/.aws/credentials'
368388
sqs = boto3.client('sqs')
369389
get_or_create_queue(sqs)
370-
ecs = boto3.client('ecs')
371-
get_or_create_cluster(ecs)
372-
update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE)
373-
create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME)
390+
# ecs = boto3.client('ecs')
391+
# get_or_create_cluster(ecs)
392+
# update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE)
393+
# create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME)
374394

375395
#################################
376396
# SERVICE 2: SUBMIT JOB

0 commit comments

Comments
 (0)