Skip to content

Commit 8432a16

Browse files
committed
Fix flaky test
1 parent cc480ad commit 8432a16

File tree

1 file changed

+36
-22
lines changed

1 file changed

+36
-22
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 36 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -1395,40 +1395,54 @@ private void checkAndHandleDoLMessage(
13951395
return;
13961396
}
13971397

1398-
// Check if this DoL matches the expected term and host
1398+
// Validate DoL matches expected term and host
13991399
long expectedTermId = currentDolStamp.getLeadershipTerm();
14001400
String expectedHostId = currentDolStamp.getHostId();
1401-
if (!expectedHostId.equals(consumedHostId) || consumedTermId < expectedTermId) {
1402-
// DoL message is either from a different host or an older term - ignore it
1401+
1402+
// Ignore DoL from different host
1403+
if (!expectedHostId.equals(consumedHostId)) {
1404+
LOGGER.debug(
1405+
"Replica: {} ignoring DoL from different host. Expected: {}, received: {}",
1406+
replicaId,
1407+
expectedHostId,
1408+
consumedHostId);
14031409
return;
14041410
}
14051411

1406-
if (consumedTermId == expectedTermId) {
1407-
// DoL (Declaration of Leadership) loopback complete - ready to switch to leader source topic
1408-
currentDolStamp.setDolConsumed(true);
1409-
long loopbackLatencyMs = currentDolStamp.getLatencyMs();
1410-
LOGGER.info(
1411-
"Replica {}: DoL loopback complete - successfully produced to and consumed back from local VT, "
1412-
+ "confirming replica is fully caught up and ready to switch to leader source topic. "
1413-
+ "Loopback latency: {} ms. [term={}, host={}, timestamp={}]. DolStamp={}",
1412+
// Ignore stale DoL from older term
1413+
if (consumedTermId < expectedTermId) {
1414+
LOGGER.debug(
1415+
"Replica: {} ignoring stale DoL from older term. Expected: {}, received: {}",
14141416
replicaId,
1415-
loopbackLatencyMs,
1416-
consumedTermId,
1417-
consumedHostId,
1418-
messageTimestamp,
1419-
currentDolStamp);
1420-
} else {
1421-
// Received a DoL stamp that doesn't match our expected state
1417+
expectedTermId,
1418+
consumedTermId);
1419+
return;
1420+
}
1421+
1422+
// Handle DoL from future term - indicates race or concurrent leadership change
1423+
if (consumedTermId > expectedTermId) {
14221424
LOGGER.warn(
1423-
"Replica: {} consumed DoL stamp with mismatched metadata. Expected: [term={}, host={}], Received: [term={}, host={}]. "
1424-
+ "This may indicate a stale message or concurrent leadership changes. Current DolStamp: {}",
1425+
"Replica: {} consumed DoL from future term. Expected: {}, received: {}. DolStamp: {}",
14251426
replicaId,
14261427
expectedTermId,
1427-
expectedHostId,
14281428
consumedTermId,
1429-
consumedHostId,
14301429
currentDolStamp);
1430+
return;
14311431
}
1432+
1433+
// DoL loopback complete - term and host match
1434+
currentDolStamp.setDolConsumed(true);
1435+
long loopbackLatencyMs = currentDolStamp.getLatencyMs();
1436+
LOGGER.info(
1437+
"Replica {}: DoL loopback complete - successfully produced to and consumed back from local VT, "
1438+
+ "confirming replica is fully caught up and ready to switch to leader source topic. "
1439+
+ "Loopback latency: {} ms. [term={}, host={}, timestamp={}]. DolStamp={}",
1440+
replicaId,
1441+
loopbackLatencyMs,
1442+
consumedTermId,
1443+
consumedHostId,
1444+
messageTimestamp,
1445+
currentDolStamp);
14321446
}
14331447

14341448
protected boolean shouldUseDolMechanism() {

0 commit comments

Comments
 (0)