Hotfix 0.4.161 stuck consumer metric and repair (#782)
* [server] Introduced a repair service for stuck shared consumer (#757)

* [server] Introduced a repair service for stuck shared consumer

Recently, we noticed that some shared consumers got stuck because they were
blocked on produce calls to non-existing topics, and many other store versions
were affected since each stuck consumer is shared by many stores.
This PR introduces a way to detect and repair stuck consumers in this
situation.
At a high level, a thread periodically checks whether consumer#poll is being
invoked regularly; if not, it scans all the ingestion tasks to find any that
are producing to non-existing topics. For each store ingestion task that is
talking to a non-existing topic, the repair thread closes the stuck
KafkaProducer and kills the corresponding ingestion task.
The non-existing topic check is repeated for a configurable duration to
tolerate transient topic metadata propagation delays.
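
The following condensed sketch illustrates the detect-and-repair flow described above. It is not the code in this commit (the real implementation is getStuckConsumerDetectionAndRepairRunnable in AggKafkaConsumerService further down in this diff); the two nested interfaces are hypothetical stand-ins for Venice's KafkaConsumerService and StoreIngestionTask.

import java.util.Map;
import java.util.function.Consumer;

public class StuckConsumerRepairSketch {
  // Hypothetical stand-ins for Venice's KafkaConsumerService and StoreIngestionTask.
  interface ConsumerPool {
    long maxElapsedMsSinceLastPoll();
  }

  interface IngestionTask {
    boolean isProducingVersionTopicHealthy();
    void closeVeniceWriters(boolean doFlush);
  }

  static void detectAndRepair(
      Map<String, ConsumerPool> consumerPoolsPerKafkaCluster,
      Map<String, IngestionTask> ingestionTasksByVersionTopic,
      long stuckConsumerThresholdMs,
      Consumer<String> killIngestionTask) {
    // Step 1: has any consumer in any pool gone too long without a successful poll?
    boolean someConsumerStuck = consumerPoolsPerKafkaCluster.values()
        .stream()
        .anyMatch(pool -> pool.maxElapsedMsSinceLastPoll() >= stuckConsumerThresholdMs);
    if (!someConsumerStuck) {
      return; // every consumer polled recently, nothing to repair
    }
    // Step 2: find ingestion tasks whose version topic is missing and repair them.
    // The real code re-checks topic health for a grace period before giving up.
    ingestionTasksByVersionTopic.forEach((versionTopic, task) -> {
      if (!task.isProducingVersionTopicHealthy()) {
        task.closeVeniceWriters(false); // interrupts producer sends stuck on the missing topic
        killIngestionTask.accept(versionTopic); // then kill the blocked ingestion task
      }
    });
  }
}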

5 new server configs (a worked example of the defaults follows the list):

server.stuck.consumer.repair.enabled : default true
server.stuck.consumer.repair.interval.second : default 1 min
server.stuck.consumer.repair.threshold.second : default 5 mins
server.non.existing.topic.ingestion.task.kill.threshold.second: default 15 mins
server.non.existing.topic.check.retry.interval.second: default 1 min
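
A small, self-contained worked example of how these defaults interact. The validation and the maxAttempts formula below mirror the VeniceServerConfig and AggKafkaConsumerService changes later in this diff, but the class itself is illustrative only.

import java.util.concurrent.TimeUnit;

public class StuckConsumerRepairDefaults {
  public static void main(String[] args) {
    // Default values from the config list above.
    int repairIntervalSecond = 60; // server.stuck.consumer.repair.interval.second
    int repairThresholdSecond = 5 * 60; // server.stuck.consumer.repair.threshold.second
    int killThresholdSecond = 15 * 60; // server.non.existing.topic.ingestion.task.kill.threshold.second
    int topicCheckRetrySecond = 60; // server.non.existing.topic.check.retry.interval.second

    // VeniceServerConfig rejects a repair threshold smaller than the repair
    // interval: a consumer can only be declared stuck after at least one full
    // check interval has elapsed.
    if (repairThresholdSecond < repairIntervalSecond) {
      throw new IllegalStateException("repair threshold must be >= repair interval");
    }

    // Number of times the repair thread re-checks a suspect version topic before
    // killing its ingestion task (15 with these defaults), mirroring the
    // maxAttempts computation in AggKafkaConsumerService.
    long killThresholdMs = TimeUnit.SECONDS.toMillis(killThresholdSecond);
    long retryIntervalMs = TimeUnit.SECONDS.toMillis(topicCheckRetrySecond);
    int maxAttempts = (int) Math.ceil((double) killThresholdMs / retryIntervalMs);
    System.out.println("maxAttempts = " + maxAttempts);
  }
}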


3 new server metrics:

.StuckConsumerRepair--stuck_consumer_found.OccurrenceRate
.StuckConsumerRepair--ingestion_task_repair.OccurrenceRate
.StuckConsumerRepair--repair_failure.OccurrenceRate

* [server][da-vinci] Construct the MaxElapsedTimeSinceLastPoll Gauge metric with function instead of value (#781)

Constructing a Venice Gauge metric with a plain value is pointless; a Venice
Gauge metric only works when it is constructed with a function that is
re-evaluated on every metric read.
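
For illustration, here is the before/after shape of the change, lifted from the KafkaConsumerServiceStats hunk at the bottom of this diff (the registerSensor call and the Gauge constructors are quoted from that hunk; the surrounding class is omitted):

// Before (#757): the long is evaluated once, when the gauge is constructed,
// so the metric keeps reporting that initial snapshot forever.
registerSensor(
    "max_elapsed_time_since_last_successful_poll",
    new Gauge(getMaxElapsedTimeSinceLastPollInConsumerPool.getAsLong()));

// After (#781): the gauge holds a function that is re-evaluated on every
// metric read, so the reported value tracks the consumer pool in real time.
registerSensor(
    "max_elapsed_time_since_last_successful_poll",
    new Gauge(() -> getMaxElapsedTimeSinceLastPollInConsumerPool.getAsLong()));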

* [client][server] Emit system store metric only for server (#764)

System store metrics are only useful on the server side. This PR prevents the emission of meta system store metrics in DaVinci/thin clients.

Co-authored-by: Sourav Maji <[email protected]>

---------

Co-authored-by: Min Huang <[email protected]>
Co-authored-by: Sourav Maji <[email protected]>
Co-authored-by: Sourav Maji <[email protected]>
4 people authored Dec 1, 2023
1 parent 9a2bf95 commit 8abb661
Showing 20 changed files with 704 additions and 25 deletions.
@@ -83,6 +83,8 @@
import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_IDLE_TIME_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_WORKER_THREADS;
import static com.linkedin.venice.ConfigKeys.SERVER_NODE_CAPACITY_RCU;
import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_NUM_SCHEMA_FAST_CLASS_WARMUP;
import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_NO_READ_THRESHOLD_SECONDS;
@@ -110,6 +112,9 @@
import static com.linkedin.venice.ConfigKeys.SERVER_SSL_HANDSHAKE_THREAD_POOL_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_STORE_TO_EARLY_TERMINATION_THRESHOLD_MS_MAP;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED;
@@ -445,6 +450,12 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final long ingestionHeartbeatIntervalMs;

private final boolean stuckConsumerRepairEnabled;
private final int stuckConsumerRepairIntervalSecond;
private final int stuckConsumerDetectionRepairThresholdSecond;
private final int nonExistingTopicIngestionTaskKillThresholdSecond;
private final int nonExistingTopicCheckRetryIntervalSecond;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
}
@@ -731,6 +742,21 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
metaStoreWriterCloseConcurrency = serverProperties.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
ingestionHeartbeatIntervalMs =
serverProperties.getLong(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1));

stuckConsumerRepairEnabled = serverProperties.getBoolean(SERVER_STUCK_CONSUMER_REPAIR_ENABLED, true);
stuckConsumerRepairIntervalSecond = serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND, 60);
stuckConsumerDetectionRepairThresholdSecond =
serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND, 5 * 60); // 5 mins
if (stuckConsumerRepairEnabled && stuckConsumerDetectionRepairThresholdSecond < stuckConsumerRepairIntervalSecond) {
throw new VeniceException(
"Config for " + SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND + ": "
+ stuckConsumerDetectionRepairThresholdSecond + " should be equal to or larger than "
+ SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND + ": " + stuckConsumerRepairIntervalSecond);
}
nonExistingTopicIngestionTaskKillThresholdSecond =
serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND, 15 * 60); // 15 mins
nonExistingTopicCheckRetryIntervalSecond =
serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND, 60); // 1min
}

long extractIngestionMemoryLimit(
@@ -1284,4 +1310,24 @@ public int getMetaStoreWriterCloseConcurrency() {
public long getIngestionHeartbeatIntervalMs() {
return ingestionHeartbeatIntervalMs;
}

public boolean isStuckConsumerRepairEnabled() {
return stuckConsumerRepairEnabled;
}

public int getStuckConsumerRepairIntervalSecond() {
return stuckConsumerRepairIntervalSecond;
}

public int getStuckConsumerDetectionRepairThresholdSecond() {
return stuckConsumerDetectionRepairThresholdSecond;
}

public int getNonExistingTopicIngestionTaskKillThresholdSecond() {
return nonExistingTopicIngestionTaskKillThresholdSecond;
}

public int getNonExistingTopicCheckRetryIntervalSecond() {
return nonExistingTopicCheckRetryIntervalSecond;
}
}
@@ -4,6 +4,7 @@

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.stats.StuckConsumerRepairStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
@@ -16,15 +17,24 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -56,6 +66,9 @@ public class AggKafkaConsumerService extends AbstractVeniceService {
private final TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier;
private final Function<String, String> kafkaClusterUrlResolver;

private final Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping = new VeniceConcurrentHashMap<>();
private ScheduledExecutorService stuckConsumerRepairExecutorService;

public AggKafkaConsumerService(
final PubSubConsumerAdapterFactory consumerFactory,
TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier,
@@ -65,7 +78,8 @@ public AggKafkaConsumerService(
KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler,
final MetricsRepository metricsRepository,
TopicExistenceChecker topicExistenceChecker,
final PubSubMessageDeserializer pubSubDeserializer) {
final PubSubMessageDeserializer pubSubDeserializer,
Consumer<String> killIngestionTaskRunnable) {
this.consumerFactory = consumerFactory;
this.readCycleDelayMs = serverConfig.getKafkaReadCycleDelayMs();
this.numOfConsumersPerKafkaCluster = serverConfig.getConsumerPoolSizePerKafkaCluster();
@@ -83,6 +97,26 @@
this.pubSubDeserializer = pubSubDeserializer;
this.sslPropertiesSupplier = sslPropertiesSupplier;
this.kafkaClusterUrlResolver = serverConfig.getKafkaClusterUrlResolver();

if (serverConfig.isStuckConsumerRepairEnabled()) {
this.stuckConsumerRepairExecutorService = Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory(this.getClass().getName() + "-StuckConsumerRepair"));
int intervalInSeconds = serverConfig.getStuckConsumerRepairIntervalSecond();
this.stuckConsumerRepairExecutorService.scheduleAtFixedRate(
getStuckConsumerDetectionAndRepairRunnable(
kafkaServerToConsumerServiceMap,
versionTopicStoreIngestionTaskMapping,
TimeUnit.SECONDS.toMillis(serverConfig.getStuckConsumerDetectionRepairThresholdSecond()),
TimeUnit.SECONDS.toMillis(serverConfig.getNonExistingTopicIngestionTaskKillThresholdSecond()),
TimeUnit.SECONDS.toMillis(serverConfig.getNonExistingTopicCheckRetryIntervalSecond()),
new StuckConsumerRepairStats(metricsRepository),
killIngestionTaskRunnable),
intervalInSeconds,
intervalInSeconds,
TimeUnit.SECONDS);
LOGGER.info("Started stuck consumer repair service with checking interval: {} seconds", intervalInSeconds);
}

LOGGER.info("Successfully initialized AggKafkaConsumerService");
}

@@ -100,6 +134,106 @@ public void stopInner() throws Exception {
for (KafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) {
consumerService.stop();
}
if (this.stuckConsumerRepairExecutorService != null) {
this.stuckConsumerRepairExecutorService.shutdownNow();
}
}

protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
Map<String, KafkaConsumerService> kafkaServerToConsumerServiceMap,
Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping,
long stuckConsumerRepairThresholdMs,
long nonExistingTopicIngestionTaskKillThresholdMs,
long nonExistingTopicRetryIntervalMs,
StuckConsumerRepairStats stuckConsumerRepairStats,
Consumer<String> killIngestionTaskRunnable) {
return () -> {
/**
* The following logic can be further optimized in the following way:
* 1. If the max delay of previous run is much smaller than the threshold.
* 2. In the next run, the max possible delay will be schedule interval + previous max delay ms, and if it is
* still below the threshold, this function can return directly.
* We are not adopting such optimization right now because:
* 1. Extra state to maintain.
* 2. The schedule interval is supposed to be high.
* 3. The check is cheap when there is no stuck consumer.
*/
boolean scanStoreIngestionTaskToFixStuckConsumer = false;
for (KafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) {
long maxDelayMs = consumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool();
if (maxDelayMs >= stuckConsumerRepairThresholdMs) {
scanStoreIngestionTaskToFixStuckConsumer = true;
LOGGER.warn("Found some consumer has stuck for {} ms, will start the repairing procedure", maxDelayMs);
break;
}
}
if (!scanStoreIngestionTaskToFixStuckConsumer) {
return;
}
stuckConsumerRepairStats.recordStuckConsumerFound();

/**
* Collect a list of SITs, whose version topic doesn't exist by checking {@link StoreIngestionTask#isProducingVersionTopicHealthy()},
* and this function will continue to check the version topic healthiness for a period of {@link nonExistingTopicIngestionTaskKillThresholdMs}
* to tolerate transient topic discovery issue.
*/
Map<String, StoreIngestionTask> versionTopicIngestionTaskMappingForNonExistingTopic = new HashMap<>();
versionTopicStoreIngestionTaskMapping.forEach((vt, sit) -> {
try {
if (!sit.isProducingVersionTopicHealthy()) {
versionTopicIngestionTaskMappingForNonExistingTopic.put(vt, sit);
LOGGER.warn("The producing version topic:{} is not healthy", vt);
}
} catch (Exception e) {
LOGGER.error("Got exception while checking topic existence for version topic: {}", vt, e);
}
});
int maxAttempts =
(int) Math.ceil((double) nonExistingTopicIngestionTaskKillThresholdMs / nonExistingTopicRetryIntervalMs);
for (int cnt = 0; cnt < maxAttempts; ++cnt) {
Iterator<Map.Entry<String, StoreIngestionTask>> iterator =
versionTopicIngestionTaskMappingForNonExistingTopic.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, StoreIngestionTask> entry = iterator.next();
String versionTopic = entry.getKey();
StoreIngestionTask sit = entry.getValue();
try {
if (sit.isProducingVersionTopicHealthy()) {
/**
* If the version topic becomes available after retries, remove it from the tracking map.
*/
iterator.remove();
LOGGER.info("The producing version topic:{} becomes healthy", versionTopic);
}
} catch (Exception e) {
LOGGER.error("Got exception while checking topic existence for version topic: {}", versionTopic, e);
} finally {
Utils.sleep(nonExistingTopicRetryIntervalMs);
}
}
}

AtomicBoolean repairSomeIngestionTask = new AtomicBoolean(false);
versionTopicIngestionTaskMappingForNonExistingTopic.forEach((vt, sit) -> {
LOGGER.warn(
"The ingestion topics (version topic) are not healthy for "
+ "store version: {}, will kill the ingestion task to try to unblock shared consumer",
vt);
/**
* The following function call will interrupt all the stuck {@link org.apache.kafka.clients.producer.KafkaProducer#send} call
* to non-existing topics.
*/
sit.closeVeniceWriters(false);
killIngestionTaskRunnable.accept(vt);
repairSomeIngestionTask.set(true);
stuckConsumerRepairStats.recordIngestionTaskRepair();
});
if (!repairSomeIngestionTask.get()) {
LOGGER.error(
"Didn't find any suspicious ingestion task, and please contact developers to investigate it further");
stuckConsumerRepairStats.recordRepairFailure();
}
};
}

/**
Expand Down Expand Up @@ -235,6 +369,8 @@ public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, L

consumerService.startConsumptionIntoDataReceiver(pubSubTopicPartition, lastOffset, dataReceiver);

versionTopicStoreIngestionTaskMapping.put(storeIngestionTask.getVersionTopic().getName(), storeIngestionTask);

return dataReceiver;
}

@@ -260,6 +396,7 @@ public long getLatestOffsetFor(
*/
void unsubscribeAll(PubSubTopic versionTopic) {
kafkaServerToConsumerServiceMap.values().forEach(consumerService -> consumerService.unsubscribeAll(versionTopic));
versionTopicStoreIngestionTaskMapping.remove(versionTopic.getName());
}

void pauseConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
@@ -116,7 +116,7 @@ protected KafkaConsumerService(
: createKafkaConsumerServiceStats(
metricsRepository,
kafkaClusterAlias,
this::getMaxElapsedTimeSinceLastPollInConsumerPool);
this::getMaxElapsedTimeMSSinceLastPollInConsumerPool);
for (int i = 0; i < numOfConsumersPerKafkaCluster; ++i) {
/**
* We need to assign a unique client id across all the storage nodes, otherwise, they will fail into the same throttling bucket.
@@ -324,7 +324,7 @@ private KafkaConsumerServiceStats createKafkaConsumerServiceStats(
getMaxElapsedTimeSinceLastPollInConsumerPool);
}

private long getMaxElapsedTimeSinceLastPollInConsumerPool() {
public long getMaxElapsedTimeMSSinceLastPollInConsumerPool() {
long maxElapsedTimeSinceLastPollInConsumerPool = -1;
int slowestTaskId = -1;
long elapsedTimeSinceLastPoll;
@@ -448,7 +448,8 @@ public void handleStoreDeleted(Store store) {
kafkaClusterBasedRecordThrottler,
metricsRepository,
new MetadataRepoBasedTopicExistingCheckerImpl(this.getMetadataRepo()),
pubSubDeserializer);
pubSubDeserializer,
(topicName) -> this.killConsumptionTask(topicName));
/**
* After initializing a {@link AggKafkaConsumerService} service, it doesn't contain KafkaConsumerService yet until
* a new Kafka cluster is registered; here we explicitly create KafkaConsumerService for the local Kafka cluster.
@@ -282,7 +282,7 @@ public LeaderFollowerStoreIngestionTask(
}

@Override
protected void closeVeniceWriters(boolean doFlush) {
public void closeVeniceWriters(boolean doFlush) {
if (veniceWriter.isPresent()) {
veniceWriter.get().close(doFlush);
}
@@ -1390,6 +1390,13 @@ public void run() {
LOGGER.info("{} has been killed.", consumerTaskId);
statusReportAdapter.reportKilled(partitionConsumptionStateMap.values(), e);
doFlush = false;
if (isCurrentVersion.getAsBoolean()) {
/**
* Current version can be killed if {@link AggKafkaConsumerService} discovers there are some issues with
* the producing topics, and here will report metrics for such case.
*/
handleIngestionException(e);
}
} catch (VeniceChecksumException e) {
/**
* It's possible to receive checksum verification failure exception here from the above syncOffset() call.
@@ -1521,7 +1528,7 @@ private void internalClose(boolean doFlush) {
LOGGER.info("Store ingestion task for store: {} is closed", kafkaVersionTopic);
}

protected void closeVeniceWriters(boolean doFlush) {
public void closeVeniceWriters(boolean doFlush) {
}

protected void closeVeniceViewWriters() {
@@ -3387,6 +3394,10 @@ public PubSubTopic getVersionTopic() {
return versionTopic;
}

public PubSubTopic getRealtimeTopic() {
return realTimeTopic;
}

public boolean isMetricsEmissionEnabled() {
return emitMetrics.get();
}
@@ -3770,4 +3781,21 @@ protected boolean shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey, KafkaMessag
protected void maybeSendIngestionHeartbeat() {
// No op, heartbeat is only useful for L/F hybrid stores.
}

/**
* This function is checking the following conditions:
* 1. Whether the version topic exists or not.
*/
public boolean isProducingVersionTopicHealthy() {
if (isDaVinciClient) {
/**
* DaVinci doesn't produce to any topics.
*/
return true;
}
if (!topicManagerRepository.getTopicManager().containsTopic(this.versionTopic)) {
return false;
}
return true;
}
}
@@ -49,7 +49,7 @@ public KafkaConsumerServiceStats(
*/
registerSensor(
"max_elapsed_time_since_last_successful_poll",
new Gauge(getMaxElapsedTimeSinceLastPollInConsumerPool.getAsLong()));
new Gauge(() -> getMaxElapsedTimeSinceLastPollInConsumerPool.getAsLong()));
// consumer record number per second returned by Kafka consumer poll.
pollResultNumSensor = registerSensor("consumer_poll_result_num", new Avg(), new Total());
pollNonZeroResultNumSensor = registerSensor("consumer_poll_non_zero_result_num", new Avg(), new Total());
