@@ -10,6 +10,8 @@ import (
10
10
"strings"
11
11
"time"
12
12
13
+ "github.com/parquet-go/parquet-go"
14
+
13
15
"github.com/go-kit/log"
14
16
"github.com/go-kit/log/level"
15
17
"github.com/pkg/errors"
@@ -38,9 +40,7 @@ const (
38
40
ringKey = "parquet-converter"
39
41
)
40
42
41
- var (
42
- RingOp = ring .NewOp ([]ring.InstanceState {ring .ACTIVE }, nil )
43
- )
43
+ var RingOp = ring .NewOp ([]ring.InstanceState {ring .ACTIVE }, nil )
44
44
45
45
type Config struct {
46
46
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
@@ -84,6 +84,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
84
84
85
85
f .Var (& cfg .EnabledTenants , "parquet-converter.enabled-tenants" , "Comma separated list of tenants that can be converted. If specified, only these tenants will be converted, otherwise all tenants can be converted." )
86
86
f .Var (& cfg .DisabledTenants , "parquet-converter.disabled-tenants" , "Comma separated list of tenants that cannot converted." )
87
+ f .StringVar (& cfg .DataDir , "parquet-converter.data-dir" , "./data" , "Data directory in which to cache blocks and process conversions." )
87
88
}
88
89
89
90
func NewConverter (cfg Config , storageCfg cortex_tsdb.BlocksStorageConfig , blockRanges []int64 , logger log.Logger , registerer prometheus.Registerer , limits * validation.Overrides ) * Converter {
@@ -121,6 +122,10 @@ func (c *Converter) starting(ctx context.Context) error {
121
122
lifecyclerCfg := c .cfg .Ring .ToLifecyclerConfig ()
122
123
c .ringLifecycler , err = ring .NewLifecycler (lifecyclerCfg , ring .NewNoopFlushTransferer (), "parquet-converter" , ringKey , true , false , c .logger , prometheus .WrapRegistererWithPrefix ("cortex_" , c .reg ))
123
124
125
+ if err != nil {
126
+ return errors .Wrap (err , "unable to initialize converter ring lifecycler" )
127
+ }
128
+
124
129
c .ring , err = ring .New (lifecyclerCfg .RingConfig , "parquet-converter" , ringKey , c .logger , prometheus .WrapRegistererWithPrefix ("cortex_" , c .reg ))
125
130
if err != nil {
126
131
return errors .Wrap (err , "unable to initialize compactor ring" )
@@ -201,65 +206,70 @@ func (c *Converter) running(ctx context.Context) error {
201
206
level .Error (userLogger ).Log ("msg" , "failed to get own block" , "block" , b .ID .String (), "err" , err )
202
207
continue
203
208
}
204
- if ok {
205
- marker , err := ReadConverterMark (ctx , b .ID , c .bkt , userLogger )
206
- if err != nil {
207
- level .Error (userLogger ).Log ("msg" , "failed to read marker" , "block" , b .ID .String (), "err" , err )
208
- continue
209
- }
210
-
211
- if marker .Version == CurrentVersion {
212
- continue
213
- }
214
-
215
- // Do not convert 2 hours blocks
216
- if getBlockTimeRange (b , c .blockRanges ) == c .blockRanges [0 ] {
217
- continue
218
- }
219
-
220
- if err := os .RemoveAll (c .compactRootDir ()); err != nil {
221
- level .Error (userLogger ).Log ("msg" , "failed to remove work directory" , "path" , c .compactRootDir (), "err" , err )
222
- }
223
-
224
- bdir := filepath .Join (c .compactDirForUser (userID ), b .ID .String ())
225
- uBucket := bucket .NewUserBucketClient (userID , c .bkt , c .limits )
226
-
227
- level .Info (userLogger ).Log ("msg" , "downloading block" , "block" , b .ID .String (), "dir" , bdir )
228
- if err := block .Download (ctx , userLogger , uBucket , b .ID , bdir , objstore .WithFetchConcurrency (10 )); err != nil {
229
- level .Error (userLogger ).Log ("msg" , "Error downloading block" , "err" , err )
230
- continue
231
- }
232
-
233
- tsdbBlock , err := tsdb .OpenBlock (logutil .GoKitLogToSlog (userLogger ), bdir , c .pool , tsdb .DefaultPostingsDecoderFactory )
234
- if err != nil {
235
- level .Error (userLogger ).Log ("msg" , "Error opening block" , "err" , err )
236
- continue
237
- }
238
- // Add converter logic
239
- level .Info (userLogger ).Log ("msg" , "converting block" , "block" , b .ID .String (), "dir" , bdir )
240
- _ , err = convert .ConvertTSDBBlock (
241
- ctx ,
242
- uBucket ,
243
- tsdbBlock .MinTime (),
244
- tsdbBlock .MaxTime (),
245
- []convert.Convertible {tsdbBlock },
246
- convert .WithSortBy (labels .MetricName ),
247
- convert .WithColDuration (time .Hour * 8 ),
248
- convert .WithName (b .ID .String ()),
249
- )
250
-
251
- if err != nil {
252
- level .Error (userLogger ).Log ("msg" , "Error converting block" , "err" , err )
253
- }
254
-
255
- err = WriteCompactMark (ctx , b .ID , uBucket )
256
- if err != nil {
257
- level .Error (userLogger ).Log ("msg" , "Error writing block" , "err" , err )
258
- }
209
+
210
+ if ! ok {
211
+ continue
212
+ }
213
+ uBucket := bucket .NewUserBucketClient (userID , c .bkt , c .limits )
214
+
215
+ marker , err := ReadConverterMark (ctx , b .ID , uBucket , userLogger )
216
+ if err != nil {
217
+ level .Error (userLogger ).Log ("msg" , "failed to read marker" , "block" , b .ID .String (), "err" , err )
218
+ continue
219
+ }
220
+
221
+ if marker .Version == CurrentVersion {
222
+ continue
223
+ }
224
+
225
+ // Do not convert 2 hours blocks
226
+ if getBlockTimeRange (b , c .blockRanges ) == c .blockRanges [0 ] {
227
+ continue
228
+ }
229
+
230
+ if err := os .RemoveAll (c .compactRootDir ()); err != nil {
231
+ level .Error (userLogger ).Log ("msg" , "failed to remove work directory" , "path" , c .compactRootDir (), "err" , err )
232
+ }
233
+
234
+ bdir := filepath .Join (c .compactDirForUser (userID ), b .ID .String ())
235
+
236
+ level .Info (userLogger ).Log ("msg" , "downloading block" , "block" , b .ID .String (), "dir" , bdir )
237
+ if err := block .Download (ctx , userLogger , uBucket , b .ID , bdir , objstore .WithFetchConcurrency (10 )); err != nil {
238
+ level .Error (userLogger ).Log ("msg" , "Error downloading block" , "err" , err )
239
+ continue
240
+ }
241
+
242
+ tsdbBlock , err := tsdb .OpenBlock (logutil .GoKitLogToSlog (userLogger ), bdir , c .pool , tsdb .DefaultPostingsDecoderFactory )
243
+ if err != nil {
244
+ level .Error (userLogger ).Log ("msg" , "Error opening block" , "err" , err )
245
+ continue
246
+ }
247
+ // Add converter logic
248
+ level .Info (userLogger ).Log ("msg" , "converting block" , "block" , b .ID .String (), "dir" , bdir )
249
+ _ , err = convert .ConvertTSDBBlock (
250
+ ctx ,
251
+ uBucket ,
252
+ tsdbBlock .MinTime (),
253
+ tsdbBlock .MaxTime (),
254
+ []convert.Convertible {tsdbBlock },
255
+ convert .WithSortBy (labels .MetricName ),
256
+ convert .WithColDuration (time .Hour * 8 ),
257
+ convert .WithName (b .ID .String ()),
258
+ convert .WithColumnPageBuffers (parquet .NewFileBufferPool (bdir , "buffers.*" )),
259
+ )
260
+
261
+ _ = tsdbBlock .Close ()
262
+
263
+ if err != nil {
264
+ level .Error (userLogger ).Log ("msg" , "Error converting block" , "err" , err )
265
+ }
266
+
267
+ err = WriteCompactMark (ctx , b .ID , uBucket )
268
+ if err != nil {
269
+ level .Error (userLogger ).Log ("msg" , "Error writing block" , "err" , err )
259
270
}
260
271
}
261
272
}
262
-
263
273
}
264
274
}
265
275
}
@@ -346,7 +356,7 @@ func getBlockTimeRange(b *bucketindex.Block, timeRanges []int64) int64 {
346
356
return timeRange
347
357
}
348
358
349
- func getRangeStart (mint int64 , tr int64 ) int64 {
359
+ func getRangeStart (mint , tr int64 ) int64 {
350
360
// Compute start of aligned time range of size tr closest to the current block's start.
351
361
// This code has been copied from TSDB.
352
362
if mint >= 0 {
0 commit comments