retrieve peakSamples and processedSamples query stats from results_cache as well (#6591)

Signed-off-by: Erlan Zholdubai uulu <[email protected]>
erlan-z authored Feb 18, 2025
1 parent a0e7e35 commit 7292d8d
Showing 6 changed files with 152 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
* [BUGFIX] Compactor: Cleaner would delete bucket index when there is no block in bucket store. #6577
* [BUGFIX] Querier: Fix marshal native histogram with empty bucket when protobuf codec is enabled. #6595
* [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591

## 1.19.0 in progress

93 changes: 93 additions & 0 deletions integration/query_frontend_test.go
@@ -859,3 +859,96 @@ func TestQueryFrontendQueryRejection(t *testing.T) {
require.Contains(t, string(body), tripperware.QueryRejectErrorMessage)

}

func TestQueryFrontendStatsFromResultsCacheShouldBeSame(t *testing.T) {

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

memcached := e2ecache.NewMemcached()
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-querier.cache-results": "true",
"-querier.split-queries-by-interval": "24h",
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
"-querier.per-step-stats-enabled": strconv.FormatBool(true),
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-frontend.query-stats-enabled": strconv.FormatBool(true),
"-frontend.cache-queryable-samples-stats": strconv.FormatBool(true),
})

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

// Start the query-scheduler
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
require.NoError(t, s.StartAndWaitReady(queryScheduler))
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()

// Start the query-frontend.
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
require.NoError(t, s.Start(queryFrontend))

// Start all other services.
ingester := e2ecortex.NewIngesterWithConfigFile("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "")
distributor := e2ecortex.NewDistributorWithConfigFile("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "")

querier := e2ecortex.NewQuerierWithConfigFile("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "")

require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor))
require.NoError(t, s.WaitReady(queryFrontend))

// Check that the query-frontend is discovering the memcached instance.
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers"))
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_dns_lookups_total"))

// Wait until both the distributor and querier have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push some series to Cortex.
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

seriesTimestamp := time.Now().Add(-10 * time.Minute)
series2Timestamp := seriesTimestamp.Add(1 * time.Minute)
series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"})
series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"})

res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Query back the series.
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// First request, which will miss the cache and hit the data source.
resp, _, err := c.QueryRangeRaw(`{job="test"}`, seriesTimestamp.Add(-1*time.Minute), series2Timestamp.Add(1*time.Minute), 30*time.Second, map[string]string{})
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

values, err := queryFrontend.SumMetrics([]string{"cortex_query_samples_scanned_total"})
require.NoError(t, err)
numSamplesScannedTotal := e2e.SumValues(values)

// We send the same query to hit the results cache.
resp, _, err = c.QueryRangeRaw(`{job="test"}`, seriesTimestamp.Add(-1*time.Minute), series2Timestamp.Add(1*time.Minute), 30*time.Second, map[string]string{})
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

values, err = queryFrontend.SumMetrics([]string{"cortex_query_samples_scanned_total"})
require.NoError(t, err)
numSamplesScannedTotal2 := e2e.SumValues(values)

// We expect the same amount of samples_scanned to be added to the metric even though the second query hit the results cache, so the counter should double.
require.Equal(t, numSamplesScannedTotal2, numSamplesScannedTotal*2)
}
1 change: 1 addition & 0 deletions pkg/frontend/transport/handler.go
@@ -410,6 +410,7 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
"split_queries", splitQueries,
"status_code", statusCode,
"response_size", contentLength,
"samples_scanned", numScannedSamples,
}, stats.LoadExtraFields()...)

if numStoreGatewayTouchedPostings > 0 {
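The handler change above appends a samples_scanned key-value pair to the flat slice of log fields before the stats' extra fields, which is why the expected logs in the test file below gain samples_scanned=0 right after response_size=1000. A minimal sketch of this alternating key-value pattern (illustrative names, not the Cortex implementation):

```go
package main

import "fmt"

// logfmt renders a flat slice of alternating keys and values as a
// logfmt-style line, the shape reportQueryStats builds before logging.
func logfmt(kv []interface{}) string {
	out := ""
	for i := 0; i+1 < len(kv); i += 2 {
		if i > 0 {
			out += " "
		}
		out += fmt.Sprintf("%v=%v", kv[i], kv[i+1])
	}
	return out
}

func main() {
	extraFields := []interface{}{"user_agent", "Grafana"} // e.g. stats.LoadExtraFields()
	kv := append([]interface{}{
		"status_code", 200,
		"response_size", 1000,
		"samples_scanned", 0, // the field added by this commit
	}, extraFields...)
	fmt.Println(logfmt(kv)) // status_code=200 response_size=1000 samples_scanned=0 user_agent=Grafana
}
```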
16 changes: 8 additions & 8 deletions pkg/frontend/transport/handler_test.go
@@ -434,11 +434,11 @@ func TestReportQueryStatsFormat(t *testing.T) {

tests := map[string]testCase{
"should not include query and header details if empty": {
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0`,
},
"should include query length and string at the end": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 param_query=up`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 param_query=up`,
},
"should include query stats": {
queryStats: &querier_stats.QueryStats{
@@ -454,31 +454,31 @@
SplitQueries: 10,
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 query_storage_wall_time_seconds=6000`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 query_storage_wall_time_seconds=6000`,
},
"should include user agent": {
header: http.Header{"User-Agent": []string{"Grafana"}},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 user_agent=Grafana`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 user_agent=Grafana`,
},
"should include response error": {
responseErr: errors.New("foo_err"),
expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 error=foo_err`,
expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 error=foo_err`,
},
"should include query priority": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
queryStats: &querier_stats.QueryStats{
Priority: 99,
PriorityAssigned: true,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 priority=99 param_query=up`,
},
"should include data fetch min and max time": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
queryStats: &querier_stats.QueryStats{
DataSelectMaxTime: 1704153600000,
DataSelectMinTime: 1704067200000,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
},
"should include query stats with store gateway stats": {
queryStats: &querier_stats.QueryStats{
Expand All @@ -496,7 +496,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
StoreGatewayTouchedPostingBytes: 200,
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
},
}

14 changes: 11 additions & 3 deletions pkg/querier/tripperware/queryrange/results_cache.go
@@ -450,7 +450,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte

level.Debug(util_log.WithContext(ctx, log)).Log("msg", "handle hit", "start", r.GetStart(), "spanID", jaegerSpanID(ctx))

requests, responses, err := s.partition(r, extents)
requests, responses, err := s.partition(ctx, r, extents)
if err != nil {
return nil, nil, err
}
@@ -647,7 +647,7 @@ func convertFromTripperwarePrometheusResponse(resp tripperware.Response) tripper

// partition calculates the required requests to satisfy req given the cached data.
// extents must be in order by start time.
func (s resultsCache) partition(req tripperware.Request, extents []tripperware.Extent) ([]tripperware.Request, []tripperware.Response, error) {
func (s resultsCache) partition(ctx context.Context, req tripperware.Request, extents []tripperware.Extent) ([]tripperware.Request, []tripperware.Response, error) {
var requests []tripperware.Request
var cachedResponses []tripperware.Response
start := req.GetStart()
@@ -678,7 +678,14 @@ func (s resultsCache) partition(req tripperware.Request, extents []tripperware.E
return nil, nil, err
}
// extract the overlap from the cached extent.
cachedResponses = append(cachedResponses, s.extractor.Extract(start, req.GetEnd(), res))
promRes := s.extractor.Extract(start, req.GetEnd(), res).(*tripperware.PrometheusResponse)
cachedResponses = append(cachedResponses, promRes)

if queryStats := querier_stats.FromContext(ctx); queryStats != nil && promRes.Data.Stats != nil {
queryStats.AddScannedSamples(uint64(promRes.Data.Stats.Samples.TotalQueryableSamples))
queryStats.SetPeakSamples(max(queryStats.LoadPeakSamples(), uint64(promRes.Data.Stats.Samples.PeakSamples)))
}

start = extent.End
}

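The partition hunk above is the heart of the fix: when part of the requested range is served from cached extents, the stats stored with each cached response are folded back into the live request's QueryStats — scanned samples are summed, while peak samples take the maximum. A self-contained sketch of that merge rule (types and names are illustrative, not the Cortex API):

```go
package main

import "fmt"

// stats mirrors the two counters the fix propagates: scanned samples
// are additive across partial responses, peak samples are a maximum.
type stats struct {
	scannedSamples uint64
	peakSamples    uint64
}

// mergeCached folds stats recovered from a cached extent into the
// stats of the in-flight request.
func mergeCached(dst *stats, cached stats) {
	dst.scannedSamples += cached.scannedSamples
	if cached.peakSamples > dst.peakSamples {
		dst.peakSamples = cached.peakSamples
	}
}

func main() {
	// One freshly executed partial query plus two cached extents.
	req := stats{scannedSamples: 100, peakSamples: 40}
	for _, c := range []stats{
		{scannedSamples: 300, peakSamples: 75},
		{scannedSamples: 50, peakSamples: 20},
	} {
		mergeCached(&req, c)
	}
	fmt.Println(req.scannedSamples, req.peakSamples) // 450 75
}
```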
@@ -807,6 +814,7 @@ func extractStats(start, end int64, stats *tripperware.PrometheusResponseStats)
if start <= s.TimestampMs && s.TimestampMs <= end {
result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, s)
result.Samples.TotalQueryableSamples += s.Value
result.Samples.PeakSamples = max(result.Samples.PeakSamples, s.Value)
}
}
return result
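The extractStats hunk completes the picture: when only a slice of a cached extent overlaps the request, the per-step sample counts are trimmed to [start, end] and the aggregate total and peak are recomputed from the surviving steps. A runnable sketch of that trimming logic (simplified types, not the Cortex implementation):

```go
package main

import "fmt"

// step is a per-step stat point: a timestamp (ms) and the number of
// queryable samples at that step (mirroring TotalQueryableSamplesPerStep).
type step struct {
	timestampMs int64
	value       int64
}

// extract keeps only the steps inside [start, end] and recomputes the
// aggregate total and peak from them, as the patched extractStats does.
func extract(start, end int64, steps []step) (total, peak int64) {
	for _, s := range steps {
		if start <= s.timestampMs && s.timestampMs <= end {
			total += s.value
			if s.value > peak {
				peak = s.value
			}
		}
	}
	return total, peak
}

func main() {
	steps := []step{{0, 5}, {30_000, 9}, {60_000, 4}}
	total, peak := extract(0, 30_000, steps) // trim to the first two steps
	fmt.Println(total, peak)                 // 14 9
}
```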
