Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import com.linkedin.davinci.stats.ingestion.heartbeat.AggregatedHeartbeatLagEntry;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.VeniceMetricsConfig;
import com.linkedin.venice.throttle.VeniceAdaptiveThrottler;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.concurrent.LatencyPercentileProvider;
import com.linkedin.venice.utils.concurrent.LatencyPercentileProvider.LatencyType;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
Expand All @@ -21,6 +24,16 @@
/**
* This class contains service to periodically refresh all the signals for throttlers and update all registered throttler
* based on new signal values.
*
* <p><b>Read-latency signal source</b> is controlled by
* {@link com.linkedin.venice.stats.VeniceMetricsConfig#USE_SELF_CONTAINED_STATS}:
* <ul>
* <li>{@code false} — reads p99 from the Tehuti windowed percentile registered by
* {@code ServerHttpRequestStats} via {@code metricsRepository.getMetric(...).value()}.</li>
* <li>{@code true} — reads p99 from an independent {@link LatencyPercentileProvider} owned by
* this service, fed by {@code ServerHttpRequestStats} at recording time, so the signal is
* functional even when the Tehuti dependency is removed.</li>
* </ul>
*/
public class AdaptiveThrottlerSignalService extends AbstractVeniceService {
public static final long HEARTBEAT_LAG_LIMIT = TimeUnit.MINUTES.toMillis(10);
Expand All @@ -40,6 +53,14 @@ public class AdaptiveThrottlerSignalService extends AbstractVeniceService {
private final HeartbeatMonitoringService heartbeatMonitoringService;
private final List<VeniceAdaptiveThrottler> throttlerList = new ArrayList<>();
private final ScheduledExecutorService updateService;
/**
* Independent read-latency p99 source; non-null only when
* {@link #useSelfContainedStats} is {@code true}. Exposed via
* {@link #getLatencyPercentileProvider()} so the recording side (for example
* {@code ServerHttpRequestStats}) can feed it.
*/
private final LatencyPercentileProvider latencyPercentileProvider;
private final boolean useSelfContainedStats;
private boolean singleGetLatencySignal = false;
private boolean multiGetLatencySignal = false;
private boolean readComputeLatencySignal = false;
Expand All @@ -60,6 +81,8 @@ public AdaptiveThrottlerSignalService(
this.adaptiveThrottlerSignalRefreshIntervalInSeconds =
veniceServerConfig.getAdaptiveThrottlerSignalRefreshIntervalInSeconds();
this.metricsRepository = metricsRepository;
this.useSelfContainedStats = VeniceMetricsConfig.useSelfContainedStats(metricsRepository);
this.latencyPercentileProvider = useSelfContainedStats ? new LatencyPercentileProvider() : null;
this.updateService = Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory("AdaptiveThrottlerSignalService", veniceServerConfig.getLogContext()));
this.heartbeatMonitoringService = heartbeatMonitoringService;
Expand All @@ -75,6 +98,11 @@ public AdaptiveThrottlingServiceStats getAdaptiveThrottlingServiceStats() {
return adaptiveThrottlingServiceStats;
}

/** @return the independent p99 provider, or {@code null} when using the legacy Tehuti path. */
public LatencyPercentileProvider getLatencyPercentileProvider() {
return latencyPercentileProvider;
}

public void refreshSignalAndThrottler() {
// Update all the signals in one shot;
updateReadLatencySignal();
Expand All @@ -86,24 +114,32 @@ public void refreshSignalAndThrottler() {
}

void updateReadLatencySignal() {
Metric hostSingleGetLatencyP99Metric = metricsRepository.getMetric(SINGLE_GET_LATENCY_P99_METRIC_NAME);
Metric hostMultiGetLatencyP99Metric = metricsRepository.getMetric(MULTI_GET_LATENCY_P99_METRIC_NAME);
Metric hostReadComputeLatencyP99Metric = metricsRepository.getMetric(READ_COMPUTE_LATENCY_P99_METRIC_NAME);
double hostSingleGetLatencyP99 = 0;
double hostMultiGetLatencyP99 = 0;
double hostReadComputeLatencyP99 = 0;

if (hostSingleGetLatencyP99Metric != null) {
hostSingleGetLatencyP99 = hostSingleGetLatencyP99Metric.value();
singleGetLatencySignal = hostSingleGetLatencyP99 > singleGetLatencyP99Threshold;
}
if (hostMultiGetLatencyP99Metric != null) {
hostMultiGetLatencyP99 = hostMultiGetLatencyP99Metric.value();
multiGetLatencySignal = hostMultiGetLatencyP99 > multiGetLatencyP99Threshold;
}
if (hostReadComputeLatencyP99Metric != null) {
hostReadComputeLatencyP99 = hostReadComputeLatencyP99Metric.value();
readComputeLatencySignal = hostReadComputeLatencyP99 > readComputeLatencyP99Threshold;
double hostSingleGetLatencyP99;
double hostMultiGetLatencyP99;
double hostReadComputeLatencyP99;
if (useSelfContainedStats) {
hostSingleGetLatencyP99 = latencyPercentileProvider.getP99(LatencyType.SINGLE_GET);
hostMultiGetLatencyP99 = latencyPercentileProvider.getP99(LatencyType.MULTI_GET);
hostReadComputeLatencyP99 = latencyPercentileProvider.getP99(LatencyType.READ_COMPUTE);
applyLatencySignals(hostSingleGetLatencyP99, hostMultiGetLatencyP99, hostReadComputeLatencyP99);
} else {
Metric hostSingleGetLatencyP99Metric = metricsRepository.getMetric(SINGLE_GET_LATENCY_P99_METRIC_NAME);
Metric hostMultiGetLatencyP99Metric = metricsRepository.getMetric(MULTI_GET_LATENCY_P99_METRIC_NAME);
Metric hostReadComputeLatencyP99Metric = metricsRepository.getMetric(READ_COMPUTE_LATENCY_P99_METRIC_NAME);
hostSingleGetLatencyP99 = 0;
hostMultiGetLatencyP99 = 0;
hostReadComputeLatencyP99 = 0;

if (hostSingleGetLatencyP99Metric != null) {
hostSingleGetLatencyP99 = hostSingleGetLatencyP99Metric.value();
}
if (hostMultiGetLatencyP99Metric != null) {
hostMultiGetLatencyP99 = hostMultiGetLatencyP99Metric.value();
}
if (hostReadComputeLatencyP99Metric != null) {
hostReadComputeLatencyP99 = hostReadComputeLatencyP99Metric.value();
}
applyLatencySignals(hostSingleGetLatencyP99, hostMultiGetLatencyP99, hostReadComputeLatencyP99);
Comment on lines +133 to +142
}
LOGGER.info(
"Update read latency signal. singleGet: {} {}, multiGet: {} {}, readCompute: {} {}",
Expand All @@ -115,6 +151,12 @@ void updateReadLatencySignal() {
readComputeLatencySignal);
}

private void applyLatencySignals(double sg, double mg, double rc) {
singleGetLatencySignal = sg > singleGetLatencyP99Threshold;
multiGetLatencySignal = mg > multiGetLatencyP99Threshold;
readComputeLatencySignal = rc > readComputeLatencyP99Threshold;
}

void updateHeartbeatLatencySignal() {
AggregatedHeartbeatLagEntry maxLeaderHeartbeatLag = heartbeatMonitoringService.getMaxLeaderHeartbeatLag();
if (maxLeaderHeartbeatLag != null) {
Expand Down
Loading