Skip to content

Commit 63f97d2

Browse files
committed
Introduce a regex tenant resolver
Signed-off-by: SungJin1212 <[email protected]>
1 parent d1dcdf0 commit 63f97d2

13 files changed

+828
-83
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
1010
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
1111
* [FEATURE] Ruler: Add support for group labels. #6665
12+
* [FEATURE] Query federation: Introduce a regex tenant resolver to allow regex in `X-Scope-OrgID` value. #6713
13+
- Add a `tenant-federation.regex-matcher-enabled` flag. If it is enabled, users can pass a regex in the `X-Scope-OrgID` header, and all matching tenant IDs are automatically included.
14+
- Add a `tenant-federation.user-sync-interval` flag, which specifies how frequently to scan users. The scanned users are used to calculate the matched tenant IDs.
1215
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
1316
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
1417
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695

docs/configuration/config-file-reference.md

+12
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,18 @@ tenant_federation:
171171
# CLI flag: -tenant-federation.max-tenant
172172
[max_tenant: <int> | default = 0]
173173

174+
# [Experimental] If enabled, the `X-Scope-OrgID` header value can accept a
175+
# regex and the matched tenantIDs are automatically involved. The regex
176+
# matching rule follows Prometheus; for details, see:
177+
# https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions.
178+
# CLI flag: -tenant-federation.regex-matcher-enabled
179+
[regex_matcher_enabled: <boolean> | default = false]
180+
181+
# If the regex matcher is enabled, it specifies how frequently to scan users.
182+
# The scanned users are used to calculate matched tenantIDs.
183+
# CLI flag: -tenant-federation.user-sync-interval
184+
[user_sync_interval: <duration> | default = 5m]
185+
174186
# The ruler_config configures the Cortex ruler.
175187
[ruler: <ruler_config>]
176188

docs/configuration/v1-guarantees.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ Currently experimental features are:
6464
- Blocks storage bucket index
6565
- The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental
6666
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporary and will be removed in future versions
67-
- Querier: tenant federation
67+
- Querier:
68+
- Tenant federation (`-tenant-federation.enabled`)
69+
- Enable regex matcher when the tenant federation is enabled (`-tenant-federation.regex-matcher-enabled`)
6870
- The thanosconvert tool for converting Thanos block metadata to Cortex
6971
- HA Tracker: cleanup of old replicas from KV Store.
7072
- Instance limits in ingester and distributor

integration/querier_tenant_federation_test.go

+139
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,164 @@ type querierTenantFederationConfig struct {
2828

2929
func TestQuerierTenantFederation(t *testing.T) {
3030
runQuerierTenantFederationTest(t, querierTenantFederationConfig{})
31+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{})
3132
}
3233

3334
func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) {
3435
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
3536
querySchedulerEnabled: true,
3637
})
38+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
39+
querySchedulerEnabled: true,
40+
})
3741
}
3842

3943
func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) {
4044
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
4145
shuffleShardingEnabled: true,
4246
})
47+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
48+
shuffleShardingEnabled: true,
49+
})
4350
}
4451

4552
func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) {
4653
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
4754
querySchedulerEnabled: true,
4855
shuffleShardingEnabled: true,
4956
})
57+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
58+
querySchedulerEnabled: true,
59+
shuffleShardingEnabled: true,
60+
})
61+
}
62+
63+
func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTenantFederationConfig) {
64+
const numUsers = 10
65+
66+
s, err := e2e.NewScenario(networkName)
67+
require.NoError(t, err)
68+
defer s.Close()
69+
70+
memcached := e2ecache.NewMemcached()
71+
consul := e2edb.NewConsul()
72+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
73+
74+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
75+
"-querier.cache-results": "true",
76+
"-querier.split-queries-by-interval": "24h",
77+
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
78+
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
79+
"-tenant-federation.enabled": "true",
80+
"-tenant-federation.regex-matcher-enabled": "true",
81+
"-tenant-federation.user-sync-interval": "1s",
82+
})
83+
84+
// Start the query-scheduler if enabled.
85+
var queryScheduler *e2ecortex.CortexService
86+
if cfg.querySchedulerEnabled {
87+
queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
88+
require.NoError(t, s.StartAndWaitReady(queryScheduler))
89+
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
90+
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
91+
}
92+
93+
if cfg.shuffleShardingEnabled {
94+
// Use only single querier for each user.
95+
flags["-frontend.max-queriers-per-tenant"] = "1"
96+
}
97+
98+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
99+
require.NoError(t, s.StartAndWaitReady(minio))
100+
101+
// Start ingester and distributor.
102+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
103+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
104+
require.NoError(t, s.StartAndWaitReady(ingester, distributor))
105+
106+
// Wait until distributor have updated the ring.
107+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
108+
109+
// Push a series for each user to Cortex.
110+
now := time.Now()
111+
expectedVectors := make([]model.Vector, numUsers)
112+
tenantIDs := make([]string, numUsers)
113+
114+
for u := 0; u < numUsers; u++ {
115+
tenantIDs[u] = fmt.Sprintf("user-%d", u)
116+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u])
117+
require.NoError(t, err)
118+
119+
var series []prompb.TimeSeries
120+
series, expectedVectors[u] = generateSeries("series_1", now)
121+
122+
res, err := c.Push(series)
123+
require.NoError(t, err)
124+
require.Equal(t, 200, res.StatusCode)
125+
}
126+
127+
// Start the query-frontend.
128+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
129+
require.NoError(t, s.Start(queryFrontend))
130+
131+
if !cfg.querySchedulerEnabled {
132+
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
133+
}
134+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
135+
136+
var querier2 *e2ecortex.CortexService
137+
if cfg.shuffleShardingEnabled {
138+
querier2 = e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
139+
}
140+
141+
// Start queriers.
142+
require.NoError(t, s.StartAndWaitReady(querier))
143+
require.NoError(t, s.WaitReady(queryFrontend))
144+
if cfg.shuffleShardingEnabled {
145+
require.NoError(t, s.StartAndWaitReady(querier2))
146+
}
147+
148+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
149+
if cfg.shuffleShardingEnabled {
150+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
151+
}
152+
153+
// wait to update knownUsers
154+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics)
155+
if cfg.shuffleShardingEnabled {
156+
require.NoError(t, querier2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics)
157+
}
158+
159+
// query all tenants
160+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+")
161+
require.NoError(t, err)
162+
163+
result, err := c.Query("series_1", now)
164+
require.NoError(t, err)
165+
166+
assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))
167+
168+
// ensure a push to multiple tenants is failing
169+
series, _ := generateSeries("series_1", now)
170+
res, err := c.Push(series)
171+
require.NoError(t, err)
172+
173+
require.Equal(t, 500, res.StatusCode)
174+
175+
// check metric label values for total queries in the query frontend
176+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
177+
labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"),
178+
labels.MustNewMatcher(labels.MatchEqual, "op", "query"))))
179+
180+
// check metric label values for query queue length in either query frontend or query scheduler
181+
queueComponent := queryFrontend
182+
queueMetricName := "cortex_query_frontend_queue_length"
183+
if cfg.querySchedulerEnabled {
184+
queueComponent = queryScheduler
185+
queueMetricName = "cortex_query_scheduler_queue_length"
186+
}
187+
require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers(
188+
labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"))))
50189
}
51190

52191
func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) {

pkg/cortex/modules.go

+19
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/cortexproject/cortex/pkg/scheduler"
5151
"github.com/cortexproject/cortex/pkg/storage/bucket"
5252
"github.com/cortexproject/cortex/pkg/storegateway"
53+
"github.com/cortexproject/cortex/pkg/tenant"
5354
"github.com/cortexproject/cortex/pkg/util/grpcclient"
5455
util_log "github.com/cortexproject/cortex/pkg/util/log"
5556
"github.com/cortexproject/cortex/pkg/util/modules"
@@ -280,6 +281,14 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
280281
// single tenant. This allows for a less impactful enabling of tenant
281282
// federation.
282283
byPassForSingleQuerier := true
284+
if t.Cfg.TenantFederation.RegexMatcherEnabled {
285+
util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled")
286+
// If regex matcher enabled, we should set the byPassForSingleQuerier as false
287+
// because if the # of matched tenantIDs is only one, `X-Scope-OrgID` header is
288+
// set to input regex.
289+
byPassForSingleQuerier = false
290+
tenant.WithDefaultResolver(tenantfederation.NewRegexResolver(prometheus.DefaultRegisterer, t.Cfg.TenantFederation.UserSyncInterval, util_log.Logger, t.Distributor.AllUserStats))
291+
}
283292
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
284293
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer)
285294
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)
@@ -486,6 +495,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
486495
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
487496
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
488497

498+
if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled {
499+
// If regex matcher enabled, we use regex validator to pass regex to the querier
500+
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
501+
}
502+
489503
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
490504
t.Cfg.QueryRange,
491505
util_log.Logger,
@@ -760,6 +774,11 @@ func (t *Cortex) initTenantDeletionAPI() (services.Service, error) {
760774
}
761775

762776
func (t *Cortex) initQueryScheduler() (services.Service, error) {
777+
if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled {
778+
// If regex matcher enabled, we use regex validator to pass regex to the querier
779+
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
780+
}
781+
763782
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
764783
if err != nil {
765784
return nil, errors.Wrap(err, "query-scheduler init")

pkg/querier/tenantfederation/exemplar_merge_queryable_test.go

+110
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"errors"
66
"strings"
77
"testing"
8+
"time"
89

10+
"github.com/go-kit/log"
911
"github.com/prometheus/client_golang/prometheus"
1012
"github.com/prometheus/client_golang/prometheus/testutil"
1113
"github.com/prometheus/prometheus/model/exemplar"
@@ -14,7 +16,9 @@ import (
1416
"github.com/stretchr/testify/require"
1517
"github.com/weaveworks/common/user"
1618

19+
"github.com/cortexproject/cortex/pkg/ingester"
1720
"github.com/cortexproject/cortex/pkg/tenant"
21+
"github.com/cortexproject/cortex/pkg/util/test"
1822
)
1923

2024
var (
@@ -311,6 +315,112 @@ func Test_MergeExemplarQuerier_Select(t *testing.T) {
311315
}
312316
}
313317

318+
func Test_MergeExemplarQuerier_Select_WhenUseRegexResolver(t *testing.T) {
319+
// set a regex tenant resolver
320+
reg := prometheus.NewRegistry()
321+
userStat := ingester.UserStats{}
322+
regexResolver := NewRegexResolver(reg, time.Second, log.NewNopLogger(), func(ctx context.Context) ([]ingester.UserIDStats, error) {
323+
return []ingester.UserIDStats{
324+
{UserID: "user-1", UserStats: userStat},
325+
{UserID: "user-2", UserStats: userStat},
326+
}, nil
327+
})
328+
tenant.WithDefaultResolver(regexResolver)
329+
330+
// wait update knownUsers
331+
test.Poll(t, time.Second*10, true, func() interface{} {
332+
return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0
333+
})
334+
335+
tests := []struct {
336+
name string
337+
upstream mockExemplarQueryable
338+
matcher [][]*labels.Matcher
339+
orgId string
340+
expectedResult []exemplar.QueryResult
341+
expectedErr error
342+
expectedMetrics string
343+
}{
344+
{
345+
name: "result labels should contains __tenant_id__ even if one tenant is queried",
346+
upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{
347+
"user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()},
348+
"user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()},
349+
}},
350+
matcher: [][]*labels.Matcher{{
351+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"),
352+
}},
353+
orgId: ".+-1",
354+
expectedResult: []exemplar.QueryResult{
355+
{
356+
SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"),
357+
Exemplars: []exemplar.Exemplar{
358+
{
359+
Labels: labels.FromStrings("traceID", "123"),
360+
Value: 123,
361+
Ts: 1734942337900,
362+
},
363+
},
364+
},
365+
},
366+
expectedMetrics: expectedSingleTenantsExemplarMetrics,
367+
},
368+
{
369+
name: "two tenants results should be aggregated",
370+
upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{
371+
"user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()},
372+
"user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()},
373+
}},
374+
matcher: [][]*labels.Matcher{{
375+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"),
376+
}},
377+
orgId: "user-.+",
378+
expectedResult: []exemplar.QueryResult{
379+
{
380+
SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"),
381+
Exemplars: []exemplar.Exemplar{
382+
{
383+
Labels: labels.FromStrings("traceID", "123"),
384+
Value: 123,
385+
Ts: 1734942337900,
386+
},
387+
},
388+
},
389+
{
390+
SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-2"),
391+
Exemplars: []exemplar.Exemplar{
392+
{
393+
Labels: labels.FromStrings("traceID", "456"),
394+
Value: 456,
395+
Ts: 1734942338000,
396+
},
397+
},
398+
},
399+
},
400+
expectedMetrics: expectedTwoTenantsExemplarMetrics,
401+
},
402+
}
403+
404+
for _, test := range tests {
405+
t.Run(test.name, func(t *testing.T) {
406+
reg := prometheus.NewPedanticRegistry()
407+
exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, false, reg)
408+
ctx := user.InjectOrgID(context.Background(), test.orgId)
409+
q, err := exemplarQueryable.ExemplarQuerier(ctx)
410+
require.NoError(t, err)
411+
412+
result, err := q.Select(mint, maxt, test.matcher...)
413+
if test.expectedErr != nil {
414+
require.Error(t, err)
415+
} else {
416+
require.NoError(t, err)
417+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_exemplar_query"))
418+
require.Equal(t, test.expectedResult, result)
419+
}
420+
})
421+
}
422+
}
423+
314424
func Test_filterAllTenantsAndMatchers(t *testing.T) {
315425
idLabelName := defaultTenantLabel
316426

0 commit comments

Comments
 (0)