@@ -47,6 +47,7 @@ type Config struct {
47
47
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
48
48
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
49
49
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
50
+ ConversionInterval time.Duration `yaml:"conversion_interval"`
50
51
51
52
DataDir string `yaml:"data_dir"`
52
53
@@ -70,7 +71,7 @@ type Converter struct {
70
71
ringSubservices * services.Manager
71
72
ringSubservicesWatcher * services.FailureWatcher
72
73
73
- bkt objstore.InstrumentedBucket
74
+ bkt objstore.Bucket
74
75
75
76
// chunk pool
76
77
pool chunkenc.Pool
@@ -88,9 +89,17 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
88
89
f .Var (& cfg .DisabledTenants , "parquet-converter.disabled-tenants" , "Comma separated list of tenants that cannot converted." )
89
90
f .StringVar (& cfg .DataDir , "parquet-converter.data-dir" , "./data" , "Data directory in which to cache blocks and process conversions." )
90
91
f .IntVar (& cfg .MetaSyncConcurrency , "parquet-converter.meta-sync-concurrency" , 20 , "Number of Go routines to use when syncing block meta files from the long term storage." )
92
+ f .DurationVar (& cfg .ConversionInterval , "parquet-converter.conversion-interval" , time .Minute , "The frequency at which the conversion job runs." )
91
93
}
92
94
93
- func NewConverter (cfg Config , storageCfg cortex_tsdb.BlocksStorageConfig , blockRanges []int64 , logger log.Logger , registerer prometheus.Registerer , limits * validation.Overrides ) * Converter {
95
+ func NewConverter (cfg Config , storageCfg cortex_tsdb.BlocksStorageConfig , blockRanges []int64 , logger log.Logger , registerer prometheus.Registerer , limits * validation.Overrides ) (* Converter , error ) {
96
+ bkt , err := bucket .NewClient (context .Background (), storageCfg .Bucket , nil , "parquet-converter" , logger , registerer )
97
+
98
+ return newConverter (cfg , bkt , storageCfg , blockRanges , logger , registerer , limits ), err
99
+ }
100
+
101
+ func newConverter (cfg Config , bkt objstore.InstrumentedBucket , storageCfg cortex_tsdb.BlocksStorageConfig , blockRanges []int64 , logger log.Logger , registerer prometheus.Registerer , limits * validation.Overrides ) * Converter {
102
+ bkt = bucketindex .BucketWithGlobalMarkers (bkt )
94
103
c := & Converter {
95
104
cfg : cfg ,
96
105
reg : registerer ,
@@ -101,24 +110,17 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR
101
110
pool : chunkenc .NewPool (),
102
111
blockRanges : blockRanges ,
103
112
fetcherMetrics : block .NewFetcherMetrics (registerer , nil , nil ),
113
+ bkt : bkt ,
104
114
}
105
115
106
116
c .Service = services .NewBasicService (c .starting , c .running , c .stopping )
107
117
return c
108
118
}
109
119
110
120
func (c * Converter ) starting (ctx context.Context ) error {
111
- bkt , err := bucket .NewClient (ctx , c .storageCfg .Bucket , nil , "parquet-converter" , c .logger , c .reg )
112
- bkt = bucketindex .BucketWithGlobalMarkers (bkt )
113
-
114
- if err != nil {
115
- return err
116
- }
117
-
118
- c .bkt = bkt
119
121
lifecyclerCfg := c .cfg .Ring .ToLifecyclerConfig ()
122
+ var err error
120
123
c .ringLifecycler , err = ring .NewLifecycler (lifecyclerCfg , ring .NewNoopFlushTransferer (), "parquet-converter" , ringKey , true , false , c .logger , prometheus .WrapRegistererWithPrefix ("cortex_" , c .reg ))
121
-
122
124
if err != nil {
123
125
return errors .Wrap (err , "unable to initialize converter ring lifecycler" )
124
126
}
@@ -152,7 +154,7 @@ func (c *Converter) starting(ctx context.Context) error {
152
154
153
155
func (c * Converter ) running (ctx context.Context ) error {
154
156
level .Info (c .logger ).Log ("msg" , "parquet-converter started" )
155
- t := time .NewTicker (time . Second * 10 )
157
+ t := time .NewTicker (c . cfg . ConversionInterval )
156
158
defer t .Stop ()
157
159
for {
158
160
select {
@@ -187,7 +189,6 @@ func (c *Converter) running(ctx context.Context) error {
187
189
level .Info (userLogger ).Log ("msg" , "scanned user" , "user" , userID )
188
190
189
191
err = c .convertUser (ctx , userLogger , ring , userID )
190
-
191
192
if err != nil {
192
193
level .Error (userLogger ).Log ("msg" , "failed to convert user" , "user" , userID , "err" , err )
193
194
}
@@ -216,7 +217,6 @@ func (c *Converter) discoverUsers(ctx context.Context) ([]string, error) {
216
217
}
217
218
218
219
func (c * Converter ) convertUser (ctx context.Context , logger log.Logger , ring ring.ReadRing , userID string ) error {
219
-
220
220
uBucket := bucket .NewUserBucketClient (userID , c .bkt , c .limits )
221
221
222
222
var blockLister block.Lister
@@ -253,7 +253,6 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
253
253
c .fetcherMetrics ,
254
254
[]block.MetadataFilter {ignoreDeletionMarkFilter },
255
255
)
256
-
257
256
if err != nil {
258
257
return errors .Wrap (err , "error creating block fetcher" )
259
258
}
@@ -275,7 +274,6 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
275
274
}
276
275
277
276
marker , err := cortex_parquet .ReadConverterMark (ctx , b .ULID , uBucket , logger )
278
-
279
277
if err != nil {
280
278
level .Error (logger ).Log ("msg" , "failed to read marker" , "block" , b .ULID .String (), "err" , err )
281
279
continue
@@ -304,7 +302,6 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
304
302
}
305
303
306
304
tsdbBlock , err := tsdb .OpenBlock (logutil .GoKitLogToSlog (logger ), bdir , c .pool , tsdb .DefaultPostingsDecoderFactory )
307
-
308
305
if err != nil {
309
306
level .Error (logger ).Log ("msg" , "Error opening block" , "err" , err )
310
307
continue
0 commit comments