Skip to content

Commit 3e4f186

Browse files
authored
Add fallback logic to thanos promql engine (#6630)
Signed-off-by: SungJin1212 <[email protected]>
1 parent 1d5b305 commit 3e4f186

File tree

310 files changed

+71662
-425
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

310 files changed

+71662
-425
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ require (
5151
github.com/spf13/afero v1.11.0
5252
github.com/stretchr/testify v1.10.0
5353
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
54-
github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c
54+
github.com/thanos-io/promql-engine v0.0.0-20250302135832-accbf0891a16
5555
github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80
5656
github.com/uber/jaeger-client-go v2.30.0+incompatible
5757
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1687,8 +1687,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
16871687
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
16881688
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
16891689
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
1690-
github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c h1:STCm5S4Aht3hOR0WQ0B3daZv21GQC13uPYIfkcN762U=
1691-
github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs=
1690+
github.com/thanos-io/promql-engine v0.0.0-20250302135832-accbf0891a16 h1:ezd8hNCWiGQr4kdfCHFa0VCSi+LAO/28Mna264nDs2c=
1691+
github.com/thanos-io/promql-engine v0.0.0-20250302135832-accbf0891a16/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs=
16921692
github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80 h1:mOCRYn9SLBWJCXAdP+qDfgZDc0eqDxDc2HZGKTZ5vzk=
16931693
github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80/go.mod h1:Y7D8la8B5rpzRVKq2HCR4hbYZ4LGroSPqIJjtizgQg8=
16941694
github.com/tjhop/slog-gokit v0.1.2 h1:pmQI4SvU9h4gA0vIQsdhJQSqQg4mOmsPykG2/PM3j1I=

pkg/querier/engine_factory.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package querier
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/prometheus/client_golang/prometheus/promauto"
9+
"github.com/prometheus/prometheus/promql"
10+
"github.com/prometheus/prometheus/storage"
11+
"github.com/thanos-io/promql-engine/engine"
12+
"github.com/thanos-io/promql-engine/logicalplan"
13+
)
14+
15+
type EngineFactory struct {
16+
prometheusEngine *promql.Engine
17+
thanosEngine *engine.Engine
18+
19+
fallbackQueriesTotal prometheus.Counter
20+
}
21+
22+
func NewEngineFactory(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *EngineFactory {
23+
prometheusEngine := promql.NewEngine(opts)
24+
25+
var thanosEngine *engine.Engine
26+
if enableThanosEngine {
27+
thanosEngine = engine.New(engine.Opts{
28+
EngineOpts: opts,
29+
LogicalOptimizers: logicalplan.AllOptimizers,
30+
EnableAnalysis: true,
31+
})
32+
}
33+
34+
return &EngineFactory{
35+
prometheusEngine: prometheusEngine,
36+
thanosEngine: thanosEngine,
37+
fallbackQueriesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
38+
Name: "cortex_thanos_engine_fallback_queries_total",
39+
Help: "Total number of fallback queries due to not implementation in thanos engine",
40+
}),
41+
}
42+
}
43+
44+
func (qf *EngineFactory) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
45+
if qf.thanosEngine != nil {
46+
res, err := qf.thanosEngine.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
47+
if err != nil {
48+
if engine.IsUnimplemented(err) {
49+
// fallback to use prometheus engine
50+
qf.fallbackQueriesTotal.Inc()
51+
goto fallback
52+
}
53+
return nil, err
54+
}
55+
return res, nil
56+
}
57+
58+
fallback:
59+
return qf.prometheusEngine.NewInstantQuery(ctx, q, opts, qs, ts)
60+
}
61+
62+
func (qf *EngineFactory) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
63+
if qf.thanosEngine != nil {
64+
res, err := qf.thanosEngine.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval)
65+
if err != nil {
66+
if engine.IsUnimplemented(err) {
67+
// fallback to use prometheus engine
68+
qf.fallbackQueriesTotal.Inc()
69+
goto fallback
70+
}
71+
return nil, err
72+
}
73+
return res, nil
74+
}
75+
76+
fallback:
77+
return qf.prometheusEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
78+
}
79+
80+
func fromPromQLOpts(opts promql.QueryOpts) *engine.QueryOpts {
81+
if opts == nil {
82+
return &engine.QueryOpts{}
83+
}
84+
return &engine.QueryOpts{
85+
LookbackDeltaParam: opts.LookbackDelta(),
86+
EnablePerStepStatsParam: opts.EnablePerStepStats(),
87+
}
88+
}

pkg/querier/engine_factory_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package querier
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"testing"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/client_golang/prometheus/testutil"
12+
"github.com/prometheus/prometheus/promql/parser"
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/cortexproject/cortex/pkg/util/flagext"
16+
"github.com/cortexproject/cortex/pkg/util/validation"
17+
)
18+
19+
func TestEngineFactory_Fallback(t *testing.T) {
20+
// add unimplemented function
21+
parser.Functions["unimplemented"] = &parser.Function{
22+
Name: "unimplemented",
23+
ArgTypes: []parser.ValueType{parser.ValueTypeVector},
24+
ReturnType: parser.ValueTypeVector,
25+
}
26+
27+
cfg := Config{}
28+
flagext.DefaultValues(&cfg)
29+
cfg.ThanosEngine = true
30+
ctx := context.Background()
31+
reg := prometheus.NewRegistry()
32+
33+
chunkStore := &emptyChunkStore{}
34+
distributor := &errDistributor{}
35+
36+
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
37+
require.NoError(t, err)
38+
39+
now := time.Now()
40+
start := time.Now().Add(-time.Minute * 5)
41+
step := time.Minute
42+
queryable, _, queryEngine := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, reg, log.NewNopLogger(), nil)
43+
44+
// instant query, should go to fallback
45+
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now)
46+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
47+
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
48+
# TYPE cortex_thanos_engine_fallback_queries_total counter
49+
cortex_thanos_engine_fallback_queries_total 1
50+
`), "cortex_thanos_engine_fallback_queries_total"))
51+
52+
// range query, should go to fallback
53+
_, _ = queryEngine.NewRangeQuery(ctx, queryable, nil, "unimplemented(foo)", start, now, step)
54+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
55+
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
56+
# TYPE cortex_thanos_engine_fallback_queries_total counter
57+
cortex_thanos_engine_fallback_queries_total 2
58+
`), "cortex_thanos_engine_fallback_queries_total"))
59+
}

pkg/querier/querier.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ import (
1919
"github.com/prometheus/prometheus/promql/parser"
2020
"github.com/prometheus/prometheus/storage"
2121
"github.com/prometheus/prometheus/util/annotations"
22-
"github.com/thanos-io/promql-engine/engine"
23-
"github.com/thanos-io/promql-engine/logicalplan"
2422
"github.com/thanos-io/thanos/pkg/strutil"
2523
"golang.org/x/sync/errgroup"
2624

@@ -208,7 +206,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
208206
// The cortex supports holt_winters for users using this function.
209207
EnableExperimentalPromQLFunctions(cfg.EnablePromQLExperimentalFunctions, true)
210208

211-
var queryEngine promql.QueryEngine
212209
opts := promql.EngineOpts{
213210
Logger: util_log.GoKitLogToSlog(logger),
214211
Reg: reg,
@@ -223,15 +220,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
223220
return cfg.DefaultEvaluationInterval.Milliseconds()
224221
},
225222
}
226-
if cfg.ThanosEngine {
227-
queryEngine = engine.New(engine.Opts{
228-
EngineOpts: opts,
229-
LogicalOptimizers: logicalplan.AllOptimizers,
230-
EnableAnalysis: true,
231-
})
232-
} else {
233-
queryEngine = promql.NewEngine(opts)
234-
}
223+
queryEngine := NewEngineFactory(opts, cfg.ThanosEngine, reg)
235224
return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine
236225
}
237226

vendor/github.com/thanos-io/promql-engine/engine/distributed.go

Lines changed: 35 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)