pkg/parcacol: Add QueryRange filtering for delta queries
metalmatze committed May 12, 2023
1 parent 8b3f48a commit 9caa2fa
Showing 6 changed files with 166 additions and 73 deletions.
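In short: the filter query (already applied to non-delta range queries) is now threaded into the delta path as well. When a filter is set, the delta aggregation additionally groups by stacktrace, checks each stacktrace against the filter, accumulates only matching values per series and timestamp, and finally converts the accumulated sums into samples via the new deltaMetricsSample helper. As a reading aid, here is a minimal runnable sketch of the group-by decision, with the column expressions simplified to strings (the real code builds logicalplan.Expr values, as the first hunks below show):

package main

import "fmt"

// groupByColumns mirrors the commit's core idea with simplified types:
// grouping must include the stacktrace column whenever a filter query is
// present, so that non-matching stacks can be dropped per timestamp.
func groupByColumns(step, filterQuery string) []string {
	groupBy := []string{"labels", "duration(" + step + ")"}
	if filterQuery != "" {
		groupBy = append(groupBy, "stacktrace")
	}
	return groupBy
}

func main() {
	fmt.Println(groupByColumns("30s", ""))                 // [labels duration(30s)]
	fmt.Println(groupByColumns("30s", "runtime.mallocgc")) // [labels duration(30s) stacktrace]
}

Grouping by stacktrace only when a filter is present keeps the unfiltered fast path's result sets small.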
151 changes: 124 additions & 27 deletions pkg/parcacol/querier.go
@@ -318,7 +318,7 @@ func (q *Querier) QueryRange(
 	filterExpr := logicalplan.And(exprs...)
 
 	if queryParts.Delta {
-		return q.queryRangeDelta(ctx, filterExpr, step, queryParts.Meta.SampleType.Unit)
+		return q.queryRangeDelta(ctx, filterExpr, step, queryParts.Meta.SampleType.Unit, filterQuery)
 	}
 
 	return q.queryRangeNonDelta(ctx, filterExpr, step, filterQuery)
@@ -331,7 +331,16 @@ const (
 	ColumnValueSum = "sum(" + ColumnValue + ")"
 )
 
-func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, sampleTypeUnit string) ([]*pb.MetricsSeries, error) {
+func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, sampleTypeUnit, filterQuery string) ([]*pb.MetricsSeries, error) {
+	groupBy := []logicalplan.Expr{
+		logicalplan.DynCol(ColumnLabels),
+		logicalplan.Duration(step),
+	}
+	if filterQuery != "" {
+		// If we have a filter query we need to group by the stacktrace column as well to be able to filter them out.
+		groupBy = append(groupBy, logicalplan.Col(ColumnStacktrace))
+	}
+
 	records := []arrow.Record{}
 	rows := 0
 	err := q.engine.ScanTable(q.tableName).
@@ -343,10 +352,7 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
 				logicalplan.Sum(logicalplan.Col(ColumnValue)),
 				logicalplan.Count(logicalplan.Col(ColumnValue)),
 			},
-			[]logicalplan.Expr{
-				logicalplan.DynCol(ColumnLabels),
-				logicalplan.Duration(step),
-			},
+			groupBy,
 		).
 		Execute(ctx, func(ctx context.Context, r arrow.Record) error {
 			r.Retain()
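As a reading aid, the aggregation call now reads roughly as follows when reassembled from the two hunks above; the "..." comments stand in for lines hidden in the collapsed context, whose exact contents this commit view does not show:

err := q.engine.ScanTable(q.tableName).
	// ... (folded: earlier builder stages, including where filterExpr is
	// applied, and additional aggregations such as the duration and period
	// sums that the result processing below expects)
	Aggregate(
		[]logicalplan.Expr{
			// ...
			logicalplan.Sum(logicalplan.Col(ColumnValue)),
			logicalplan.Count(logicalplan.Col(ColumnValue)),
		},
		groupBy, // labels and step bucket, plus stacktrace when filtering
	).
	Execute(ctx, func(ctx context.Context, r arrow.Record) error {
		r.Retain()
		// ... (folded: presumably collects r into records and counts rows)
		return nil
	})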
@@ -371,18 +377,32 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
 		Timestamp   int
 		ValueCount  int
 		ValueSum    int
+		Stacktrace  int
 	}{
 		DurationSum: -1,
 		PeriodSum:   -1,
 		Timestamp:   -1,
 		ValueCount:  -1,
 		ValueSum:    -1,
+		Stacktrace:  -1,
 	}
 
+	matchingStacktraces := map[string]bool{}
 	labelColumnIndices := []int{}
-	labelsetToIndex := map[string]int{}
 
 	labelSet := labels.Labels{}
 	resSeries := []*pb.MetricsSeries{}
+	labelsetToIndex := map[string]int{}
+
+	// These structs are only used when filtering for specific stacktraces.
+	// They are used as intermediate helpers before being converted to the final MetricsSeries.
+	type metricsSeriesStacktrace struct {
+		durationSum int64
+		periodSum   int64
+		valueSum    int64
+		valueCount  int64
+	}
+	resSeriesStacktraces := map[int]map[int64]metricsSeriesStacktrace{}
 
 	for _, ar := range records {
 		fields := ar.Schema().Fields()
@@ -403,6 +423,9 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
 			case ColumnValueSum:
 				columnIndices.ValueSum = i
 				continue
+			case ColumnStacktrace:
+				columnIndices.Stacktrace = i
+				continue
 			}
 
 			if strings.HasPrefix(field.Name, "labels.") {
@@ -425,6 +448,9 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
 		if columnIndices.ValueSum == -1 {
 			return nil, errors.New("sum(value) column not found")
 		}
+		if columnIndices.Stacktrace == -1 && filterQuery != "" {
+			return nil, errors.New("stacktrace column not found")
+		}
 
 		for i := 0; i < int(ar.NumRows()); i++ {
 			labelSet = labelSet[:0]
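The two hunks above extend a common Arrow pattern: resolve column positions by name once per record, fail fast if a required column is missing, then use typed accessors per row. A self-contained illustration of that pattern follows (the Arrow Go module version in the import paths is an assumption, not taken from this commit):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
)

func main() {
	// Build a tiny record with a timestamp and an aggregated value column.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "timestamp", Type: arrow.PrimitiveTypes.Int64},
		{Name: "sum(value)", Type: arrow.PrimitiveTypes.Int64},
	}, nil)
	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer b.Release()
	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1000, 2000}, nil)
	b.Field(1).(*array.Int64Builder).AppendValues([]int64{42, 7}, nil)
	rec := b.NewRecord()
	defer rec.Release()

	// Resolve column indices by name, as the diff does with columnIndices.
	tsIdx, valIdx := -1, -1
	for i, f := range rec.Schema().Fields() {
		switch f.Name {
		case "timestamp":
			tsIdx = i
		case "sum(value)":
			valIdx = i
		}
	}
	if tsIdx == -1 || valIdx == -1 {
		panic("required column not found")
	}

	// Typed access mirrors ar.Column(idx).(*array.Int64).Value(i) in the diff.
	for i := 0; i < int(rec.NumRows()); i++ {
		fmt.Println(rec.Column(tsIdx).(*array.Int64).Value(i), rec.Column(valIdx).(*array.Int64).Value(i))
	}
}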
@@ -454,6 +480,7 @@ func (q *Querier) queryRangeDelta(ctx context.Ex
 				resSeries = append(resSeries, &pb.MetricsSeries{Labelset: &profilestorepb.LabelSet{Labels: pbLabelSet}})
 				index = len(resSeries) - 1
 				labelsetToIndex[s] = index
+				resSeriesStacktraces[index] = map[int64]metricsSeriesStacktrace{}
 			}
 
 			ts := ar.Column(columnIndices.Timestamp).(*array.Int64).Value(i)
@@ -462,30 +489,66 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
 			valueSum := ar.Column(columnIndices.ValueSum).(*array.Int64).Value(i)
 			valueCount := ar.Column(columnIndices.ValueCount).(*array.Int64).Value(i)
 
-			// TODO: We should do these period and duration calculations in frostDB,
-			// so that we can push these down as projections.
-
-			// Because we store the period with each sample yet query for the sum(period) we need to normalize by the amount of values (rows in a database).
-			period := periodSum / valueCount
-			// Because we store the duration with each sample yet query for the sum(duration) we need to normalize by the amount of values (rows in a database).
-			duration := durationSum / valueCount
-
-			// If we have a CPU samples value type we make sure we always do the next calculation with cpu nanoseconds.
-			// If we already have CPU nanoseconds we don't need to multiply by the period.
-			valuePerSecondSum := valueSum
-			if sampleTypeUnit != "nanoseconds" {
-				valuePerSecondSum = valueSum * period
-			}
-
-			valuePerSecond := float64(valuePerSecondSum) / float64(duration)
-
-			series := resSeries[index]
-			series.Samples = append(series.Samples, &pb.MetricsSample{
-				Timestamp:      timestamppb.New(timestamp.Time(ts)),
-				Value:          valueSum,
-				ValuePerSecond: valuePerSecond,
-				Duration:       duration,
-			})
+			if filterQuery == "" {
+				resSeries[index].Samples = append(resSeries[index].Samples,
+					deltaMetricsSample(ts, durationSum, periodSum, valueSum, valueCount, sampleTypeUnit),
+				)
+			} else {
+				// If we have a filter query, we iterate through each stacktrace for each timestamp.
+				// Only if a stacktrace matches the filter query do we add its values to the intermediate metricsSeriesStacktrace.
+				// After all timestamps have been seen, we compute the final values and valuePerSecond below.
+				stacktrace := ar.Column(columnIndices.Stacktrace).(*array.Binary).Value(i)
+
+				matches, found := matchingStacktraces[string(stacktrace)]
+				if !found {
+					matches, err = q.matchingStacktrace(ctx, string(stacktrace), filterQuery)
+					if err != nil {
+						return nil, err
+					}
+					matchingStacktraces[string(stacktrace)] = matches
+				}
+
+				if sample, found := resSeriesStacktraces[index][ts]; found {
+					if matches {
+						sample.durationSum += durationSum
+						sample.periodSum += periodSum
+						sample.valueSum += valueSum
+						sample.valueCount += valueCount
+						resSeriesStacktraces[index][ts] = sample
+					}
+					continue
+				}
+
+				var sample metricsSeriesStacktrace
+				if matches {
+					// Only set values if we have a match; otherwise they remain 0.
+					sample = metricsSeriesStacktrace{
+						durationSum: durationSum,
+						periodSum:   periodSum,
+						valueSum:    valueSum,
+						valueCount:  valueCount,
+					}
+				}
+				resSeriesStacktraces[index][ts] = sample
+			}
 		}
 	}
 
+	if filterQuery != "" {
+		// We have aggregated the metric samples for each timestamp where the underlying stacktraces match.
+		// Now we need to convert those raw values to the metric sample's value and valuePerSecond.
+		for i, times := range resSeriesStacktraces {
+			for ts, sample := range times {
+				resSeries[i].Samples = append(resSeries[i].Samples,
+					deltaMetricsSample(
+						ts,
+						sample.durationSum,
+						sample.periodSum,
+						sample.valueSum,
+						sample.valueCount,
+						sampleTypeUnit,
+					))
+			}
+		}
+	}
+
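Condensed out of the hunk above, the filtering path boils down to: memoize the per-stacktrace match decision, accumulate sums only for matching stacks keyed by series and timestamp, and finalize later. A self-contained sketch with hypothetical names and a stubbed matcher (the real code calls q.matchingStacktrace with the filter query):

package main

import (
	"fmt"
	"strings"
)

type acc struct{ durationSum, periodSum, valueSum, valueCount int64 }

func main() {
	type row struct {
		series                         int
		ts                             int64
		stacktrace                     string
		duration, period, value, count int64
	}
	rows := []row{
		{0, 1000, "main;compute;alloc", 1e9, 1e7, 3, 3},
		{0, 1000, "main;idle", 1e9, 1e7, 5, 5},
		{0, 2000, "main;compute;alloc", 1e9, 1e7, 2, 2},
	}

	// Stub matcher; the real code asks q.matchingStacktrace(ctx, st, filterQuery).
	matchesFilter := func(st string) bool { return strings.Contains(st, "alloc") }

	memo := map[string]bool{}      // mirrors matchingStacktraces
	out := map[int]map[int64]acc{} // mirrors resSeriesStacktraces
	for _, r := range rows {
		m, ok := memo[r.stacktrace]
		if !ok {
			m = matchesFilter(r.stacktrace) // potentially expensive; done once per stack
			memo[r.stacktrace] = m
		}
		if out[r.series] == nil {
			out[r.series] = map[int64]acc{}
		}
		a := out[r.series][r.ts] // the zero value keeps non-matching timestamps at 0
		if m {
			a.durationSum += r.duration
			a.periodSum += r.period
			a.valueSum += r.value
			a.valueCount += r.count
		}
		out[r.series][r.ts] = a
	}
	fmt.Println(out) // map[0:map[1000:{1000000000 10000000 3 3} 2000:{1000000000 10000000 2 2}]]
}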
@@ -499,6 +562,40 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
 	return resSeries, nil
 }
 
+func deltaMetricsSample(ts, durationSum, periodSum, valueSum, valueCount int64, sampleTypeUnit string) *pb.MetricsSample {
+	// If the valueCount is zero we cannot do any calculations.
+	// We simply return a sample with all values set to 0.
+	if valueCount == 0 {
+		return &pb.MetricsSample{
+			Timestamp: timestamppb.New(timestamp.Time(ts)),
+			// Everything else is 0.
+		}
+	}
+
+	// TODO: We should do these period and duration calculations in frostDB, so that we can push them down as projections.
+
+	// Because we store the period with each sample yet query for the sum(period), we need to normalize by the amount of values (rows in the database).
+	period := periodSum / valueCount
+	// Because we store the duration with each sample yet query for the sum(duration), we need to normalize by the amount of values (rows in the database).
+	duration := durationSum / valueCount
+
+	// If we have a CPU samples value type, we make sure we always do the next calculation with CPU nanoseconds.
+	// If we already have CPU nanoseconds, we don't need to multiply by the period.
+	valuePerSecondSum := valueSum
+	if sampleTypeUnit != "nanoseconds" {
+		valuePerSecondSum = valueSum * period
+	}
+
+	valuePerSecond := float64(valuePerSecondSum) / float64(duration)
+
+	return &pb.MetricsSample{
+		Timestamp:      timestamppb.New(timestamp.Time(ts)),
+		Value:          valueSum,
+		ValuePerSecond: valuePerSecond,
+		Duration:       duration,
+	}
+}
+
 func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, filterQuery string) ([]*pb.MetricsSeries, error) {
 	groupBy := []logicalplan.Expr{
 		logicalplan.DynCol(ColumnLabels),
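To make deltaMetricsSample's normalization concrete, here is a worked example with invented but representative numbers: three rows of a 100Hz CPU samples profile (period 10ms, duration 10s each) aggregated into one step bucket. The arithmetic mirrors the helper in the hunk above:

package main

import "fmt"

func main() {
	// Hypothetical sums for one (labelset, step) bucket: three rows, each with
	// period 10ms (100Hz sampling) and duration 10s, contributing 1 sample each.
	var (
		durationSum    int64 = 3 * 10_000_000_000 // sum(duration), ns
		periodSum      int64 = 3 * 10_000_000     // sum(period), ns
		valueSum       int64 = 3                  // sum(value), samples
		valueCount     int64 = 3                  // count(value), rows
		sampleTypeUnit       = "count"            // not "nanoseconds"
	)

	// Normalize the summed period and duration back to per-row averages.
	period := periodSum / valueCount     // 10_000_000 ns
	duration := durationSum / valueCount // 10_000_000_000 ns

	// Convert samples to CPU nanoseconds unless the values already are.
	valuePerSecondSum := valueSum
	if sampleTypeUnit != "nanoseconds" {
		valuePerSecondSum = valueSum * period // 30_000_000 ns of CPU
	}

	valuePerSecond := float64(valuePerSecondSum) / float64(duration)
	fmt.Println(period, duration, valuePerSecond) // 10000000 10000000000 0.003
}

Three 10ms samples over a 10s window thus come out to 0.003 CPU cores.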
@@ -122,9 +122,6 @@ const MetricsTooltip = ({
               <td className="w-1/4">Value</td>
               <td className="w-3/4">
                 {valueFormatter(highlighted.valuePerSecond, sampleUnit, 5)}{' '}
-                {highlighted.valuePercentage > 0 &&
-                  <>({valueFormatter(highlighted.valuePercentage, 'percentage', 2)})</>
-                }
               </td>
             </tr>
             {delta && (
5 changes: 1 addition & 4 deletions ui/packages/shared/profile/src/MetricsGraph/index.tsx
@@ -50,7 +50,6 @@ export interface HighlightedSeries {
   timestamp: number;
   value: number;
   valuePerSecond: number;
-  valuePercentage: number;
   duration: number;
   x: number;
   y: number;
@@ -141,7 +140,7 @@ export const RawMetricsGraph = ({
       values: s.samples.reduce<number[][]>(function (agg: number[][], d: MetricsSample) {
         if (d.timestamp !== undefined && d.valuePerSecond !== undefined) {
           const t = (+d.timestamp.seconds * 1e9 + d.timestamp.nanos) / 1e6; // https://github.com/microsoft/TypeScript/issues/5710#issuecomment-157886246
-          agg.push([t, d.valuePerSecond, parseFloat(d.value), parseFloat(d.duration), d.valuePrecision]);
+          agg.push([t, d.valuePerSecond, parseFloat(d.value), parseFloat(d.duration)]);
         }
         return agg;
       }, []),
@@ -212,7 +211,6 @@
         labels: series[closestSeriesIndex].metric,
         timestamp: point[0],
         valuePerSecond: point[1],
-        valuePercentage: point[4],
         value: point[2],
         duration: point[3],
         x: xScale(point[0]),
@@ -338,7 +336,6 @@
           seriesIndex,
           timestamp: sample[0],
           valuePerSecond: sample[1],
-          valuePercentage: sample[4],
           value: sample[2],
           duration: sample[3],
           x: xScale(sample[0]),
16 changes: 8 additions & 8 deletions ui/packages/shared/profile/src/MetricsSeries/index.tsx
@@ -24,15 +24,15 @@ interface MetricsSeriesProps {
 
 const MetricsSeries = ({data, line, color, strokeWidth}: MetricsSeriesProps): JSX.Element => (
   <g className="line-group">
-  <path
+    <path
       className="line"
-    d={line(data.values) ?? undefined}
-    style={{
-      stroke: color,
+      d={line(data.values) ?? undefined}
+      style={{
+        stroke: color,
         strokeWidth,
-    }}
-  />
-</g>
-  );
+      }}
+    />
+  </g>
+);
 
 export default MetricsSeries;
25 changes: 16 additions & 9 deletions ui/packages/shared/profile/src/ProfileMetricsGraph/index.tsx
@@ -66,14 +66,15 @@ export const useQueryRange = (
 
   const stepDuration = getStepDuration(start, end);
 
-  const call = client.queryRange( {
-    query: queryExpression,
-    start: Timestamp.fromDate(new Date(start)),
-    end: Timestamp.fromDate(new Date(end)),
-    step: Duration.create(stepDuration),
-    limit: 0,
-    filterQuery: filterByFunction,
-  },
+  const call = client.queryRange(
+    {
+      query: queryExpression,
+      start: Timestamp.fromDate(new Date(start)),
+      end: Timestamp.fromDate(new Date(end)),
+      step: Duration.create(stepDuration),
+      limit: 0,
+      filterQuery: filterByFunction,
+    },
     {meta: metadata}
   );
   call.response
@@ -96,7 +97,13 @@ const ProfileMetricsGraph = ({
   addLabelMatcher,
   onPointClick,
 }: ProfileMetricsGraphProps): JSX.Element => {
-  const {isLoading, response, error} = useQueryRange(queryClient, queryExpression, from, to, filterByFunction);
+  const {isLoading, response, error} = useQueryRange(
+    queryClient,
+    queryExpression,
+    from,
+    to,
+    filterByFunction
+  );
   const isLoaderVisible = useDelayedLoader(isLoading);
   const {loader, onError, perf} = useParcaContext();
 