Skip to content

Commit ff63c20

Browse files
authored
[test] [server] Fix Flaky Test testProcessConsumerActionsError() (#1527)
Fix the flaky test `testProcessConsumerActionsError()` by splitting the error handling off into a dedicated method and unit testing that directly. 💥
1 parent b829352 commit ff63c20

File tree

2 files changed

+47
-47
lines changed

2 files changed

+47
-47
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1960,42 +1960,46 @@ void processConsumerActions(Store store) throws InterruptedException {
19601960
action.getFuture().completeExceptionally(e);
19611961
throw e;
19621962
} catch (Throwable e) {
1963-
if (action.getAttemptsCount() <= MAX_CONSUMER_ACTION_ATTEMPTS) {
1964-
LOGGER.warn("Failed to process consumer action {}, will retry later.", action, e);
1963+
if (!handleConsumerActionsError(e, action, actionProcessStartTimeInMs)) {
19651964
return;
19661965
}
1967-
LOGGER.error(
1968-
"Failed to execute consumer action {} after {} attempts. Total elapsed time: {}ms",
1969-
action,
1970-
action.getAttemptsCount(),
1971-
LatencyUtils.getElapsedTimeFromMsToMs(actionProcessStartTimeInMs),
1972-
e);
1973-
// Mark action as failed since it has exhausted all the retries.
1974-
action.getFuture().completeExceptionally(e);
1975-
// After MAX_CONSUMER_ACTION_ATTEMPTS retries we should give up and error the ingestion task.
1976-
PartitionConsumptionState state = partitionConsumptionStateMap.get(action.getPartition());
1977-
1978-
// Remove the action that is failed to execute recently (not necessarily the head of consumerActionsQueue).
1979-
if (consumerActionsQueue.remove(action)) {
1980-
partitionToPendingConsumerActionCountMap.get(action.getPartition()).decrementAndGet();
1981-
}
1982-
/**
1983-
* {@link state} can be null if the {@link OffsetRecord} from {@link storageMetadataService} was corrupted in
1984-
* {@link #processCommonConsumerAction}, so the {@link PartitionConsumptionState} was never created
1985-
*/
1986-
if (state == null || !state.isCompletionReported()) {
1987-
reportError(
1988-
"Error when processing consumer action: " + action,
1989-
action.getPartition(),
1990-
new VeniceException(e));
1991-
}
19921966
}
19931967
}
19941968
if (emitMetrics.get()) {
19951969
hostLevelIngestionStats.recordProcessConsumerActionLatency(Duration.between(startTime, Instant.now()).toMillis());
19961970
}
19971971
}
19981972

1973+
boolean handleConsumerActionsError(Throwable e, ConsumerAction action, long actionProcessStartTimeInMs) {
1974+
if (action.getAttemptsCount() <= MAX_CONSUMER_ACTION_ATTEMPTS) {
1975+
LOGGER.warn("Failed to process consumer action {}, will retry later.", action, e);
1976+
return false;
1977+
}
1978+
LOGGER.error(
1979+
"Failed to execute consumer action {} after {} attempts. Total elapsed time: {}ms",
1980+
action,
1981+
action.getAttemptsCount(),
1982+
LatencyUtils.getElapsedTimeFromMsToMs(actionProcessStartTimeInMs),
1983+
e);
1984+
// Mark action as failed since it has exhausted all the retries.
1985+
action.getFuture().completeExceptionally(e);
1986+
// After MAX_CONSUMER_ACTION_ATTEMPTS retries we should give up and error the ingestion task.
1987+
PartitionConsumptionState state = partitionConsumptionStateMap.get(action.getPartition());
1988+
1989+
// Remove the action that most recently failed to execute (not necessarily the head of consumerActionsQueue).
1990+
if (consumerActionsQueue.remove(action)) {
1991+
partitionToPendingConsumerActionCountMap.get(action.getPartition()).decrementAndGet();
1992+
}
1993+
/**
1994+
* {@code state} can be null if the {@link OffsetRecord} from {@code storageMetadataService} was corrupted in
1995+
* {@link #processCommonConsumerAction}, so the {@link PartitionConsumptionState} was never created
1996+
*/
1997+
if (state == null || !state.isCompletionReported()) {
1998+
reportError("Error when processing consumer action: " + action, action.getPartition(), new VeniceException(e));
1999+
}
2000+
return true;
2001+
}
2002+
19992003
/**
20002004
* Applies name resolution to all Kafka URLs in the provided TopicSwitch. Useful for translating URLs that came from
20012005
* a different runtime (e.g. from the controller, or from state persisted by a previous run of the same server).

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5321,31 +5321,27 @@ public void testMeasureLagWithCallToPubSub() {
53215321
/**
53225322
* When SIT encounters a corrupted {@link OffsetRecord} in {@link StoreIngestionTask#processCommonConsumerAction} and
53235323
* {@link StorageMetadataService#getLastOffset} throws an exception due to a deserialization error,
5324-
* {@link StoreIngestionTask#reportError(String, int, Exception)} should be called in order to trigger a Helix
5325-
* state transition without waiting 24+ hours for the Helix state transition timeout.
5324+
* {@link StoreIngestionTask#handleConsumerActionsError(Throwable, ConsumerAction, long)} should call
5325+
* {@link StoreIngestionTask#reportError(String, int, Exception)} in order to trigger a Helix state transition
5326+
* without waiting 24+ hours for the Helix state transition timeout.
53265327
*/
53275328
@Test
5328-
public void testProcessConsumerActionsError() throws Exception {
5329-
runTest(Collections.singleton(PARTITION_FOO), () -> {
5330-
storeIngestionTaskUnderTest.close(); // prevent the SIT polling thread run from interfering with the
5331-
// processConsumerActions()
5332-
5329+
public void testHandleConsumerActionsError() throws Exception {
5330+
final int p = 0; // partition number
5331+
runTest(Collections.singleton(p), () -> {
53335332
// This is an actual exception thrown when deserializing a corrupted OffsetRecord
53345333
String msg = "Received Magic Byte '6' which is not supported by InternalAvroSpecificSerializer. "
53355334
+ "The only supported Magic Byte for this implementation is '24'.";
5336-
when(mockStorageMetadataService.getLastOffset(any(), anyInt())).thenThrow(new VeniceMessageException(msg));
5337-
// To reach reportError(), bypass the conditional: action.getAttemptsCount() <= MAX_CONSUMER_ACTION_ATTEMPTS
5338-
for (int i = 0; i < StoreIngestionTask.MAX_CONSUMER_ACTION_ATTEMPTS + 1; i++) {
5339-
try {
5340-
storeIngestionTaskUnderTest.processConsumerActions(storeAndVersionConfigsUnderTest.store);
5341-
} catch (InterruptedException e) {
5342-
throw new RuntimeException(e);
5343-
}
5344-
}
5345-
ArgumentCaptor<VeniceException> captor = ArgumentCaptor.forClass(VeniceException.class);
5346-
verify(storeIngestionTaskUnderTest, timeout(TEST_TIMEOUT_MS).atLeast(1))
5347-
.reportError(anyString(), eq(PARTITION_FOO), captor.capture());
5348-
assertTrue(captor.getValue().getMessage().endsWith(msg));
5335+
Throwable e = new VeniceException(msg);
5336+
CompletableFuture<Void> future = new CompletableFuture<>();// CompletableFuture(null);
5337+
ConsumerAction action = mock(ConsumerAction.class);
5338+
when(action.getPartition()).thenReturn(p);
5339+
when(action.getFuture()).thenReturn(future);
5340+
when(action.getAttemptsCount()).thenReturn(StoreIngestionTask.MAX_CONSUMER_ACTION_ATTEMPTS + 1);
5341+
assertFalse(future.isCompletedExceptionally());
5342+
storeIngestionTaskUnderTest.handleConsumerActionsError(e, action, 0L);
5343+
verify(storeIngestionTaskUnderTest, times(1)).reportError(anyString(), eq(p), any(VeniceException.class));
5344+
assertTrue(future.isCompletedExceptionally());
53495345
}, AA_OFF);
53505346
}
53515347

0 commit comments

Comments
 (0)