Skip to content

Commit 1a3d2f1

Browse files
committed
Introduce a regex tenant resolver
Signed-off-by: SungJin1212 <[email protected]>
1 parent be90c4a commit 1a3d2f1

13 files changed

+1022
-82
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
1212
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
1313
* [FEATURE] Ruler: Add support for group labels. #6665
14+
* [FEATURE] Query federation: Introduce a regex tenant resolver to allow regex in `X-Scope-OrgID` value. #6713
15+
- Add an experimental `tenant-federation.regex-matcher-enabled` flag. If it enabled, user can input regex to `X-Scope-OrgId`, the matched tenantIDs are automatically involved. The user discovery is based on scanning block storage, so new users can get queries after uploading a block (generally 2h).
16+
- Add an experimental `tenant-federation.user-sync-interval` flag, it specifies how frequently to scan users. The scanned users are used to calculate matched tenantIDs.
1417
* [FEATURE] Experimental Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet and Parquet Queryable. #6716 #6743
1518
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1619
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738

docs/configuration/config-file-reference.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,22 @@ tenant_federation:
181181
# CLI flag: -tenant-federation.max-tenant
182182
[max_tenant: <int> | default = 0]
183183

184+
# [Experimental] If enabled, the `X-Scope-OrgID` header value can accept a
185+
# regex and the matched tenantIDs are automatically involved. The regex
186+
# matching rule follows the Prometheus, see the detail:
187+
# https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions.
188+
# The user discovery is based on scanning block storage, so new users can get
189+
# queries after uploading a block (generally 2h).
190+
# CLI flag: -tenant-federation.regex-matcher-enabled
191+
[regex_matcher_enabled: <boolean> | default = false]
192+
193+
# [Experimental] If the regex matcher is enabled, it specifies how frequently
194+
# to scan users. The scanned users are used to calculate matched tenantIDs.
195+
# The scanning strategy depends on the
196+
# `-blocks-storage.users-scanner.strategy`.
197+
# CLI flag: -tenant-federation.user-sync-interval
198+
[user_sync_interval: <duration> | default = 5m]
199+
184200
# The ruler_config configures the Cortex ruler.
185201
[ruler: <ruler_config>]
186202

docs/configuration/v1-guarantees.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ Currently experimental features are:
6666
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions
6767
- Blocks storage user index
6868
- Querier: tenant federation
69+
- `-tenant-federation.enabled`
70+
- `-tenant-federation.regex-matcher-enabled`
71+
- `-tenant-federation.user-sync-interval`
6972
- The thanosconvert tool for converting Thanos block metadata to Cortex
7073
- HA Tracker: cleanup of old replicas from KV Store.
7174
- Instance limits in ingester and distributor

integration/querier_tenant_federation_test.go

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,255 @@ type querierTenantFederationConfig struct {
2828

2929
func TestQuerierTenantFederation(t *testing.T) {
3030
runQuerierTenantFederationTest(t, querierTenantFederationConfig{})
31+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{})
3132
}
3233

3334
func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) {
3435
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
3536
querySchedulerEnabled: true,
3637
})
38+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
39+
querySchedulerEnabled: true,
40+
})
3741
}
3842

3943
func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) {
4044
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
4145
shuffleShardingEnabled: true,
4246
})
47+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
48+
shuffleShardingEnabled: true,
49+
})
4350
}
4451

4552
func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) {
4653
runQuerierTenantFederationTest(t, querierTenantFederationConfig{
4754
querySchedulerEnabled: true,
4855
shuffleShardingEnabled: true,
4956
})
57+
runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
58+
querySchedulerEnabled: true,
59+
shuffleShardingEnabled: true,
60+
})
61+
}
62+
63+
func TestRegexResolver_NewlyCreatedTenant(t *testing.T) {
64+
const blockRangePeriod = 5 * time.Second
65+
66+
s, err := e2e.NewScenario(networkName)
67+
require.NoError(t, err)
68+
defer s.Close()
69+
70+
consul := e2edb.NewConsulWithName("consul")
71+
require.NoError(t, s.StartAndWaitReady(consul))
72+
73+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
74+
"-querier.cache-results": "true",
75+
"-querier.split-queries-by-interval": "24h",
76+
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
77+
"-tenant-federation.enabled": "true",
78+
"-tenant-federation.regex-matcher-enabled": "true",
79+
80+
// to upload block quickly
81+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
82+
"-blocks-storage.tsdb.ship-interval": "1s",
83+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
84+
85+
// store gateway
86+
"-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(),
87+
"-querier.max-fetched-series-per-query": "1",
88+
})
89+
90+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
91+
require.NoError(t, s.StartAndWaitReady(minio))
92+
93+
// Start ingester and distributor.
94+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
95+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
96+
require.NoError(t, s.StartAndWaitReady(ingester, distributor))
97+
98+
// Wait until distributor have updated the ring.
99+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
100+
101+
// Start the query-frontend.
102+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
103+
require.NoError(t, s.Start(queryFrontend))
104+
105+
// Start the querier
106+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
107+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
108+
}), "")
109+
110+
// Start queriers.
111+
require.NoError(t, s.StartAndWaitReady(querier))
112+
require.NoError(t, s.WaitReady(queryFrontend))
113+
114+
now := time.Now()
115+
series, expectedVector := generateSeries("series_1", now)
116+
117+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
118+
require.NoError(t, err)
119+
120+
res, err := c.Push(series)
121+
require.NoError(t, err)
122+
require.Equal(t, 200, res.StatusCode)
123+
124+
result, err := c.Query("series_1", now)
125+
require.NoError(t, err)
126+
require.Equal(t, model.ValVector, result.Type())
127+
require.Equal(t, expectedVector, result.(model.Vector))
128+
}
129+
130+
func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTenantFederationConfig) {
131+
const numUsers = 10
132+
const blockRangePeriod = 5 * time.Second
133+
134+
s, err := e2e.NewScenario(networkName)
135+
require.NoError(t, err)
136+
defer s.Close()
137+
138+
memcached := e2ecache.NewMemcached()
139+
consul := e2edb.NewConsul()
140+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
141+
142+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
143+
"-querier.cache-results": "true",
144+
"-querier.split-queries-by-interval": "24h",
145+
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
146+
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
147+
"-tenant-federation.enabled": "true",
148+
"-tenant-federation.regex-matcher-enabled": "true",
149+
"-tenant-federation.user-sync-interval": "1s",
150+
151+
// to upload block quickly
152+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
153+
"-blocks-storage.tsdb.ship-interval": "1s",
154+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
155+
156+
// store gateway
157+
"-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(),
158+
"-querier.max-fetched-series-per-query": "1",
159+
})
160+
161+
// Start the query-scheduler if enabled.
162+
var queryScheduler *e2ecortex.CortexService
163+
if cfg.querySchedulerEnabled {
164+
queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
165+
require.NoError(t, s.StartAndWaitReady(queryScheduler))
166+
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
167+
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
168+
}
169+
170+
if cfg.shuffleShardingEnabled {
171+
// Use only single querier for each user.
172+
flags["-frontend.max-queriers-per-tenant"] = "1"
173+
}
174+
175+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
176+
require.NoError(t, s.StartAndWaitReady(minio))
177+
178+
// Start ingester and distributor.
179+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
180+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
181+
require.NoError(t, s.StartAndWaitReady(ingester, distributor))
182+
183+
// Wait until distributor have updated the ring.
184+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
185+
186+
// Push a series for each user to Cortex.
187+
now := time.Now()
188+
expectedVectors := make([]model.Vector, numUsers)
189+
tenantIDs := make([]string, numUsers)
190+
191+
for u := 0; u < numUsers; u++ {
192+
tenantIDs[u] = fmt.Sprintf("user-%d", u)
193+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u])
194+
require.NoError(t, err)
195+
196+
var series []prompb.TimeSeries
197+
series, expectedVectors[u] = generateSeries("series_1", now)
198+
// To ship series_1 block
199+
series2, _ := generateSeries("series_2", now.Add(blockRangePeriod*2))
200+
201+
res, err := c.Push(series)
202+
require.NoError(t, err)
203+
require.Equal(t, 200, res.StatusCode)
204+
205+
res, err = c.Push(series2)
206+
require.NoError(t, err)
207+
require.Equal(t, 200, res.StatusCode)
208+
}
209+
210+
// Start the query-frontend.
211+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
212+
require.NoError(t, s.Start(queryFrontend))
213+
214+
if !cfg.querySchedulerEnabled {
215+
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
216+
}
217+
218+
// Start the querier and store-gateway
219+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
220+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
221+
222+
var querier2 *e2ecortex.CortexService
223+
if cfg.shuffleShardingEnabled {
224+
querier2 = e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
225+
}
226+
227+
// Start queriers.
228+
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
229+
require.NoError(t, s.WaitReady(queryFrontend))
230+
if cfg.shuffleShardingEnabled {
231+
require.NoError(t, s.StartAndWaitReady(querier2))
232+
}
233+
234+
// Wait until the querier and store-gateway have updated ring
235+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
236+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
237+
if cfg.shuffleShardingEnabled {
238+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
239+
}
240+
241+
// wait to upload blocks
242+
require.NoError(t, ingester.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_ingester_shipper_uploads_total"}, e2e.WaitMissingMetrics))
243+
244+
// wait to update knownUsers
245+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics)
246+
if cfg.shuffleShardingEnabled {
247+
require.NoError(t, querier2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics)
248+
}
249+
250+
// query all tenants
251+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+")
252+
require.NoError(t, err)
253+
254+
result, err := c.Query("series_1", now)
255+
require.NoError(t, err)
256+
257+
assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))
258+
259+
// ensure a push to multiple tenants is failing
260+
series, _ := generateSeries("series_1", now)
261+
res, err := c.Push(series)
262+
require.NoError(t, err)
263+
264+
require.Equal(t, 500, res.StatusCode)
265+
266+
// check metric label values for total queries in the query frontend
267+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
268+
labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"),
269+
labels.MustNewMatcher(labels.MatchEqual, "op", "query"))))
270+
271+
// check metric label values for query queue length in either query frontend or query scheduler
272+
queueComponent := queryFrontend
273+
queueMetricName := "cortex_query_frontend_queue_length"
274+
if cfg.querySchedulerEnabled {
275+
queueComponent = queryScheduler
276+
queueMetricName = "cortex_query_scheduler_queue_length"
277+
}
278+
require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers(
279+
labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"))))
50280
}
51281

52282
func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) {

pkg/cortex/modules.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
"github.com/cortexproject/cortex/pkg/scheduler"
5252
"github.com/cortexproject/cortex/pkg/storage/bucket"
5353
"github.com/cortexproject/cortex/pkg/storegateway"
54+
"github.com/cortexproject/cortex/pkg/tenant"
5455
"github.com/cortexproject/cortex/pkg/util/grpcclient"
5556
util_log "github.com/cortexproject/cortex/pkg/util/log"
5657
"github.com/cortexproject/cortex/pkg/util/modules"
@@ -282,10 +283,30 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
282283
// single tenant. This allows for a less impactful enabling of tenant
283284
// federation.
284285
byPassForSingleQuerier := true
286+
285287
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
286288
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer)
287289
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)
290+
291+
if t.Cfg.TenantFederation.RegexMatcherEnabled {
292+
util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled")
293+
294+
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
295+
return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, prometheus.DefaultRegisterer)
296+
}
297+
298+
regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, prometheus.DefaultRegisterer, bucketClientFactory, t.Cfg.TenantFederation.UserSyncInterval, util_log.Logger)
299+
if err != nil {
300+
return nil, fmt.Errorf("failed to initialize regex resolver: %v", err)
301+
}
302+
tenant.WithDefaultResolver(regexResolver)
303+
304+
return regexResolver, nil
305+
}
306+
307+
return nil, nil
288308
}
309+
289310
return nil, nil
290311
}
291312

@@ -497,6 +518,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
497518
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
498519
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
499520

521+
if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled {
522+
// If regex matcher enabled, we use regex validator to pass regex to the querier
523+
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
524+
}
525+
500526
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
501527
t.Cfg.QueryRange,
502528
util_log.Logger,
@@ -776,6 +802,11 @@ func (t *Cortex) initTenantDeletionAPI() (services.Service, error) {
776802
}
777803

778804
func (t *Cortex) initQueryScheduler() (services.Service, error) {
805+
if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled {
806+
// If regex matcher enabled, we use regex validator to pass regex to the querier
807+
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
808+
}
809+
779810
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
780811
if err != nil {
781812
return nil, errors.Wrap(err, "query-scheduler init")

0 commit comments

Comments
 (0)