Commit 040e699

Using MetaFetcher to discover the blocks
Signed-off-by: alanprot <[email protected]>
1 parent 35719a4 commit 040e699
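
In short, this commit drops the bucket-index Loader and instead builds a per-tenant Thanos MetaFetcher to discover the blocks to convert. Below is a minimal sketch of that discovery flow, condensed from Converter.convertUser in the diff further down; the block.* call shapes mirror the diff, while the function name discoverBlocks, its parameters, and the objstore.InstrumentedBucket parameter type are illustrative assumptions, not part of the commit.

package parquetconverter

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block"
)

// discoverBlocks is a minimal sketch of the MetaFetcher-based discovery added by
// this commit. Names and parameter types here are assumptions for illustration.
func discoverBlocks(ctx context.Context, logger log.Logger, userBkt objstore.InstrumentedBucket,
	syncDir string, concurrency int, metrics *block.FetcherMetrics) error {
	// List block IDs under the tenant prefix concurrently (the commit also wires
	// up recursive and bucket-index listers, chosen by the block discovery strategy).
	lister := block.NewConcurrentLister(logger, userBkt)

	// Ignore blocks that already carry a deletion mark.
	ignoreDeleted := block.NewIgnoreDeletionMarkFilter(logger, userBkt, 0, concurrency)

	var baseMetrics block.BaseFetcherMetrics
	baseMetrics.Syncs = metrics.Syncs

	fetcher, err := block.NewMetaFetcherWithMetrics(
		logger,
		concurrency,
		userBkt,
		lister,
		syncDir, // local cache of meta.json files; the commit uses "<data-dir>/converter-meta-<userID>"
		&baseMetrics,
		metrics,
		[]block.MetadataFilter{ignoreDeleted},
	)
	if err != nil {
		return err
	}

	// Fetch syncs meta.json for every block that passes the filters.
	metas, _, err := fetcher.Fetch(ctx)
	if err != nil {
		return err
	}
	for _, m := range metas {
		level.Info(logger).Log("msg", "discovered block", "block", m.ULID.String(), "mint", m.MinTime, "maxt", m.MaxTime)
	}
	return nil
}

In the actual change the fetched metas replace the bucket-index entries: ownership, converter-mark checks, download, and conversion are then applied per block exactly as before.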

File tree

1 file changed: +155 -112 lines changed

pkg/parquetconverter/converter.go

+155 -112
@@ -10,10 +10,9 @@ import (
 	"strings"
 	"time"
 
-	"github.com/parquet-go/parquet-go"
-
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/parquet-go/parquet-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus-community/parquet-common/convert"
 	"github.com/prometheus/client_golang/prometheus"
@@ -22,6 +21,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/logutil"
 
 	"github.com/cortexproject/cortex/pkg/ring"
@@ -43,9 +43,11 @@ const (
 var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
 
 type Config struct {
-	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
-	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
-	DataDir         string                 `yaml:"data_dir"`
+	EnabledTenants      flagext.StringSliceCSV `yaml:"enabled_tenants"`
+	DisabledTenants     flagext.StringSliceCSV `yaml:"disabled_tenants"`
+	MetaSyncConcurrency int                    `yaml:"meta_sync_concurrency"`
+
+	DataDir string `yaml:"data_dir"`
 
 	Ring RingConfig `yaml:"ring"`
 }
@@ -61,9 +63,6 @@ type Converter struct {
 	allowedTenants *util.AllowedTenants
 	limits         *validation.Overrides
 
-	// Blocks loader
-	loader *bucketindex.Loader
-
 	// Ring used for sharding compactions.
 	ringLifecycler *ring.Lifecycler
 	ring           *ring.Ring
@@ -77,6 +76,8 @@ type Converter struct {
 
 	// compaction block ranges
 	blockRanges []int64
+
+	fetcherMetrics *block.FetcherMetrics
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -85,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.Var(&cfg.EnabledTenants, "parquet-converter.enabled-tenants", "Comma separated list of tenants that can be converted. If specified, only these tenants will be converted, otherwise all tenants can be converted.")
 	f.Var(&cfg.DisabledTenants, "parquet-converter.disabled-tenants", "Comma separated list of tenants that cannot converted.")
 	f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
+	f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
 }
 
 func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) *Converter {
@@ -97,6 +99,7 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR
 		limits:         limits,
 		pool:           chunkenc.NewPool(),
 		blockRanges:    blockRanges,
+		fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
 	}
 
 	c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
@@ -109,15 +112,6 @@ func (c *Converter) starting(ctx context.Context) error {
 		return err
 	}
 
-	indexLoaderConfig := bucketindex.LoaderConfig{
-		CheckInterval:         time.Minute,
-		UpdateOnStaleInterval: c.storageCfg.BucketStore.SyncInterval,
-		UpdateOnErrorInterval: c.storageCfg.BucketStore.BucketIndex.UpdateOnErrorInterval,
-		IdleTimeout:           c.storageCfg.BucketStore.BucketIndex.IdleTimeout,
-	}
-
-	c.loader = bucketindex.NewLoader(indexLoaderConfig, bkt, c.limits, util_log.Logger, prometheus.DefaultRegisterer)
-
 	c.bkt = bkt
 	lifecyclerCfg := c.cfg.Ring.ToLifecyclerConfig()
 	c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "parquet-converter", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.reg))
@@ -150,14 +144,6 @@ func (c *Converter) starting(ctx context.Context) error {
 		return err
 	}
 
-	if err := c.loader.StartAsync(context.Background()); err != nil {
-		return errors.Wrap(err, "failed to start loader")
-	}
-
-	if err := c.loader.AwaitRunning(ctx); err != nil {
-		return errors.Wrap(err, "failed to start loader")
-	}
-
 	return nil
 }
 
@@ -177,97 +163,30 @@ func (c *Converter) running(ctx context.Context) error {
 			continue
 		}
 		for _, userID := range users {
-			owned, err := c.ownUser(userID)
-			if err != nil {
-				level.Error(c.logger).Log("msg", "failed to check if user is owned by the user", "user", userID, "err", err)
-				continue
-			}
-			if !owned {
-				level.Info(c.logger).Log("msg", "user not owned", "user", userID)
-				continue
-			}
-			level.Info(c.logger).Log("msg", "scanned user", "user", userID)
-			userLogger := util_log.WithUserID(userID, c.logger)
+
 			var ring ring.ReadRing
 			ring = c.ring
 			if c.limits.ParquetConverterTenantShardSize(userID) > 0 {
 				ring = c.ring.ShuffleShard(userID, c.limits.ParquetConverterTenantShardSize(userID))
 			}
 
-			idx, _, err := c.loader.GetIndex(ctx, userID)
+			userLogger := util_log.WithUserID(userID, c.logger)
+
+			owned, err := c.ownUser(ring, userID)
 			if err != nil {
-				level.Error(userLogger).Log("msg", "failed to get index", "err", err)
+				level.Error(userLogger).Log("msg", "failed to check if user is owned by the user", "user", userID, "err", err)
				continue
 			}
+			if !owned {
+				level.Info(userLogger).Log("msg", "user not owned", "user", userID)
+				continue
+			}
+			level.Info(userLogger).Log("msg", "scanned user", "user", userID)
+
+			err = c.convertUser(ctx, userLogger, ring, userID)
 
-			for _, b := range idx.Blocks {
-				ok, err := c.ownBlock(ring, b.ID.String())
-				if err != nil {
-					level.Error(userLogger).Log("msg", "failed to get own block", "block", b.ID.String(), "err", err)
-					continue
-				}
-
-				if !ok {
-					continue
-				}
-				uBucket := bucket.NewUserBucketClient(userID, c.bkt, c.limits)
-
-				marker, err := ReadConverterMark(ctx, b.ID, uBucket, userLogger)
-				if err != nil {
-					level.Error(userLogger).Log("msg", "failed to read marker", "block", b.ID.String(), "err", err)
-					continue
-				}
-
-				if marker.Version == CurrentVersion {
-					continue
-				}
-
-				// Do not convert 2 hours blocks
-				if getBlockTimeRange(b, c.blockRanges) == c.blockRanges[0] {
-					continue
-				}
-
-				if err := os.RemoveAll(c.compactRootDir()); err != nil {
-					level.Error(userLogger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err)
-				}
-
-				bdir := filepath.Join(c.compactDirForUser(userID), b.ID.String())
-
-				level.Info(userLogger).Log("msg", "downloading block", "block", b.ID.String(), "dir", bdir)
-				if err := block.Download(ctx, userLogger, uBucket, b.ID, bdir, objstore.WithFetchConcurrency(10)); err != nil {
-					level.Error(userLogger).Log("msg", "Error downloading block", "err", err)
-					continue
-				}
-
-				tsdbBlock, err := tsdb.OpenBlock(logutil.GoKitLogToSlog(userLogger), bdir, c.pool, tsdb.DefaultPostingsDecoderFactory)
-				if err != nil {
-					level.Error(userLogger).Log("msg", "Error opening block", "err", err)
-					continue
-				}
-				// Add converter logic
-				level.Info(userLogger).Log("msg", "converting block", "block", b.ID.String(), "dir", bdir)
-				_, err = convert.ConvertTSDBBlock(
-					ctx,
-					uBucket,
-					tsdbBlock.MinTime(),
-					tsdbBlock.MaxTime(),
-					[]convert.Convertible{tsdbBlock},
-					convert.WithSortBy(labels.MetricName),
-					convert.WithColDuration(time.Hour*8),
-					convert.WithName(b.ID.String()),
-					convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")),
-				)
-
-				_ = tsdbBlock.Close()
-
-				if err != nil {
-					level.Error(userLogger).Log("msg", "Error converting block", "err", err)
-				}
-
-				err = WriteCompactMark(ctx, b.ID, uBucket)
-				if err != nil {
-					level.Error(userLogger).Log("msg", "Error writing block", "err", err)
-				}
+			if err != nil {
+				level.Error(userLogger).Log("msg", "failed to convert user", "user", userID, "err", err)
 			}
 		}
 	}
@@ -276,7 +195,6 @@ func (c *Converter) running(ctx context.Context) error {
 
 func (c *Converter) stopping(_ error) error {
 	ctx := context.Background()
-	services.StopAndAwaitTerminated(ctx, c.loader) //nolint:errcheck
 	if c.ringSubservices != nil {
 		return services.StopManagerAndAwaitStopped(ctx, c.ringSubservices)
 	}
@@ -294,7 +212,134 @@ func (c *Converter) discoverUsers(ctx context.Context) ([]string, error) {
 	return users, err
 }
 
-func (c *Converter) ownUser(userID string) (bool, error) {
+func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring ring.ReadRing, userID string) error {
+
+	uBucket := bucket.NewUserBucketClient(userID, c.bkt, c.limits)
+
+	var blockLister block.Lister
+	switch cortex_tsdb.BlockDiscoveryStrategy(c.storageCfg.BucketStore.BlockDiscoveryStrategy) {
+	case cortex_tsdb.ConcurrentDiscovery:
+		blockLister = block.NewConcurrentLister(logger, uBucket)
+	case cortex_tsdb.RecursiveDiscovery:
+		blockLister = block.NewRecursiveLister(logger, uBucket)
+	case cortex_tsdb.BucketIndexDiscovery:
+		if !c.storageCfg.BucketStore.BucketIndex.Enabled {
+			return cortex_tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy
+		}
+		blockLister = bucketindex.NewBlockLister(logger, c.bkt, userID, c.limits)
+	default:
+		return cortex_tsdb.ErrBlockDiscoveryStrategy
+	}
+
+	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(
+		logger,
+		uBucket,
+		0,
+		c.cfg.MetaSyncConcurrency)
+
+	var baseFetcherMetrics block.BaseFetcherMetrics
+	baseFetcherMetrics.Syncs = c.fetcherMetrics.Syncs
+	// Create the blocks finder.
+	fetcher, err := block.NewMetaFetcherWithMetrics(
+		logger,
+		c.cfg.MetaSyncConcurrency,
+		uBucket,
+		blockLister,
+		c.metaSyncDirForUser(userID),
+		&baseFetcherMetrics,
+		c.fetcherMetrics,
+		[]block.MetadataFilter{ignoreDeletionMarkFilter},
+	)
+
+	if err != nil {
+		return errors.Wrap(err, "error creating block fetcher")
+	}
+
+	blocks, _, err := fetcher.Fetch(ctx)
+	if err != nil {
+		return errors.Wrapf(err, "failed to fetch blocks for user %s", userID)
+	}
+
+	for _, b := range blocks {
+		ok, err := c.ownBlock(ring, b.ULID.String())
+		if err != nil {
+			level.Error(logger).Log("msg", "failed to get own block", "block", b.ULID.String(), "err", err)
+			continue
+		}
+
+		if !ok {
+			continue
+		}
+
+		marker, err := ReadConverterMark(ctx, b.ULID, uBucket, logger)
+
+		if err != nil {
+			level.Error(logger).Log("msg", "failed to read marker", "block", b.ULID.String(), "err", err)
+			continue
+		}
+
+		if marker.Version == CurrentVersion {
+			continue
+		}
+
+		// Do not convert 2 hours blocks
+		if getBlockTimeRange(b, c.blockRanges) == c.blockRanges[0] {
+			continue
+		}
+
+		if err := os.RemoveAll(c.compactRootDir()); err != nil {
+			level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err)
+		}
+
+		bdir := filepath.Join(c.compactDirForUser(userID), b.ULID.String())
+
+		level.Info(logger).Log("msg", "downloading block", "block", b.ULID.String(), "dir", bdir)
+
+		if err := block.Download(ctx, logger, uBucket, b.ULID, bdir, objstore.WithFetchConcurrency(10)); err != nil {
+			level.Error(logger).Log("msg", "Error downloading block", "err", err)
+			continue
+		}
+
+		tsdbBlock, err := tsdb.OpenBlock(logutil.GoKitLogToSlog(logger), bdir, c.pool, tsdb.DefaultPostingsDecoderFactory)
+
+		if err != nil {
+			level.Error(logger).Log("msg", "Error opening block", "err", err)
+			continue
+		}
+
+		level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
+		_, err = convert.ConvertTSDBBlock(
+			ctx,
+			uBucket,
+			tsdbBlock.MinTime(),
+			tsdbBlock.MaxTime(),
+			[]convert.Convertible{tsdbBlock},
+			convert.WithSortBy(labels.MetricName),
+			convert.WithColDuration(time.Hour*8),
+			convert.WithName(b.ULID.String()),
+			convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")),
+		)
+
+		_ = tsdbBlock.Close()
+
+		if err != nil {
+			level.Error(logger).Log("msg", "Error converting block", "err", err)
+		}
+
+		err = WriteCompactMark(ctx, b.ULID, uBucket)
+		if err != nil {
+			level.Error(logger).Log("msg", "Error writing block", "err", err)
		}
+	}
+
+	return nil
+}
+
+func (c *Converter) metaSyncDirForUser(userID string) string {
+	return filepath.Join(c.cfg.DataDir, "converter-meta-"+userID)
+}
+
+func (c *Converter) ownUser(r ring.ReadRing, userID string) (bool, error) {
 	if !c.allowedTenants.IsAllowed(userID) {
 		return false, nil
 	}
@@ -303,9 +348,7 @@ func (c *Converter) ownUser(userID string) (bool, error) {
 	return true, nil
 	}
 
-	subRing := c.ring.ShuffleShard(userID, c.limits.ParquetConverterTenantShardSize(userID))
-
-	rs, err := subRing.GetAllHealthy(RingOp)
+	rs, err := r.GetAllHealthy(RingOp)
 	if err != nil {
 		return false, err
 	}
@@ -340,7 +383,7 @@ func (c *Converter) compactDirForUser(userID string) string {
 	return filepath.Join(c.compactRootDir(), userID)
 }
 
-func getBlockTimeRange(b *bucketindex.Block, timeRanges []int64) int64 {
+func getBlockTimeRange(b *metadata.Meta, timeRanges []int64) int64 {
 	timeRange := int64(0)
 	// fallback logic to guess block time range based
 	// on MaxTime and MinTime
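
For reference, the per-block conversion step that runs after discovery is unchanged in substance: each owned, not-yet-converted block is downloaded, opened as a TSDB block, and handed to parquet-common. The sketch below is illustrative only and uses calls that appear in the diff above; the helper name convertBlock and the objstore.Bucket parameter type are assumptions (in the commit the bucket is the tenant's UserBucketClient and the chunk pool is shared on the Converter).

package parquetconverter

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/parquet-go/parquet-go"
	"github.com/prometheus-community/parquet-common/convert"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/logutil"
)

// convertBlock is an illustrative sketch of the per-block conversion performed in
// Converter.convertUser; it assumes the block has already been downloaded to bdir.
func convertBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir, blockID string) error {
	// Open the downloaded TSDB block.
	tsdbBlock, err := tsdb.OpenBlock(logutil.GoKitLogToSlog(logger), bdir, chunkenc.NewPool(), tsdb.DefaultPostingsDecoderFactory)
	if err != nil {
		return err
	}
	defer tsdbBlock.Close()

	// Convert to parquet and write to the tenant bucket: rows sorted by metric
	// name, 8h column duration, output named after the source block ULID.
	_, err = convert.ConvertTSDBBlock(
		ctx,
		bkt,
		tsdbBlock.MinTime(),
		tsdbBlock.MaxTime(),
		[]convert.Convertible{tsdbBlock},
		convert.WithSortBy(labels.MetricName),
		convert.WithColDuration(time.Hour*8),
		convert.WithName(blockID),
		convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")),
	)
	return err
}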
