Skip to content

Commit e0e56d3

Browse files
authored
[server][davinci] Implement Heartbeat Monitoring Service (#834)
* [server][davinci] Implement Heartbeat Monitoring Service This PR implements the heartbeat monitoring service. The service functions on two ends: it is notified of partitions that should be monitored (as leader or follower), and it is notified when heartbeats are consumed (in the context of leader or follower consumption). Stats are then reported and updated from a monitoring thread that is separate from ingestion tasks. The purpose of this is to enable monitoring to be a true catch-all. For a hybrid store that is assigned Leader/Follower in Helix, there's no acceptable situation where the server should not be consuming records for a prolonged duration. Ingestion tasks may die, get paused, or be recycled. That is fine, but if Helix says we should be up, then we'll report based on that.
1 parent 4115afc commit e0e56d3

25 files changed

+782
-49
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,9 @@ public DaVinciBackend(
258258
// TODO: consider how/if a repair task would be valid for Davinci users?
259259
null,
260260
pubSubClientsFactory,
261-
Optional.empty());
261+
Optional.empty(),
262+
// TODO: It would be good to monitor heartbeats like this from davinci, but needs some work
263+
null);
262264

263265
ingestionService.start();
264266
ingestionService.addIngestionNotifier(ingestionListener);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.linkedin.davinci.notifier.PushMonitorNotifier;
1313
import com.linkedin.davinci.notifier.VeniceNotifier;
1414
import com.linkedin.davinci.stats.ParticipantStateTransitionStats;
15+
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
1516
import com.linkedin.davinci.storage.StorageMetadataService;
1617
import com.linkedin.davinci.storage.StorageService;
1718
import com.linkedin.venice.common.VeniceSystemStoreType;
@@ -93,6 +94,7 @@ public class HelixParticipationService extends AbstractVeniceService
9394
private HelixPartitionStatusAccessor partitionPushStatusAccessor;
9495
private ThreadPoolExecutor leaderFollowerHelixStateTransitionThreadPool;
9596
private VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor;
97+
private final HeartbeatMonitoringService heartbeatMonitoringService;
9698

9799
// This is ONLY for testing purpose.
98100
public ThreadPoolExecutor getLeaderFollowerHelixStateTransitionThreadPool() {
@@ -111,10 +113,12 @@ public HelixParticipationService(
111113
String clusterName,
112114
int port,
113115
String hostname,
114-
CompletableFuture<SafeHelixManager> managerFuture) {
116+
CompletableFuture<SafeHelixManager> managerFuture,
117+
HeartbeatMonitoringService heartbeatMonitoringService) {
115118
this.ingestionService = storeIngestionService;
116119
this.storageService = storageService;
117120
this.clusterName = clusterName;
121+
this.heartbeatMonitoringService = heartbeatMonitoringService;
118122
// The format of instance name must be "$host_$port", otherwise Helix can not get these information correctly.
119123
this.participantName = Utils.getHelixNodeIdentifier(hostname, port);
120124
this.zkAddress = zkAddress;
@@ -194,7 +198,8 @@ public boolean startInner() {
194198
futureVersionStateTransitionStats,
195199
helixReadOnlyStoreRepository,
196200
partitionPushStatusAccessorFuture,
197-
instance.getNodeId());
201+
instance.getNodeId(),
202+
heartbeatMonitoringService);
198203
} else {
199204
leaderFollowerParticipantModelFactory = new LeaderFollowerPartitionStateModelFactory(
200205
ingestionBackend,
@@ -203,7 +208,8 @@ public boolean startInner() {
203208
stateTransitionStats,
204209
helixReadOnlyStoreRepository,
205210
partitionPushStatusAccessorFuture,
206-
instance.getNodeId());
211+
instance.getNodeId(),
212+
heartbeatMonitoringService);
207213
}
208214
LOGGER.info(
209215
"LeaderFollower threadPool info: strategy = {}, max future state transition thread = {}",

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/LeaderFollowerPartitionStateModel.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import com.linkedin.davinci.ingestion.VeniceIngestionBackend;
55
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask;
66
import com.linkedin.davinci.stats.ParticipantStateTransitionStats;
7+
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
78
import com.linkedin.venice.common.VeniceSystemStoreUtils;
89
import com.linkedin.venice.exceptions.VeniceException;
910
import com.linkedin.venice.exceptions.VeniceNoStoreException;
1011
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
1112
import com.linkedin.venice.helix.HelixState;
1213
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
14+
import com.linkedin.venice.meta.Store;
1315
import com.linkedin.venice.meta.Version;
1416
import com.linkedin.venice.utils.LatencyUtils;
1517
import java.util.concurrent.CompletableFuture;
@@ -57,6 +59,8 @@ public class LeaderFollowerPartitionStateModel extends AbstractPartitionStateMod
5759
private final LeaderFollowerIngestionProgressNotifier notifier;
5860
private final ParticipantStateTransitionStats threadPoolStats;
5961

62+
private final HeartbeatMonitoringService heartbeatMonitoringService;
63+
6064
public LeaderFollowerPartitionStateModel(
6165
VeniceIngestionBackend ingestionBackend,
6266
VeniceStoreVersionConfig storeAndServerConfigs,
@@ -65,7 +69,8 @@ public LeaderFollowerPartitionStateModel(
6569
ReadOnlyStoreRepository metadataRepo,
6670
CompletableFuture<HelixPartitionStatusAccessor> partitionPushStatusAccessorFuture,
6771
String instanceName,
68-
ParticipantStateTransitionStats threadPoolStats) {
72+
ParticipantStateTransitionStats threadPoolStats,
73+
HeartbeatMonitoringService heartbeatMonitoringService) {
6974
super(
7075
ingestionBackend,
7176
metadataRepo,
@@ -75,6 +80,7 @@ public LeaderFollowerPartitionStateModel(
7580
instanceName);
7681
this.notifier = notifier;
7782
this.threadPoolStats = threadPoolStats;
83+
this.heartbeatMonitoringService = heartbeatMonitoringService;
7884
}
7985

8086
@Transition(to = HelixState.STANDBY_STATE, from = HelixState.OFFLINE_STATE)
@@ -83,8 +89,9 @@ public void onBecomeStandbyFromOffline(Message message, NotificationContext cont
8389
String resourceName = message.getResourceName();
8490
String storeName = Version.parseStoreFromKafkaTopicName(resourceName);
8591
int version = Version.parseVersionFromKafkaTopicName(resourceName);
86-
boolean isRegularStoreCurrentVersion = getStoreRepo().getStoreOrThrow(storeName).getCurrentVersion() == version
87-
&& !VeniceSystemStoreUtils.isSystemStore(storeName);
92+
Store store = getStoreRepo().getStoreOrThrow(storeName);
93+
boolean isRegularStoreCurrentVersion =
94+
store.getCurrentVersion() == version && !VeniceSystemStoreUtils.isSystemStore(storeName);
8895

8996
/**
9097
* For regular store current version, firstly create a latch, then start ingestion and wait for ingestion
@@ -97,6 +104,7 @@ public void onBecomeStandbyFromOffline(Message message, NotificationContext cont
97104
try {
98105
long startTimeForSettingUpNewStorePartitionInNs = System.nanoTime();
99106
setupNewStorePartition();
107+
heartbeatMonitoringService.addFollowerLagMonitor(store.getVersion(version).get(), getPartition());
100108
logger.info(
101109
"Completed setting up new store partition for {} partition {}. Total elapsed time: {} ms",
102110
resourceName,
@@ -118,6 +126,11 @@ public void onBecomeStandbyFromOffline(Message message, NotificationContext cont
118126
@Transition(to = HelixState.LEADER_STATE, from = HelixState.STANDBY_STATE)
119127
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
120128
LeaderSessionIdChecker checker = new LeaderSessionIdChecker(leaderSessionId.incrementAndGet(), leaderSessionId);
129+
String resourceName = message.getResourceName();
130+
String storeName = Version.parseStoreFromKafkaTopicName(resourceName);
131+
int version = Version.parseVersionFromKafkaTopicName(resourceName);
132+
Store store = getStoreRepo().getStoreOrThrow(storeName);
133+
heartbeatMonitoringService.addLeaderLagMonitor(store.getVersion(version).get(), getPartition());
121134
executeStateTransition(
122135
message,
123136
context,
@@ -127,6 +140,11 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte
127140
@Transition(to = HelixState.STANDBY_STATE, from = HelixState.LEADER_STATE)
128141
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
129142
LeaderSessionIdChecker checker = new LeaderSessionIdChecker(leaderSessionId.incrementAndGet(), leaderSessionId);
143+
String resourceName = message.getResourceName();
144+
String storeName = Version.parseStoreFromKafkaTopicName(resourceName);
145+
int version = Version.parseVersionFromKafkaTopicName(resourceName);
146+
Store store = getStoreRepo().getStoreOrThrow(storeName);
147+
heartbeatMonitoringService.addFollowerLagMonitor(store.getVersion(version).get(), getPartition());
130148
executeStateTransition(
131149
message,
132150
context,
@@ -135,6 +153,11 @@ public void onBecomeStandbyFromLeader(Message message, NotificationContext conte
135153

136154
@Transition(to = HelixState.OFFLINE_STATE, from = HelixState.STANDBY_STATE)
137155
public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
156+
String resourceName = message.getResourceName();
157+
String storeName = Version.parseStoreFromKafkaTopicName(resourceName);
158+
int version = Version.parseVersionFromKafkaTopicName(resourceName);
159+
Store store = getStoreRepo().getStoreOrThrow(storeName);
160+
heartbeatMonitoringService.removeLagMonitor(store.getVersion(version).get(), getPartition());
138161
executeStateTransition(message, context, () -> stopConsumption(true));
139162
}
140163

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/LeaderFollowerPartitionStateModelDualPoolFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.linkedin.davinci.config.VeniceConfigLoader;
44
import com.linkedin.davinci.ingestion.VeniceIngestionBackend;
55
import com.linkedin.davinci.stats.ParticipantStateTransitionStats;
6+
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
67
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
78
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
89
import com.linkedin.venice.utils.Utils;
@@ -24,15 +25,17 @@ public LeaderFollowerPartitionStateModelDualPoolFactory(
2425
ParticipantStateTransitionStats futureVersionStateTransitionStats,
2526
ReadOnlyStoreRepository metadataRepo,
2627
CompletableFuture<HelixPartitionStatusAccessor> partitionPushStatusAccessorFuture,
27-
String instanceName) {
28+
String instanceName,
29+
HeartbeatMonitoringService heartbeatMonitoringService) {
2830
super(
2931
ingestionBackend,
3032
configService,
3133
executorService,
3234
stateTransitionStats,
3335
metadataRepo,
3436
partitionPushStatusAccessorFuture,
35-
instanceName);
37+
instanceName,
38+
heartbeatMonitoringService);
3639
this.futureVersionExecutorService = futureVersionExecutorService;
3740
this.futureVersionStateTransitionStats = futureVersionStateTransitionStats;
3841
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/LeaderFollowerPartitionStateModelFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.linkedin.davinci.config.VeniceConfigLoader;
44
import com.linkedin.davinci.ingestion.VeniceIngestionBackend;
55
import com.linkedin.davinci.stats.ParticipantStateTransitionStats;
6+
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
67
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
78
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
89
import com.linkedin.venice.utils.HelixUtils;
@@ -17,6 +18,7 @@
1718
public class LeaderFollowerPartitionStateModelFactory extends AbstractStateModelFactory {
1819
private final LeaderFollowerIngestionProgressNotifier leaderFollowerStateModelNotifier =
1920
new LeaderFollowerIngestionProgressNotifier();
21+
private final HeartbeatMonitoringService heartbeatMonitoringService;
2022

2123
public LeaderFollowerPartitionStateModelFactory(
2224
VeniceIngestionBackend ingestionBackend,
@@ -25,7 +27,8 @@ public LeaderFollowerPartitionStateModelFactory(
2527
ParticipantStateTransitionStats stateTransitionStats,
2628
ReadOnlyStoreRepository metadataRepo,
2729
CompletableFuture<HelixPartitionStatusAccessor> partitionPushStatusAccessorFuture,
28-
String instanceName) {
30+
String instanceName,
31+
HeartbeatMonitoringService heartbeatMonitoringService) {
2932
super(
3033
ingestionBackend,
3134
configService,
@@ -34,6 +37,7 @@ public LeaderFollowerPartitionStateModelFactory(
3437
metadataRepo,
3538
partitionPushStatusAccessorFuture,
3639
instanceName);
40+
this.heartbeatMonitoringService = heartbeatMonitoringService;
3741

3842
// Add a new notifier to let state model knows ingestion has caught up the lag so that it can complete the offline
3943
// to standby state transition.
@@ -52,7 +56,8 @@ public LeaderFollowerPartitionStateModel createNewStateModel(String resourceName
5256
getStoreMetadataRepo(),
5357
partitionPushStatusAccessorFuture,
5458
instanceName,
55-
getStateTransitionStats(resourceName));
59+
getStateTransitionStats(resourceName),
60+
heartbeatMonitoringService);
5661
}
5762

5863
/**

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
* The class also holds latches that can be used in SM in the cases when state transitions
2020
* need to coordinate with ingestion progress.
2121
*/
22-
public abstract class StateModelIngestionProgressNotifier implements VeniceNotifier {
22+
public class StateModelIngestionProgressNotifier implements VeniceNotifier {
2323
private final Logger logger = LogManager.getLogger(this.getClass());
2424
private final Map<String, CountDownLatch> stateModelToIngestionCompleteFlagMap = new VeniceConcurrentHashMap<>();
2525
private final Map<String, Boolean> stateModelToSuccessMap = new VeniceConcurrentHashMap<>();

clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,8 @@ private void initializeIsolatedIngestionServer() {
782782
isDaVinciClient,
783783
repairService,
784784
pubSubClientsFactory,
785-
sslFactory);
785+
sslFactory,
786+
null);
786787
storeIngestionService.start();
787788
storeIngestionService.addIngestionNotifier(new IsolatedIngestionNotifier(this));
788789
ingestionBackend = new DefaultIngestionBackend(storageMetadataService, storeIngestionService, storageService);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
3131
import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats;
3232
import com.linkedin.davinci.stats.StoreBufferServiceStats;
33+
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
3334
import com.linkedin.davinci.storage.StorageEngineRepository;
3435
import com.linkedin.davinci.storage.StorageMetadataService;
3536
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
@@ -229,7 +230,8 @@ public KafkaStoreIngestionService(
229230
boolean isDaVinciClient,
230231
RemoteIngestionRepairService remoteIngestionRepairService,
231232
PubSubClientsFactory pubSubClientsFactory,
232-
Optional<SSLFactory> sslFactory) {
233+
Optional<SSLFactory> sslFactory,
234+
HeartbeatMonitoringService heartbeatMonitoringService) {
233235
this.cacheBackend = cacheBackend;
234236
this.recordTransformer = recordTransformer;
235237
this.storageMetadataService = storageMetadataService;
@@ -514,6 +516,7 @@ public void handleStoreDeleted(Store store) {
514516
.setPubSubTopicRepository(pubSubTopicRepository)
515517
.setRunnableForKillIngestionTasksForNonCurrentVersions(
516518
serverConfig.getIngestionMemoryLimit() > 0 ? () -> killConsumptionTaskForNonCurrentVersions() : null)
519+
.setHeartbeatMonitoringService(heartbeatMonitoringService)
517520
.build();
518521
}
519522

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.linkedin.davinci.ingestion.LagType;
2424
import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper;
2525
import com.linkedin.davinci.schema.merge.MergeRecordHelper;
26+
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
2627
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
2728
import com.linkedin.davinci.storage.chunking.ChunkingAdapter;
2829
import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter;
@@ -155,6 +156,8 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
155156
private final Set<String> nativeReplicationSourceVersionTopicKafkaURLSingletonSet;
156157
private final VeniceWriterFactory veniceWriterFactory;
157158

159+
private final HeartbeatMonitoringService heartbeatMonitoringService;
160+
158161
/**
159162
* Leader must maintain producer DIV states separate from drainers, because leader is always ahead of drainer;
160163
* if leader and drainer share the same DIV validator, leader will pollute the data in shared DIV validator;
@@ -209,6 +212,7 @@ public LeaderFollowerStoreIngestionTask(
209212
cacheBackend,
210213
recordTransformer,
211214
builder.getLeaderFollowerNotifiers());
215+
this.heartbeatMonitoringService = builder.getHeartbeatMonitoringService();
212216
/**
213217
* We are going to apply fast leader failover for per user store system store since it is time sensitive, and if the
214218
* split-brain problem happens in prod, we could design a way to periodically produce snapshot to the meta system
@@ -2089,6 +2093,36 @@ private void propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
20892093
}
20902094
}
20912095

2096+
@Override
2097+
protected void recordHeartbeatReceived(
2098+
PartitionConsumptionState partitionConsumptionState,
2099+
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
2100+
String kafkaUrl) {
2101+
if (heartbeatMonitoringService == null) {
2102+
// Not enabled!
2103+
return;
2104+
}
2105+
2106+
if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
2107+
for (int subPartition: PartitionUtils
2108+
.getSubPartitions(partitionConsumptionState.getUserPartition(), amplificationFactor)) {
2109+
heartbeatMonitoringService.recordLeaderHeartbeat(
2110+
storeName,
2111+
versionNumber,
2112+
subPartition,
2113+
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
2114+
consumerRecord.getValue().producerMetadata.messageTimestamp);
2115+
}
2116+
} else {
2117+
heartbeatMonitoringService.recordFollowerHeartbeat(
2118+
storeName,
2119+
versionNumber,
2120+
partitionConsumptionState.getUserPartition(),
2121+
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
2122+
consumerRecord.getValue().producerMetadata.messageTimestamp);
2123+
}
2124+
}
2125+
20922126
/**
20932127
* The goal of this function is to possibly produce the incoming kafka message consumed from local VT, remote VT, RT or SR topic to
20942128
* local VT if needed. It's decided based on the function output of {@link #shouldProduceToVersionTopic} and message type.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE;
88
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
99
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
10+
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
1011
import static java.util.concurrent.TimeUnit.HOURS;
1112
import static java.util.concurrent.TimeUnit.MILLISECONDS;
1213
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -2254,6 +2255,13 @@ public void processConsumerRecord(
22542255
}
22552256
}
22562257

2258+
protected void recordHeartbeatReceived(
2259+
PartitionConsumptionState partitionConsumptionState,
2260+
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
2261+
String kafkaUrl) {
2262+
// No Op
2263+
}
2264+
22572265
/**
22582266
* Retrieve current LeaderFollowerState from partition's PCS. This method is used by IsolatedIngestionServer to sync
22592267
* user-partition LeaderFollower status from child process to parent process in ingestion isolation.
@@ -2296,8 +2304,7 @@ private boolean shouldSyncOffset(
22962304
* TODO: if we know some other types of Control Messages are frequent as START_OF_SEGMENT and END_OF_SEGMENT in the future,
22972305
* we need to consider to exclude them to avoid the issue described above.
22982306
*/
2299-
if (controlMessageType != ControlMessageType.START_OF_SEGMENT
2300-
&& controlMessageType != ControlMessageType.END_OF_SEGMENT) {
2307+
if (controlMessageType != START_OF_SEGMENT && controlMessageType != ControlMessageType.END_OF_SEGMENT) {
23012308
syncOffset = true;
23022309
}
23032310
} else {
@@ -2774,6 +2781,14 @@ private int internalProcessConsumerRecord(
27742781
consumerRecord.getTopicPartition().getPartitionNumber(),
27752782
consumerRecord.getOffset(),
27762783
partitionConsumptionState);
2784+
try {
2785+
if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue()
2786+
&& Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
2787+
recordHeartbeatReceived(partitionConsumptionState, consumerRecord, kafkaUrl);
2788+
}
2789+
} catch (Exception e) {
2790+
LOGGER.error("Failed to record Record heartbeat with message: ", e);
2791+
}
27772792
} else {
27782793
sizeOfPersistedData = processKafkaDataMessage(
27792794
consumerRecord,
@@ -3787,7 +3802,7 @@ protected void recordProcessedRecordStats(
37873802
}
37883803

37893804
protected boolean isSegmentControlMsg(ControlMessageType msgType) {
3790-
return ControlMessageType.START_OF_SEGMENT.equals(msgType) || ControlMessageType.END_OF_SEGMENT.equals(msgType);
3805+
return START_OF_SEGMENT.equals(msgType) || ControlMessageType.END_OF_SEGMENT.equals(msgType);
37913806
}
37923807

37933808
/**

0 commit comments

Comments
 (0)