Skip to content

Commit 68c1efa

Browse files
Authored by Dushyant Bhalgami
Merge pull request #39 from topcoder-platform/Issue_38
Replace kafka simple consumer with group consumer
2 parents d57984c + 5935e1b commit 68c1efa

File tree

3 files changed

+25
-13
lines changed

3 files changed

+25
-13
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ The following parameters can be set in config files or in env variables:
2020
- KAFKA_CLIENT_CERT_KEY: Kafka connection private key, optional;
2121
if not provided, then SSL connection is not used, direct insecure connection is used;
2222
if provided, it can be either path to private key file or private key content
23+
- KAFKA_GROUP_ID: the Kafka group id, default value is 'submission-processor'
2324
- SUBMISSION_CREATE_TOPIC: Kafka topic related to Submission creation, default value is 'submission.notification.create'
2425
- AVSCAN_TOPIC: Kafka topic related to AV Scan, default value is 'avscan.action.scan'
2526
- ACCESS_KEY_ID: the AWS access key id

config/default.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ module.exports = {
66
LOG_LEVEL: process.env.LOG_LEVEL || 'debug',
77
MAXFILESIZE: process.env.MAXFILESIZE || 4294967296,
88

9+
// Kafka group id
10+
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'submission-processor',
11+
912
KAFKA_URL: process.env.KAFKA_URL || 'localhost:9092',
1013
KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT,
1114
KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY,

src/app.js

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ const Kafka = require('no-kafka')
99
const co = require('co')
1010
const ProcessorService = require('./services/ProcessorService')
1111
const healthcheck = require('topcoder-healthcheck-dropin')
12-
const _ = require('lodash')
1312

1413
// create consumer
15-
const options = { connectionString: config.KAFKA_URL }
14+
const options = { connectionString: config.KAFKA_URL, groupId: config.KAFKA_GROUP_ID }
1615
if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) {
1716
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }
1817
}
19-
const consumer = new Kafka.SimpleConsumer(options)
18+
const consumer = new Kafka.GroupConsumer(options)
2019

2120
// data handler
2221
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
@@ -47,14 +46,12 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
4746
return
4847
}
4948

50-
5149
if (topic === config.SUBMISSION_CREATE_TOPIC && messageJSON.payload.fileType === 'url') {
5250
logger.debug(`Ignoring message in topic ${messageJSON.topic} with file type as url`)
5351
// ignore the message
5452
return
5553
}
5654

57-
5855
return co(function * () {
5956
switch (topic) {
6057
case config.SUBMISSION_CREATE_TOPIC:
@@ -67,9 +64,14 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
6764
throw new Error(`Invalid topic: ${topic}`)
6865
}
6966
})
70-
// commit offset
71-
.then(() => consumer.commitOffset({ topic, partition, offset: m.offset }))
72-
.catch((err) => logger.error(err))
67+
// commit offset regardless of errors
68+
.then(() => {
69+
consumer.commitOffset({ topic, partition, offset: m.offset })
70+
})
71+
.catch((err) => {
72+
logger.error(err)
73+
consumer.commitOffset({ topic, partition, offset: m.offset })
74+
})
7375
})
7476

7577
// check if there is kafka connection alive
@@ -85,12 +87,18 @@ function check () {
8587
return connected
8688
}
8789

90+
const topics = [config.SUBMISSION_CREATE_TOPIC, config.AVSCAN_TOPIC]
91+
8892
consumer
89-
.init()
90-
// consume configured topic
93+
.init([{
94+
subscriptions: topics,
95+
handler: dataHandler
96+
}])
97+
// consume configured topics
9198
.then(() => {
99+
logger.info('Initialized.......')
92100
healthcheck.init([check])
93-
const topics = [config.SUBMISSION_CREATE_TOPIC, config.AVSCAN_TOPIC]
94-
_.each(topics, (tp) => consumer.subscribe(tp, { time: Kafka.LATEST_OFFSET }, dataHandler))
101+
logger.info('Adding topics successfully.......')
102+
logger.info(topics)
103+
logger.info('Kick Start.......')
95104
})
96-
.catch((err) => logger.error(err))

0 commit comments

Comments (0)