Skip to content

Commit 826e32a

Browse files
yeya24alanprot
andauthored
Add dynamic shard size for parquet converter (#6817)
* add dynamic shard size for parquet converter Signed-off-by: yeya24 <[email protected]> * update changelog Signed-off-by: yeya24 <[email protected]> --------- Signed-off-by: yeya24 <[email protected]> Signed-off-by: Alan Protasio <[email protected]> Co-authored-by: Alan Protasio <[email protected]>
1 parent fed28b1 commit 826e32a

File tree

3 files changed

+10
-6
lines changed

3 files changed

+10
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
4242
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809
4343
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
44+
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
4445
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
4546
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
4647
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/parquetconverter/converter.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
3535
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
3636
"github.com/cortexproject/cortex/pkg/tenant"
37+
"github.com/cortexproject/cortex/pkg/util"
3738
util_log "github.com/cortexproject/cortex/pkg/util/log"
3839
"github.com/cortexproject/cortex/pkg/util/services"
3940
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -209,8 +210,10 @@ func (c *Converter) running(ctx context.Context) error {
209210

210211
var ring ring.ReadRing
211212
ring = c.ring
212-
if c.limits.ParquetConverterTenantShardSize(userID) > 0 {
213-
ring = c.ring.ShuffleShard(userID, c.limits.ParquetConverterTenantShardSize(userID))
213+
shardSize := c.limits.ParquetConverterTenantShardSize(userID)
214+
if shardSize > 0 {
215+
dynamicShardSize := util.DynamicShardSize(c.limits.ParquetConverterTenantShardSize(userID), ring.InstancesCount())
216+
ring = c.ring.ShuffleShard(userID, dynamicShardSize)
214217
}
215218

216219
userLogger := util_log.WithUserID(userID, c.logger)

pkg/util/validation/limits.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ type Limits struct {
207207
CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`
208208

209209
// Parquet converter
210-
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
211-
ParquetConverterTenantShardSize int `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`
210+
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
211+
ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`
212212

213213
// This config doesn't have a CLI flag registered here because they're registered in
214214
// their own original config struct.
@@ -305,7 +305,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
305305
f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit")
306306
f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit")
307307

308-
f.IntVar(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
308+
f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
309309
f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")
310310

311311
// Store-gateway.
@@ -842,7 +842,7 @@ func (o *Overrides) CompactorTenantShardSize(userID string) float64 {
842842
}
843843

844844
// ParquetConverterTenantShardSize returns shard size (number of converters) used by this tenant when using shuffle-sharding strategy.
845-
func (o *Overrides) ParquetConverterTenantShardSize(userID string) int {
845+
func (o *Overrides) ParquetConverterTenantShardSize(userID string) float64 {
846846
return o.GetOverridesForUser(userID).ParquetConverterTenantShardSize
847847
}
848848

0 commit comments

Comments
 (0)