Skip to content

Commit bbb1f67

Browse files
committed
Added more metrics for kafka consumption call
1 parent 229a550 commit bbb1f67

File tree

2 files changed

+6
-4
lines changed

2 files changed

+6
-4
lines changed

Diff for: notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public KafkaNotification(Configuration applicationProperties) throws AtlasExcept
117117
Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
118118

119119
properties = ConfigurationConverter.getProperties(kafkaConf);
120-
pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
120+
pollTimeOutMs = 200L;
121121
consumerClosedErrorMsg = kafkaConf.getString("error.message.consumer_closed", DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE);
122122

123123
//Override default configs
@@ -156,7 +156,7 @@ protected KafkaNotification(Properties properties) {
156156
LOG.info("==> KafkaNotification()");
157157

158158
this.properties = properties;
159-
this.pollTimeOutMs = 1000L;
159+
this.pollTimeOutMs = 200L;
160160

161161
LOG.info("<== KafkaNotification()");
162162
}

Diff for: webapp/src/main/java/org/apache/atlas/notification/ObjectPropEventConsumer.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ public ObjectPropConsumer(NotificationConsumer<ObjectPropEvent> consumer) {
343343

344344
@Override
345345
public void doWork() {
346+
// INC kafka batch size -> reduce kafka calls
346347
long lineStart = System.currentTimeMillis();
347348

348349
LOG.info("ObjectPropConsumer::doWork() [Line 1] ==> Entered doWork()");
@@ -387,7 +388,7 @@ public void doWork() {
387388
lineStart = System.currentTimeMillis();
388389

389390
for (AtlasKafkaMessage<ObjectPropEvent> msg : messages) {
390-
long msgStart = System.currentTimeMillis(); // track each message individually
391+
long msgStart = System.currentTimeMillis();
391392

392393
LOG.info("ObjectPropConsumer::doWork() -> Msg consumed on offset : {} with value : {}",
393394
msg.getOffset(), msg.toString());
@@ -401,7 +402,7 @@ public void doWork() {
401402
msgStart = System.currentTimeMillis();
402403

403404
if (res) {
404-
long commitOffset = msg.getOffset() + 1;
405+
long commitOffset = msg.getOffset() + 1; // Reduce this to a single batch commit
405406
consumer.commit(msg.getTopicPartition(), commitOffset);
406407
subTaskSuccess++;
407408
LOG.info("ObjectPropConsumer::doWork() [Line 7-c] => commit offset done in {} ms",
@@ -431,6 +432,7 @@ public void doWork() {
431432
LOG.info("ObjectPropConsumer::doWork() [Line 9] => incremented ASSETS_PROPAGATION_FAILED_COUNT in {} ms",
432433
(System.currentTimeMillis() - lineStart));
433434
}
435+
// move redis above than kafka
434436
lineStart = System.currentTimeMillis();
435437

436438
} catch (IllegalStateException ex) {

0 commit comments

Comments
 (0)