Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi service deployment to avoid silent failures of kafka consumers #75

Merged
merged 5 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ workflows:
- "build-dev":
filters:
branches:
only: [dev, architectt1-feature/emailTemplatesClean]
only: [dev]
- "build-prod":
filters:
branches:
Expand Down
16 changes: 8 additions & 8 deletions connect/connectNotificationServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,13 @@ if (config.ENABLE_EMAILS) {
}

// init database, it will clear and re-create all tables
notificationServer
.initDatabase()
.then(() => notificationServer.start())
.catch((e) => {
console.log(e); // eslint-disable-line no-console
notificationServer.logger.error('Notification server errored out');
});
// notificationServer
// .initDatabase()
// .then(() => notificationServer.startKafkaConsumers())
// .catch((e) => {
// console.log(e); // eslint-disable-line no-console
// notificationServer.logger.error('Notification server errored out');
// });

// if no need to init database, then directly start the server:
// notificationServer.start();
notificationServer.startKafkaConsumers();
32 changes: 20 additions & 12 deletions deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ AWS_SECRET_ACCESS_KEY=$(eval "echo \$${ENV}_AWS_SECRET_ACCESS_KEY")
AWS_ACCOUNT_ID=$(eval "echo \$${ENV}_AWS_ACCOUNT_ID")
AWS_REPOSITORY=$(eval "echo \$${ENV}_AWS_REPOSITORY")
AWS_ECS_CLUSTER=$(eval "echo \$${ENV}_AWS_ECS_CLUSTER")
AWS_ECS_SERVICE=$(eval "echo \$${ENV}_AWS_ECS_SERVICE")
AWS_ECS_SERVICE_API=$(eval "echo \$${ENV}_AWS_ECS_SERVICE")
AWS_ECS_SERVICE_CONSUMERS=$(eval "echo \$${ENV}_AWS_ECS_SERVICE_CONSUMERS")

KAFKA_CLIENT_CERT=$(eval "echo \$${ENV}_KAFKA_CLIENT_CERT")
KAFKA_CLIENT_CERT_KEY=$(eval "echo \$${ENV}_KAFKA_CLIENT_CERT_KEY")
Expand Down Expand Up @@ -98,9 +99,9 @@ deploy_cluster() {

#family="nginx-api-dev-task"

make_task_def
register_definition
update_result=$(aws ecs update-service --cluster $AWS_ECS_CLUSTER --service $AWS_ECS_SERVICE --task-definition $revision )
make_task_def $1 $2 $3 $4
register_definition $1
update_result=$(aws ecs update-service --cluster $AWS_ECS_CLUSTER --service $1 --task-definition $revision )
#echo $update_result
result=$(echo $update_result | $JQ '.service.taskDefinition' )
echo $result
Expand All @@ -119,8 +120,9 @@ make_task_def(){
"name": "%s",
"image": "%s.dkr.ecr.%s.amazonaws.com/%s:%s",
"essential": true,
"memory": 1536,
"cpu": 768,
"memory": 768,
"cpu": 512,
"entryPoint": ["%s", "%s", "%s"],
"environment": [
{
"name": "ENV",
Expand Down Expand Up @@ -261,11 +263,11 @@ make_task_def(){
}
]'

task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $KAFKA_GROUP_ID $KAFKA_URL $DATABASE_URL $AUTHSECRET $TC_API_BASE_URL $TC_API_V3_BASE_URL $TC_API_V4_BASE_URL $TC_API_V5_BASE_URL $MESSAGE_API_BASE_URL $CONNECT_URL $ENABLE_EMAILS $MENTION_EMAIL $REPLY_EMAIL_PREFIX $REPLY_EMAIL_DOMAIN $REPLY_EMAIL_FROM $DEFAULT_REPLY_EMAIL $ENABLE_DEV_MODE $DEV_MODE_EMAIL $LOG_LEVEL $VALID_ISSUERS $PORT "$API_CONTEXT_PATH" "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV)
task_def=$(printf "$task_template" $1 $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $2 $3 $4 $ENV "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $KAFKA_GROUP_ID $KAFKA_URL $DATABASE_URL $AUTHSECRET $TC_API_BASE_URL $TC_API_V3_BASE_URL $TC_API_V4_BASE_URL $TC_API_V5_BASE_URL $MESSAGE_API_BASE_URL $CONNECT_URL $ENABLE_EMAILS $MENTION_EMAIL $REPLY_EMAIL_PREFIX $REPLY_EMAIL_DOMAIN $REPLY_EMAIL_FROM $DEFAULT_REPLY_EMAIL $ENABLE_DEV_MODE $DEV_MODE_EMAIL $LOG_LEVEL $VALID_ISSUERS $PORT "$API_CONTEXT_PATH" "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV)
}

register_definition() {
if revision=$(aws ecs register-task-definition --container-definitions "$task_def" --family $family | $JQ '.taskDefinition.taskDefinitionArn'); then
if revision=$(aws ecs register-task-definition --container-definitions "$task_def" --family $1 2> /dev/null | $JQ '.taskDefinition.taskDefinitionArn'); then
echo "Revision: $revision"
else
echo "Failed to register task definition"
Expand All @@ -277,13 +279,13 @@ register_definition() {
check_service_status() {
counter=0
sleep 60
servicestatus=`aws ecs describe-services --service $AWS_ECS_SERVICE --cluster $AWS_ECS_CLUSTER | $JQ '.services[].events[0].message'`
servicestatus=`aws ecs describe-services --service $1 --cluster $AWS_ECS_CLUSTER | $JQ '.services[].events[0].message'`
while [[ $servicestatus != *"steady state"* ]]
do
echo "Current event message : $servicestatus"
echo "Waiting for 30 seconds to check the service status...."
sleep 30
servicestatus=`aws ecs describe-services --service $AWS_ECS_SERVICE --cluster $AWS_ECS_CLUSTER | $JQ '.services[].events[0].message'`
servicestatus=`aws ecs describe-services --service $1 --cluster $AWS_ECS_CLUSTER | $JQ '.services[].events[0].message'`
counter=`expr $counter + 1`
if [[ $counter -gt $COUNTER_LIMIT ]] ; then
echo "Service does not reach steady state within 10 minutes. Please check"
Expand All @@ -295,5 +297,11 @@ check_service_status() {

configure_aws_cli
push_ecr_image
deploy_cluster
check_service_status

deploy_cluster $AWS_ECS_SERVICE_API "npm" "run" "startAPI"

deploy_cluster $AWS_ECS_SERVICE_CONSUMERS "npm" "run" "start"

check_service_status $AWS_ECS_SERVICE_API

check_service_status $AWS_ECS_SERVICE_CONSUMERS
12 changes: 12 additions & 0 deletions index-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const notificationServer = require('./index');

// // init database, it will clear and re-create all tables
// notificationServer
// .initDatabase()
// .then(() => notificationServer.startAPI())
// .catch((e) => {
// console.log(e); // eslint-disable-line no-console
// notificationServer.logger.error('Notification API server errored out');
// });

module.exports = notificationServer.startAPI();
18 changes: 14 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,24 @@ function getAllHandlers() {
}

/**
* Start the notification server.
* Start the notification API server.
*/
function start() {
function startAPI() {
// load app only after config is set
const app = require('./src/app');
app.start();
}

/**
* Start the event bus consumer.
*/
function startKafkaConsumers() {
if (_.isEmpty(handlers)) {
throw new errors.ValidationError('Missing handler(s).');
}
// load app only after config is set
const app = require('./src/app');
app.start(handlers, notificationServiceHandlers);
app.startKafkaConsumer(handlers, notificationServiceHandlers);
}

/**
Expand All @@ -122,7 +131,8 @@ module.exports = {
addTopicHandler,
removeTopicHandler,
getAllHandlers,
start,
startAPI,
startKafkaConsumers,
initDatabase,
addNotificationServiceHandler,

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"main": "./index.js",
"scripts": {
"start": "node connect/connectNotificationServer",
"startAPI": "node index-api",
"lint": "eslint *.js src config test connect || true",
"lint:fix": "eslint *.js --fix src config test connect || true",
"postinstall": "npm run build",
Expand Down
31 changes: 16 additions & 15 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const models = require('./models');
const Kafka = require('no-kafka');

/**
* Start Kafka consumer.
* Start Kafka consumer for event bus events.
* @param {Object} handlers the handlers
* @param {Array} notificationServiceHandlers list of notification service handlers
*/
Expand Down Expand Up @@ -74,6 +74,7 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
});
});


consumer
.init()
.then(() => _.each(_.keys(handlers),
Expand All @@ -85,11 +86,9 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
}

/**
* Start the notification server.
* @param {Object} handlers the handlers
* @param {Array} notificationServiceHandlers list of notification service handlers
* Start the notifications API server.
*/
function start(handlers, notificationServiceHandlers) {
function start() {
const app = express();
app.set('port', config.PORT);

Expand Down Expand Up @@ -157,19 +156,21 @@ function start(handlers, notificationServiceHandlers) {
}
});

models
.init()
.then(() => {
app.listen(app.get('port'), () => {
logger.info(`Express server listening on port ${app.get('port')}`);
});

startKafkaConsumer(handlers, notificationServiceHandlers);
})
.catch((err) => logger.error(err));
// models
// .init()
// .then(() => {
// app.listen(app.get('port'), () => {
// logger.info(`Express server listening on port ${app.get('port')}`);
// });
// })
// .catch((err) => logger.error(err));
app.listen(app.get('port'), () => {
logger.info(`Express server listening on port ${app.get('port')}`);
});
}

// Exports
module.exports = {
start,
startKafkaConsumer,
};