Skip to content

Commit

Permalink
Replace SQS(GoAWS) with Redis Queue (#502)
Browse files Browse the repository at this point in the history
* replace sqs with a redis queue

* remove sqs dependency and update dependencies

* fix unit tests
  • Loading branch information
TShapinsky authored Jun 20, 2024
1 parent 982235d commit fbff97a
Show file tree
Hide file tree
Showing 10 changed files with 2,854 additions and 1,469 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ INFLUXDB_ADMIN_USER=admin
INFLUXDB_DB=alfalfa
INFLUXDB_HOST=influxdb
INFLUXDB_HTTP_AUTH_ENABLED=true
JOB_QUEUE_URL=http://goaws:4100/queue/local-queue1
JOB_QUEUE=Alfalfa Job Queue
LOGGING=false
MONGO_DB_NAME=alfalfa
MONGO_URL=mongodb://mongo:27017
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
- name: Start dependencies
run: |
printenv
docker compose up -d mongo redis minio mc goaws
docker compose up -d mongo redis minio mc
- name: Run tests with pytest
run: |
Expand Down
4,225 changes: 2,815 additions & 1,410 deletions alfalfa_web/package-lock.json

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion alfalfa_web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"license": "ISC",
"dependencies": {
"@aws-sdk/client-s3": "^3.348.0",
"@aws-sdk/client-sqs": "^3.348.0",
"@aws-sdk/credential-providers": "^3.348.0",
"@aws-sdk/s3-presigned-post": "^3.348.0",
"@aws-sdk/s3-request-presigner": "^3.348.0",
Expand Down
60 changes: 24 additions & 36 deletions alfalfa_web/server/api.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import { fromEnv } from "@aws-sdk/credential-providers";
import { createPresignedPost } from "@aws-sdk/s3-presigned-post";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
Expand All @@ -22,13 +21,10 @@ class AlfalfaAPI {
this.pub = redis.duplicate();
this.pub.connect();

this.redisJobQueue = process.env.JOB_QUEUE || "Alfalfa Job Queue";

const credentials = fromEnv();
const region = process.env.REGION || "us-east-1";
this.sqs = new SQSClient({
credentials,
endpoint: new URL(process.env.JOB_QUEUE_URL).origin,
region
});
this.s3 = new S3Client({
credentials,
endpoint: process.env.S3_URL_EXTERNAL || process.env.S3_URL,
Expand Down Expand Up @@ -201,24 +197,17 @@ class AlfalfaAPI {

const { startDatetime, endDatetime, timescale, realtime, externalClock } = data;

const messageBody = {
job: `alfalfa_worker.jobs.${sim_type === "MODELICA" ? "modelica" : "openstudio"}.StepRun`,
params: {
run_id: run.ref_id,
start_datetime: startDatetime,
end_datetime: endDatetime,
timescale: `${timescale || 5}`,
realtime: `${!!realtime}`,
external_clock: `${!!externalClock}`
}
const job = `alfalfa_worker.jobs.${sim_type === "MODELICA" ? "modelica" : "openstudio"}.StepRun`;
const params = {
run_id: run.ref_id,
start_datetime: startDatetime,
end_datetime: endDatetime,
timescale: `${timescale || 5}`,
realtime: `${!!realtime}`,
external_clock: `${!!externalClock}`
};
await this.sqs.send(
new SendMessageCommand({
MessageBody: JSON.stringify(messageBody),
QueueUrl: process.env.JOB_QUEUE_URL,
MessageGroupId: "Alfalfa"
})
);

await this.sendJobToQueue(job, params);
};

advanceRun = async (run) => {
Expand Down Expand Up @@ -309,24 +298,23 @@ class AlfalfaAPI {
createRunFromModel = async (model) => {
const runId = uuidv1();
const job = `alfalfa_worker.jobs.${model.model_name.endsWith(".fmu") ? "modelica" : "openstudio"}.CreateRun`;
const params = {
model_id: model.ref_id,
run_id: runId
};

await this.sendJobToQueue(job, params);

return { runId };
};

sendJobToQueue = async (job, params) => {
const messageBody = {
job,
params: {
model_id: model.ref_id,
run_id: runId
}
params
};

await this.sqs.send(
new SendMessageCommand({
MessageBody: JSON.stringify(messageBody),
QueueUrl: process.env.JOB_QUEUE_URL,
MessageGroupId: "Alfalfa"
})
);

return { runId };
await this.redis.lPush(this.redisJobQueue, JSON.stringify(messageBody));
};

setAlias = async (alias, run) => {
Expand Down
21 changes: 10 additions & 11 deletions alfalfa_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ class Dispatcher(DispatcherLoggerMixin):
def __init__(self, workdir: Path):
super().__init__()
connections_manager = AlafalfaConnectionsManager()
self.sqs_queue = connections_manager.sqs_queue
self.redis = connections_manager.redis
self.job_queue = os.environ['JOB_QUEUE']

self.logger.info(f"Job queue url is {self.sqs_queue}")
self.logger.info(f"Job queue key is {self.job_queue}")

self.workdir = workdir
if not Path.exists(self.workdir):
Expand All @@ -51,9 +52,8 @@ def process_message(self, message):
}
"""
try:
message_body = json.loads(message.body)
message_body = json.loads(message)
self.logger.info(f"Processing message of {message_body}")
message.delete()
job = message_body.get('job')
if job:
params = message_body.get('params', {})
Expand All @@ -68,15 +68,14 @@ def run(self):
"""
self.logger.info("Entering dispatcher run")
while True:
# WaitTimeSeconds triggers long polling that will wait for events to enter queue
# BRPOP Blocks until there is a message in the queue
# Receive Message
try:
messages = self.sqs_queue.receive_messages(MaxNumberOfMessages=1, WaitTimeSeconds=20)
if len(messages) > 0:
message = messages[0]
self.logger.info('Message Received with payload: %s' % message.body)
# Process Message
self.process_message(message)
[key, message] = self.redis.brpop(self.job_queue)
message = message.decode()
self.logger.info('Message Received with payload: %s' % message)
# Process Message
self.process_message(message)
except BaseException as e:
tb = traceback.format_exc()
self.logger.info("Exception caught in dispatcher.run: {} with {}".format(e, tb))
Expand Down
2 changes: 0 additions & 2 deletions alfalfa_worker/lib/alfalfa_connections_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ def __init__(self):
boto3 is the AWS SDK for Python for different types of services (S3, EC2, etc.)
"""
# boto3
self.sqs = boto3.resource('sqs', region_name=os.environ['REGION'], endpoint_url=os.environ['JOB_QUEUE_URL'])
self.sqs_queue = self.sqs.Queue(url=os.environ['JOB_QUEUE_URL'])
self.s3 = boto3.resource('s3', region_name=os.environ['REGION'], endpoint_url=os.environ['S3_URL'])
self.s3_bucket = self.s3.Bucket(os.environ['S3_BUCKET'])

Expand Down
4 changes: 0 additions & 4 deletions alfalfa_worker/lib/alfalfa_connections_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,3 @@ def __init__(self) -> None:

# Redis
self.redis = Redis(host=os.environ['REDIS_HOST'])

# Setup SQS
self.sqs = boto3.resource('sqs', region_name=os.environ['REGION'], endpoint_url=os.environ['JOB_QUEUE_URL'])
self.sqs_queue = self.sqs.Queue(url=os.environ['JOB_QUEUE_URL'])
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- GIT_COMMIT
- JOB_QUEUE_URL
- JOB_QUEUE
- MONGO_DB_NAME
- MONGO_URL
- NODE_ENV
Expand Down Expand Up @@ -86,7 +86,7 @@ services:
- INFLUXDB_ADMIN_USER
- INFLUXDB_DB
- INFLUXDB_HOST
- JOB_QUEUE_URL
- JOB_QUEUE
- LOGLEVEL=INFO
- MONGO_DB_NAME
- MONGO_URL
Expand Down
2 changes: 1 addition & 1 deletion tests/worker/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def env_setup(monkeypatch):
monkeypatch.setenv('S3_URL', 'http://localhost:9000')
monkeypatch.setenv('REDIS_HOST', 'localhost')
monkeypatch.setenv('S3_BUCKET', 'alfalfa')
monkeypatch.setenv('JOB_QUEUE_URL', 'http://localhost:4100/queue/local-queue1')
monkeypatch.setenv('JOB_QUEUE', 'Alfalfa Job Queue')
monkeypatch.setenv('MONGO_URL', 'mongodb://localhost:27017')
monkeypatch.setenv('MONGO_DB_NAME', 'alfalfa_test')
monkeypatch.setenv('REGION', 'us-west-1')

0 comments on commit fbff97a

Please sign in to comment.