[server][dvc] Lag measurement safeguard and logging improvements (#802)

It seems that sometimes the offset lag used in lag measurement can
be negative, which results in the lag being considered acceptable
and the replica prematurely becoming ready to serve.

Negative lag is a somewhat normal phenomenon, since the formula is
basically max_offset - consumed_offset, and the former is cached;
if the cached value is stale, it can be smaller than the latter
(which of course doesn't make sense, but is an artifact of the
caching). This is a benign case.

However, the max_offset retrieval can also fail, in which case a
negative error code is returned. This also leads to negative lag,
but in this case it is a mistake.

This commit adds safeguards to prevent prematurely declaring
ready-to-serve. A bunch of code paths interact with offset
retrieval, which makes this commit a bit big. To improve
maintainability, the tricky bits are centralized as much as
possible into a new SIT::measureLagWithCallToPubSub function.
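The safeguard idea can be sketched as follows. This is an illustrative, self-contained example, not Venice's actual API: the class and method names below are hypothetical, and the sentinel choice is an assumption.

```java
// Hypothetical sketch of the safeguard: a negative "latest offset" is an
// error code from the fetch, not a real offset, so the lag must never be
// treated as caught-up in that case.
public final class LagSafeguardSketch {
  /** Sentinel meaning "lag could not be measured"; treated as not caught-up. */
  public static final long UNKNOWN_LAG = Long.MAX_VALUE;

  public static long measureLag(long latestOffsetFromPubSub, long consumedOffset) {
    if (latestOffsetFromPubSub < 0) {
      // Fetch failed and returned a negative error code: do NOT compute
      // latest - consumed, which would be negative and look "acceptable".
      return UNKNOWN_LAG;
    }
    // Benign negative lag from a stale cached latest offset clamps to zero.
    return Math.max(0, latestOffsetFromPubSub - consumedOffset);
  }

  public static void main(String[] args) {
    System.out.println(measureLag(100, 90)); // normal case: lag of 10
    System.out.println(measureLag(-1, 90));  // fetch error: sentinel, not ready
    System.out.println(measureLag(80, 90));  // stale cache: clamped to 0
  }
}
```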

Miscellaneous:

- Added a get_partition_latest_offset_with_retry_error metric.

- Deleted the SLOPPY_OFFSET_CATCHUP_THRESHOLD in SIT, which is
  not necessary and is, well, sloppy.

- Introduced a new EmptyPubSubMessageHeaders which is immutable
  (the previous EMPTY_MSG_HEADERS in VeniceWriter was mutable).

- Fixed a NPE in AbstractVeniceStatsReporter.

- Fixed a concurrency issue about closing the non-threadsafe
  Kafka consumer in PartitionOffsetFetcher.

- Fixed the sensor name of KafkaConsumerServiceStats's offset 
  present/absent metrics. Added a test for this as well.

- Tweaked the API in AbstractVeniceStats so that it is less
  error-prone. The registerSensor function with sensorFullName
  parameter is now private, to avoid misuse.

- Rewrote the TopicPartitionsOffsetsTracker so that it is a bit 
  more efficient, not stale, and uses less code.

- Deleted two functions from LFSIT which were used only to 
  specify default values to some parameters in a deeper 
  function. Kept only measureRTOffsetLagForSingleRegion, which 
  is the first function of the call chain.
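The value of the immutable EmptyPubSubMessageHeaders mentioned above can be illustrated with a minimal sketch; the class and header names here are hypothetical stand-ins, not Venice's actual types.

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch (not Venice's actual classes): a shared "empty" singleton
// must reject mutation, otherwise one caller can contaminate every other user
// of the constant, as happened with the mutable VW.EMPTY_MSG_HEADERS.
public final class EmptyHeadersSketch {
  // Hypothetical stand-in for EmptyPubSubMessageHeaders: an unmodifiable view.
  static final List<String> EMPTY_HEADERS =
      Collections.unmodifiableList(Collections.emptyList());

  static boolean tryContaminate() {
    try {
      EMPTY_HEADERS.add("leaderCompleted"); // throws: the shared constant stays empty
      return true;
    } catch (UnsupportedOperationException expected) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(tryContaminate()); // false: mutation rejected
  }
}
```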

There are also various log tweaks to make logs less voluminous:

- Deleted two log lines from SIT::getDefaultReadyToServeChecker
  - "Reopen partition {}_{} for reading after ready-to-serve."
  - "Partition {} synced offset"

- Passed shouldLogLag false in SIT::updateOffsetLagInMetadata.

- TopicPartitionsOffsetsTracker doesn't periodically log a 
  bunch of lines anymore.

Test changes:

- Overhauled DataProviders in StoreIngestionTaskTest. Rather
  than using a bunch of booleans, which are hard to reason about
  in test logs, we now use enums, which makes it easy to read
  which test permutations passed or failed. In one case, this
  allows squashing 2 booleans (of which 1 of the 4 permutations
  is invalid) into a single 3-item enum, thus reducing the test
  permutation count.

- Fixed testOffsetSyncBeforeGracefulShutDown flakiness in 
  StoreIngestionTaskTest.

- Fixed SITTest::testGetAndUpdateLeaderCompletedState, which was
  mutating the static VW.EMPTY_MSG_HEADERS instance, thus
  contaminating subsequent tests (when running in the same JVM,
  as is the case in IntelliJ, but not in Gradle).

- Added some unit tests for parent stats.
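The boolean-to-enum squash described above can be sketched as follows; the enum and method names are illustrative, not the actual StoreIngestionTaskTest names.

```java
// Two booleans (isLeader, isComplete) yield 4 permutations, but a "complete
// follower" is invalid, so a 3-constant enum expresses exactly the valid
// cases and reads clearly in test logs.
public final class NodeTypeSketch {
  enum NodeType {
    FOLLOWER, INCOMPLETE_LEADER, COMPLETE_LEADER;

    boolean isLeader() {
      return this != FOLLOWER;
    }

    boolean isComplete() {
      return this == COMPLETE_LEADER;
    }
  }

  public static void main(String[] args) {
    for (NodeType t : NodeType.values()) {
      // A test log now prints e.g. "COMPLETE_LEADER" instead of "true, true".
      System.out.println(t + " isLeader=" + t.isLeader() + " isComplete=" + t.isComplete());
    }
  }
}
```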
  • Loading branch information
FelixGV authored Jan 10, 2024
1 parent 31a57f7 commit 459d9e0
Showing 25 changed files with 945 additions and 642 deletions.
@@ -0,0 +1,15 @@
package com.linkedin.davinci.ingestion;

public enum LagType {
  OFFSET_LAG("Offset"), TIME_LAG("Time");

  private final String prettyString;

  LagType(String prettyString) {
    this.prettyString = prettyString;
  }

  public String prettyString() {
    return this.prettyString;
  }
}
@@ -9,6 +9,7 @@
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.ingestion.LagType;
import com.linkedin.davinci.replication.RmdWithValueSchemaId;
import com.linkedin.davinci.replication.merge.MergeConflictResolver;
import com.linkedin.davinci.replication.merge.MergeConflictResolverFactory;
@@ -1307,9 +1308,15 @@ protected boolean checkAndLogIfLagIsAcceptableForHybridStore(
long offsetLag,
long offsetThreshold,
boolean shouldLogLag,
boolean isOffsetBasedLag,
LagType lagType,
long latestConsumedProducerTimestamp) {
boolean isLagAcceptable = offsetLag <= offsetThreshold;
boolean isLagAcceptable = super.checkAndLogIfLagIsAcceptableForHybridStore(
pcs,
offsetLag,
offsetThreshold,
false, // We'll take care of logging at the end of this function
lagType,
latestConsumedProducerTimestamp);

if (isLagAcceptable && isHybridFollower(pcs)) {
if (!getServerConfig().isLeaderCompleteStateCheckInFollowerEnabled()) {
@@ -1331,20 +1338,17 @@ protected boolean checkAndLogIfLagIsAcceptableForHybridStore(
if (shouldLogLag) {
String lagLogFooter;
if (isHybridFollower(pcs)) {
lagLogFooter = ". Leader Complete State: {" + pcs.getLeaderCompleteState().toString()
+ "}, Last update In Ms: {" + pcs.getLastLeaderCompleteStateUpdateInMs() + "}.";
lagLogFooter = "Leader Complete State: {" + pcs.getLeaderCompleteState().toString() + "}, Last update In Ms: {"
+ pcs.getLastLeaderCompleteStateUpdateInMs() + "}.";
} else {
lagLogFooter = "";
}
LOGGER.info(
"{} [{} lag] partition {} is {}lagging. {}Lag: [{}] {} Threshold [{}]{}",
isOffsetBasedLag ? "Offset" : "Time",
consumerTaskId,
logLag(
lagType,
pcs.getPartition(),
(isLagAcceptable ? "not " : ""),
(isOffsetBasedLag ? "" : "The latest producer timestamp is " + latestConsumedProducerTimestamp + ". "),
isLagAcceptable,
latestConsumedProducerTimestamp,
offsetLag,
(isLagAcceptable ? "<" : ">"),
offsetThreshold,
lagLogFooter);
}
@@ -1399,10 +1403,13 @@ public long getRegionHybridOffsetLag(int regionId) {
}

// Fall back to calculate offset lag in the old way
return (cachedPubSubMetadataGetter
.getOffset(getTopicManager(kafkaSourceAddress), currentLeaderTopic, pcs.getUserPartition()) - 1)
- pcs.getLeaderConsumedUpstreamRTOffset(kafkaSourceAddress);
return measureLagWithCallToPubSub(
kafkaSourceAddress,
currentLeaderTopic,
pcs.getUserPartition(),
pcs.getLeaderConsumedUpstreamRTOffset(kafkaSourceAddress));
})
.filter(VALID_LAG)
.sum();

return minZeroLag(offsetLag);
@@ -42,6 +42,8 @@ class CachedPubSubMetadataGetter {
* @return Users of this method should be aware that Kafka will actually
* return the next available offset rather the latest used offset. Therefore,
* the value will be 1 offset greater than what's expected.
*
* TODO: Refactor this using PubSubTopicPartition
*/
long getOffset(TopicManager topicManager, PubSubTopic pubSubTopic, int partitionId) {
final String sourcePubSubServer = topicManager.getPubSubBootstrapServers();
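The off-by-one noted in the javadoc above can be illustrated with a small sketch; the class and method names here are hypothetical, not part of Venice.

```java
// Illustrative arithmetic: the broker reports the *next* offset to be
// assigned, so a partition whose last written record sits at offset N
// reports an end offset of N + 1, and callers computing the latest used
// offset must subtract 1, as the old inline lag formula did.
public final class EndOffsetSketch {
  static long latestUsedOffset(long endOffsetFromBroker) {
    return endOffsetFromBroker - 1;
  }

  public static void main(String[] args) {
    // Records written at offsets 0..9: the broker reports end offset 10.
    System.out.println(latestUsedOffset(10)); // 9
  }
}
```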