Commit 229a550

Added more metrics for kafka consumption call
1 parent 06841a8 commit 229a550

1 file changed: +37 -16 lines changed


Diff for: notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java

@@ -99,50 +99,71 @@ public void wakeup() {
     }
 
     private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
-        List<AtlasKafkaMessage<T>> messages = new ArrayList();
+        long methodStart = System.currentTimeMillis();
+        long stepStart = methodStart; // For individual steps timing
 
+        List<AtlasKafkaMessage<T>> messages = new ArrayList<>();
+        LOG.info("receive() => Start of method at {} ms", methodStart);
+
+        // Poll for records
         ConsumerRecords<?, ?> records = kafkaConsumer != null ? kafkaConsumer.poll(timeoutMilliSeconds) : null;
+        LOG.info("receive() [After kafkaConsumer.poll] completed in {} ms", (System.currentTimeMillis() - stepStart));
+        stepStart = System.currentTimeMillis();
 
         if (records != null) {
-            LOG.info("ObjectPropagate -> Found kafkaRecords : {}", records.count());
+            LOG.info("receive() => Found kafkaRecords: {} in {} ms", records.count(), (System.currentTimeMillis() - stepStart));
+            stepStart = System.currentTimeMillis();
+
             for (ConsumerRecord<?, ?> record : records) {
-                // if (LOG.isDebugEnabled()) {
-                LOG.info("ObjectPropagate -> Received Message topic ={}, partition ={}, offset = {}, key = {}, value = {}",
-                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
-                // }
+                LOG.info("receive() => Received Message topic={}, partition={}, offset={}, key={}, value={} in {} ms",
+                        record.topic(), record.partition(), record.offset(), record.key(), record.value(),
+                        (System.currentTimeMillis() - stepStart));
+                stepStart = System.currentTimeMillis();
 
                 TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                 if (MapUtils.isNotEmpty(lastCommittedPartitionOffset)
                         && lastCommittedPartitionOffset.containsKey(topicPartition)
                         && record.offset() < lastCommittedPartitionOffset.get(topicPartition)) {
 
                     commit(topicPartition, record.offset());
-                    LOG.info("ObjectPropagate -> Skipping already processed message: topic={}, partition={} offset={}. Last processed offset={}",
-                            record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition));
+                    LOG.info("receive() => Skipping already processed message: topic={}, partition={}, offset={}. Last processed offset={} in {} ms",
+                            record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition),
+                            (System.currentTimeMillis() - stepStart));
+                    stepStart = System.currentTimeMillis();
                     continue;
                 }
 
                 T message = null;
-
                 try {
-                    LOG.info("ObjectPropagate -> Message converting to kafkaMessage");
+                    LOG.info("receive() => Converting message to kafkaMessage in {} ms", (System.currentTimeMillis() - stepStart));
+                    stepStart = System.currentTimeMillis();
+
                     message = deserializer.deserialize(record.value().toString());
-                    LOG.info("ObjectPropagate -> Message converted to kafkaMessage : {}", message.toString());
+                    LOG.info("receive() => Message converted to kafkaMessage: {} in {} ms",
+                            message.toString(), (System.currentTimeMillis() - stepStart));
+                    stepStart = System.currentTimeMillis();
                 } catch (OutOfMemoryError excp) {
-                    LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}",
-                            record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp);
+                    LOG.error("receive() => Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={} in {} ms",
+                            record.topic(), record.partition(), record.offset(), record.key(), record.value(),
+                            (System.currentTimeMillis() - stepStart), excp);
                 }
 
                 if (message == null) {
                     continue;
                 }
-                LOG.info("ObjectPropagate -> Message added to kafkaMessage batch");
+                LOG.info("receive() => Adding message to batch in {} ms", (System.currentTimeMillis() - stepStart));
+                stepStart = System.currentTimeMillis();
+
                 messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
-                        deserializer.getMsgCreated(), deserializer.getSpooled()));
+                                                   deserializer.getMsgCreated(), deserializer.getSpooled()));
+                LOG.info("receive() => Batch size now: {} after {} ms", messages.size(), (System.currentTimeMillis() - stepStart));
+                stepStart = System.currentTimeMillis();
             }
         }
 
+        long totalTime = System.currentTimeMillis() - methodStart;
+        LOG.info("receive() => End of method, total execution time: {} ms", totalTime);
         return messages;
-
     }
+
 }
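
The change times each step by reading System.currentTimeMillis() before and after the work and resetting stepStart after every log call. A minimal sketch of that pattern pulled into a small helper follows; StepTimer is hypothetical and not part of this commit, and an SLF4J Logger is assumed, as in the surrounding class.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper sketching the methodStart/stepStart timing pattern used in receive().
// Not part of the commit; shown only to illustrate the technique.
public class StepTimer {
    private static final Logger LOG = LoggerFactory.getLogger(StepTimer.class);

    private final long methodStart = System.currentTimeMillis();
    private long stepStart = methodStart;

    // Logs the elapsed time since the previous step and resets the step clock.
    public void logStep(String stepName) {
        long now = System.currentTimeMillis();

        LOG.info("receive() [{}] completed in {} ms", stepName, now - stepStart);

        stepStart = now;
    }

    // Logs the total elapsed time since the timer was created.
    public void logTotal() {
        LOG.info("receive() => End of method, total execution time: {} ms",
                System.currentTimeMillis() - methodStart);
    }
}

With a helper like this, the repeated stepStart = System.currentTimeMillis() lines in the diff would collapse into calls such as timer.logStep("kafkaConsumer.poll") after the poll and timer.logTotal() just before return messages.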
