Skip to content

Commit 01a64a4

Browse files
committed
adding tests / hiding documentation for now
Signed-off-by: alanprot <[email protected]>
1 parent adca9f7 commit 01a64a4

File tree

7 files changed

+252
-112
lines changed

7 files changed

+252
-112
lines changed

docs/configuration/config-file-reference.md

-92
Original file line numberDiff line numberDiff line change
@@ -152,96 +152,6 @@ api:
152152
# The compactor_config configures the compactor for the blocks storage.
153153
[compactor: <compactor_config>]
154154

155-
parquet_converter:
156-
# Comma separated list of tenants that can be converted. If specified, only
157-
# these tenants will be converted, otherwise all tenants can be converted.
158-
# CLI flag: -parquet-converter.enabled-tenants
159-
[enabled_tenants: <string> | default = ""]
160-
161-
# Comma separated list of tenants that cannot converted.
162-
# CLI flag: -parquet-converter.disabled-tenants
163-
[disabled_tenants: <string> | default = ""]
164-
165-
ring:
166-
kvstore:
167-
# Backend storage to use for the ring. Supported values are: consul, etcd,
168-
# inmemory, memberlist, multi.
169-
# CLI flag: -parquet-converter.ring.store
170-
[store: <string> | default = "consul"]
171-
172-
# The prefix for the keys in the store. Should end with a /.
173-
# CLI flag: -parquet-converter.ring.prefix
174-
[prefix: <string> | default = "collectors/"]
175-
176-
dynamodb:
177-
# Region to access dynamodb.
178-
# CLI flag: -parquet-converter.ring.dynamodb.region
179-
[region: <string> | default = ""]
180-
181-
# Table name to use on dynamodb.
182-
# CLI flag: -parquet-converter.ring.dynamodb.table-name
183-
[table_name: <string> | default = ""]
184-
185-
# Time to expire items on dynamodb.
186-
# CLI flag: -parquet-converter.ring.dynamodb.ttl-time
187-
[ttl: <duration> | default = 0s]
188-
189-
# Time to refresh local ring with information on dynamodb.
190-
# CLI flag: -parquet-converter.ring.dynamodb.puller-sync-time
191-
[puller_sync_time: <duration> | default = 1m]
192-
193-
# Maximum number of retries for DDB KV CAS.
194-
# CLI flag: -parquet-converter.ring.dynamodb.max-cas-retries
195-
[max_cas_retries: <int> | default = 10]
196-
197-
# Timeout of dynamoDbClient requests. Default is 2m.
198-
# CLI flag: -parquet-converter.ring.dynamodb.timeout
199-
[timeout: <duration> | default = 2m]
200-
201-
# The consul_config configures the consul client.
202-
# The CLI flags prefix for this block config is: parquet-converter.ring
203-
[consul: <consul_config>]
204-
205-
# The etcd_config configures the etcd client.
206-
# The CLI flags prefix for this block config is: parquet-converter.ring
207-
[etcd: <etcd_config>]
208-
209-
multi:
210-
# Primary backend storage used by multi-client.
211-
# CLI flag: -parquet-converter.ring.multi.primary
212-
[primary: <string> | default = ""]
213-
214-
# Secondary backend storage used by multi-client.
215-
# CLI flag: -parquet-converter.ring.multi.secondary
216-
[secondary: <string> | default = ""]
217-
218-
# Mirror writes to secondary store.
219-
# CLI flag: -parquet-converter.ring.multi.mirror-enabled
220-
[mirror_enabled: <boolean> | default = false]
221-
222-
# Timeout for storing value to secondary store.
223-
# CLI flag: -parquet-converter.ring.multi.mirror-timeout
224-
[mirror_timeout: <duration> | default = 2s]
225-
226-
# Period at which to heartbeat to the ring. 0 = disabled.
227-
# CLI flag: -parquet-converter.ring.heartbeat-period
228-
[heartbeat_period: <duration> | default = 5s]
229-
230-
# The heartbeat timeout after which parquet-converter are considered
231-
# unhealthy within the ring. 0 = never (timeout disabled).
232-
# CLI flag: -parquet-converter.ring.heartbeat-timeout
233-
[heartbeat_timeout: <duration> | default = 1m]
234-
235-
# Time since last heartbeat before parquet-converter will be removed from
236-
# ring. 0 to disable
237-
# CLI flag: -parquet-converter.auto-forget-delay
238-
[auto_forget_delay: <duration> | default = 2m]
239-
240-
# File path where tokens are stored. If empty, tokens are not stored at
241-
# shutdown and restored at startup.
242-
# CLI flag: -parquet-converter.ring.tokens-file-path
243-
[tokens_file_path: <string> | default = ""]
244-
245155
# The store_gateway_config configures the store-gateway service used by the
246156
# blocks storage.
247157
[store_gateway: <store_gateway_config>]
@@ -2589,7 +2499,6 @@ The `consul_config` configures the consul client. The supported CLI flags `<pref
25892499
- `compactor.ring`
25902500
- `distributor.ha-tracker`
25912501
- `distributor.ring`
2592-
- `parquet-converter.ring`
25932502
- `ruler.ring`
25942503
- `store-gateway.sharding-ring`
25952504

@@ -2906,7 +2815,6 @@ The `etcd_config` configures the etcd client. The supported CLI flags `<prefix>`
29062815
- `compactor.ring`
29072816
- `distributor.ha-tracker`
29082817
- `distributor.ring`
2909-
- `parquet-converter.ring`
29102818
- `ruler.ring`
29112819
- `store-gateway.sharding-ring`
29122820

pkg/cortex/cortex.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ type Config struct {
114114
QueryRange queryrange.Config `yaml:"query_range"`
115115
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
116116
Compactor compactor.Config `yaml:"compactor"`
117-
ParquetConverter parquetconverter.Config `yaml:"parquet_converter"`
117+
ParquetConverter parquetconverter.Config `yaml:"parquet_converter" doc:"hidden"`
118118
StoreGateway storegateway.Config `yaml:"store_gateway"`
119119
TenantFederation tenantfederation.Config `yaml:"tenant_federation"`
120120

pkg/cortex/modules.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -696,8 +696,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
696696

697697
func (t *Cortex) initParquetConverter() (serv services.Service, err error) {
698698
t.Cfg.ParquetConverter.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
699-
t.Parquetconverter = parquetconverter.NewConverter(t.Cfg.ParquetConverter, t.Cfg.BlocksStorage, t.Cfg.Compactor.BlockRanges.ToMilliseconds(), util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)
700-
return t.Parquetconverter, nil
699+
return parquetconverter.NewConverter(t.Cfg.ParquetConverter, t.Cfg.BlocksStorage, t.Cfg.Compactor.BlockRanges.ToMilliseconds(), util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)
701700
}
702701

703702
func (t *Cortex) initCompactor() (serv services.Service, err error) {

pkg/parquetconverter/converter.go

+14-17
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type Config struct {
4747
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
4848
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
4949
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
50+
ConversionInterval time.Duration `yaml:"conversion_interval"`
5051

5152
DataDir string `yaml:"data_dir"`
5253

@@ -70,7 +71,7 @@ type Converter struct {
7071
ringSubservices *services.Manager
7172
ringSubservicesWatcher *services.FailureWatcher
7273

73-
bkt objstore.InstrumentedBucket
74+
bkt objstore.Bucket
7475

7576
// chunk pool
7677
pool chunkenc.Pool
@@ -88,9 +89,17 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8889
f.Var(&cfg.DisabledTenants, "parquet-converter.disabled-tenants", "Comma separated list of tenants that cannot converted.")
8990
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
9091
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
92+
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
9193
}
9294

93-
func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) *Converter {
95+
func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
96+
bkt, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "parquet-converter", logger, registerer)
97+
98+
return newConverter(cfg, bkt, storageCfg, blockRanges, logger, registerer, limits), err
99+
}
100+
101+
func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) *Converter {
102+
bkt = bucketindex.BucketWithGlobalMarkers(bkt)
94103
c := &Converter{
95104
cfg: cfg,
96105
reg: registerer,
@@ -101,24 +110,17 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR
101110
pool: chunkenc.NewPool(),
102111
blockRanges: blockRanges,
103112
fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
113+
bkt: bkt,
104114
}
105115

106116
c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
107117
return c
108118
}
109119

110120
func (c *Converter) starting(ctx context.Context) error {
111-
bkt, err := bucket.NewClient(ctx, c.storageCfg.Bucket, nil, "parquet-converter", c.logger, c.reg)
112-
bkt = bucketindex.BucketWithGlobalMarkers(bkt)
113-
114-
if err != nil {
115-
return err
116-
}
117-
118-
c.bkt = bkt
119121
lifecyclerCfg := c.cfg.Ring.ToLifecyclerConfig()
122+
var err error
120123
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "parquet-converter", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.reg))
121-
122124
if err != nil {
123125
return errors.Wrap(err, "unable to initialize converter ring lifecycler")
124126
}
@@ -152,7 +154,7 @@ func (c *Converter) starting(ctx context.Context) error {
152154

153155
func (c *Converter) running(ctx context.Context) error {
154156
level.Info(c.logger).Log("msg", "parquet-converter started")
155-
t := time.NewTicker(time.Second * 10)
157+
t := time.NewTicker(c.cfg.ConversionInterval)
156158
defer t.Stop()
157159
for {
158160
select {
@@ -187,7 +189,6 @@ func (c *Converter) running(ctx context.Context) error {
187189
level.Info(userLogger).Log("msg", "scanned user", "user", userID)
188190

189191
err = c.convertUser(ctx, userLogger, ring, userID)
190-
191192
if err != nil {
192193
level.Error(userLogger).Log("msg", "failed to convert user", "user", userID, "err", err)
193194
}
@@ -216,7 +217,6 @@ func (c *Converter) discoverUsers(ctx context.Context) ([]string, error) {
216217
}
217218

218219
func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring ring.ReadRing, userID string) error {
219-
220220
uBucket := bucket.NewUserBucketClient(userID, c.bkt, c.limits)
221221

222222
var blockLister block.Lister
@@ -253,7 +253,6 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
253253
c.fetcherMetrics,
254254
[]block.MetadataFilter{ignoreDeletionMarkFilter},
255255
)
256-
257256
if err != nil {
258257
return errors.Wrap(err, "error creating block fetcher")
259258
}
@@ -275,7 +274,6 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
275274
}
276275

277276
marker, err := cortex_parquet.ReadConverterMark(ctx, b.ULID, uBucket, logger)
278-
279277
if err != nil {
280278
level.Error(logger).Log("msg", "failed to read marker", "block", b.ULID.String(), "err", err)
281279
continue
@@ -304,7 +302,6 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
304302
}
305303

306304
tsdbBlock, err := tsdb.OpenBlock(logutil.GoKitLogToSlog(logger), bdir, c.pool, tsdb.DefaultPostingsDecoderFactory)
307-
308305
if err != nil {
309306
level.Error(logger).Log("msg", "Error opening block", "err", err)
310307
continue
+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package parquetconverter
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
7+
"testing"
8+
"time"
9+
10+
"github.com/cortexproject/cortex/integration/e2e"
11+
"github.com/cortexproject/cortex/pkg/ring"
12+
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
13+
"github.com/cortexproject/cortex/pkg/storage/bucket"
14+
"github.com/cortexproject/cortex/pkg/storage/parquet"
15+
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
16+
"github.com/cortexproject/cortex/pkg/util/concurrency"
17+
"github.com/cortexproject/cortex/pkg/util/flagext"
18+
"github.com/cortexproject/cortex/pkg/util/services"
19+
"github.com/cortexproject/cortex/pkg/util/test"
20+
"github.com/cortexproject/cortex/pkg/util/validation"
21+
"github.com/go-kit/log"
22+
"github.com/oklog/ulid"
23+
"github.com/prometheus/client_golang/prometheus"
24+
"github.com/prometheus/prometheus/model/labels"
25+
"github.com/prometheus/prometheus/tsdb"
26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
"github.com/thanos-io/objstore"
29+
"github.com/thanos-io/thanos/pkg/block"
30+
"github.com/thanos-io/thanos/pkg/block/metadata"
31+
)
32+
33+
func TestConverter(t *testing.T) {
34+
cfg := prepareConfig()
35+
user := "user"
36+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
37+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
38+
dir := t.TempDir()
39+
40+
cfg.Ring.InstanceID = "compactor-1"
41+
cfg.Ring.InstanceAddr = "1.2.3.4"
42+
cfg.Ring.KVStore.Mock = ringStore
43+
bucketClient := objstore.NewInMemBucket()
44+
userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
45+
46+
c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), nil)
47+
48+
ctx := context.Background()
49+
50+
lbls := labels.Labels{labels.Label{
51+
Name: "__name__",
52+
Value: "test",
53+
}}
54+
55+
blocks := []ulid.ULID{}
56+
// Create blocks
57+
for _, duration := range []time.Duration{2 * time.Hour, 24 * time.Hour} {
58+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
59+
id, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, duration.Milliseconds(), time.Minute.Milliseconds(), 10)
60+
require.NoError(t, err)
61+
blocks = append(blocks, id)
62+
}
63+
64+
for _, bIds := range blocks {
65+
blockDir := fmt.Sprintf("%s/%s", dir, bIds.String())
66+
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
67+
require.NoError(t, err)
68+
fmt.Println(b.Dir())
69+
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
70+
require.NoError(t, err)
71+
}
72+
73+
// Try to start the compactor with a bad consul kv-store. The
74+
err := services.StartAndAwaitRunning(context.Background(), c)
75+
require.NoError(t, err)
76+
77+
blocksConverted := []ulid.ULID{}
78+
79+
test.Poll(t, 3*time.Minute, 1, func() interface{} {
80+
blocksConverted = blocksConverted[:0]
81+
for _, bIds := range blocks {
82+
m, err := parquet.ReadConverterMark(ctx, bIds, userBucket, logger)
83+
require.NoError(t, err)
84+
if m.Version == parquet.CurrentVersion {
85+
blocksConverted = append(blocksConverted, bIds)
86+
}
87+
}
88+
return len(blocksConverted)
89+
})
90+
91+
// Verify all files are there
92+
for _, block := range blocksConverted {
93+
for _, file := range []string{
94+
fmt.Sprintf("%s/parquet-converter-mark.json", block.String()),
95+
fmt.Sprintf("markers/%s-parquet-converter-mark.json", block.String()),
96+
fmt.Sprintf("%s/0.chunks.parquet", block.String()),
97+
fmt.Sprintf("%s/0.labels.parquet", block.String()),
98+
} {
99+
ok, err := userBucket.Exists(ctx, file)
100+
require.NoError(t, err)
101+
require.True(t, ok)
102+
}
103+
}
104+
}
105+
106+
func prepareConfig() Config {
107+
cfg := Config{}
108+
flagext.DefaultValues(&cfg)
109+
cfg.ConversionInterval = time.Second
110+
return cfg
111+
}
112+
113+
func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Converter, log.Logger, prometheus.Gatherer) {
114+
storageCfg := cortex_tsdb.BlocksStorageConfig{}
115+
blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
116+
flagext.DefaultValues(&storageCfg)
117+
storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery)
118+
119+
// Create a temporary directory for compactor data.
120+
cfg.DataDir = t.TempDir()
121+
122+
logs := &concurrency.SyncBuffer{}
123+
logger := log.NewLogfmtLogger(logs)
124+
registry := prometheus.NewRegistry()
125+
126+
if limits == nil {
127+
limits = &validation.Limits{}
128+
flagext.DefaultValues(limits)
129+
}
130+
131+
overrides, err := validation.NewOverrides(*limits, nil)
132+
require.NoError(t, err)
133+
134+
c := newConverter(cfg, bucketClient, storageCfg, blockRanges.ToMilliseconds(), logger, registry, overrides)
135+
return c, logger, registry
136+
}

0 commit comments

Comments
 (0)