Skip to content

Commit

Permalink
Handling Codec(Delta, ZSTD(1)) (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
sharadgaur authored Jul 10, 2024
1 parent 61b3867 commit fa2bec7
Show file tree
Hide file tree
Showing 7 changed files with 588 additions and 2 deletions.
11 changes: 10 additions & 1 deletion parser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (a *AlterTableAddIndex) AlterType() string {

func (a *AlterTableAddIndex) String(level int) string {
var builder strings.Builder
builder.WriteString("ADD INDEX ")
builder.WriteString("ADD ")
builder.WriteString(a.Index.String(level))
if a.IfNotExists {
builder.WriteString("IF NOT EXISTS ")
Expand Down Expand Up @@ -1271,8 +1271,11 @@ func (a *TableIndex) End() Pos {

func (a *TableIndex) String(level int) string {
var builder strings.Builder
builder.WriteString("INDEX")
builder.WriteByte(' ')
builder.WriteString(a.Name.String(0))
builder.WriteString(a.ColumnExpr.String(level))
builder.WriteByte(' ')
builder.WriteString("TYPE")
builder.WriteByte(' ')
builder.WriteString(a.ColumnType.String(level))
Expand Down Expand Up @@ -3265,6 +3268,7 @@ func (n *NestedTypeExpr) Accept(visitor ASTVisitor) error {
type CompressionCodec struct {
CodecPos Pos
RightParenPos Pos
Type *Ident
Name *Ident
Level *NumberLiteral // compression level
}
Expand All @@ -3280,6 +3284,11 @@ func (c *CompressionCodec) End() Pos {
func (c *CompressionCodec) String(level int) string {
var builder strings.Builder
builder.WriteString("CODEC(")
if c.Type != nil {
builder.WriteString(c.Type.String(level))
builder.WriteByte(',')
builder.WriteByte(' ')
}
builder.WriteString(c.Name.String(level))
if c.Level != nil {
builder.WriteByte('(')
Expand Down
14 changes: 14 additions & 0 deletions parser/parser_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,19 @@ func (p *Parser) tryParseCompressionCodecs(pos Pos) (*CompressionCodec, error) {
if err != nil {
return nil, err
}
// parse DELTA if CODEC(Delta, ZSTD(1))
var codecType *Ident
if strings.ToUpper(name.Name) == "DELTA" {
codecType = name
if _, err := p.consumeTokenKind(","); err != nil {
return nil, err
}
name, err = p.parseIdent()
if err != nil {
return nil, err
}
}

var level *NumberLiteral
// TODO: check if the codec name is valid
switch strings.ToUpper(name.Name) {
Expand All @@ -819,6 +832,7 @@ func (p *Parser) tryParseCompressionCodecs(pos Pos) (*CompressionCodec, error) {
return &CompressionCodec{
CodecPos: pos,
RightParenPos: rightParenPos,
Type: codecType,
Name: name,
Level: level,
}, nil
Expand Down
12 changes: 12 additions & 0 deletions parser/testdata/ddl/create_table_with_codec_delta.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS test_local
(
`id` UInt64 CODEC(Delta, ZSTD(1)),
`api_id` UInt64 CODEC(ZSTD(1)),
`timestamp` DateTime64(9) CODEC(ZSTD(1)),
INDEX timestamp_index(timestamp) TYPE minmax GRANULARITY 4
)
ENGINE = ReplicatedMergeTree('/root/test_local', '{replica}')
PARTITION BY toStartOfHour(`timestamp`)
ORDER BY (toUnixTimestamp64Nano(`timestamp`), `api_id`)
TTL toStartOfHour(`timestamp`) + INTERVAL 7 DAY,toStartOfHour(`timestamp`) + INTERVAL 2 DAY
SETTINGS execute_merges_on_single_replica_time_threshold=1200, index_granularity=16384, max_bytes_to_merge_at_max_space_in_pool=64424509440, storage_policy='main', ttl_only_drop_parts=1;
2 changes: 1 addition & 1 deletion parser/testdata/ddl/format/alter_table_add_index.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ ALTER TABLE test.events_local ON CLUSTER 'default_cluster' ADD INDEX my_index(f0
-- Format SQL:
ALTER TABLE test.events_local
ON CLUSTER 'default_cluster'
ADD INDEX my_index(f0)TYPE minmax GRANULARITY 1024;
ADD INDEX my_index(f0) TYPE minmax GRANULARITY 1024;
28 changes: 28 additions & 0 deletions parser/testdata/ddl/format/create_table_with_codec_delta.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Origin SQL:
CREATE TABLE IF NOT EXISTS test_local
(
`id` UInt64 CODEC(Delta, ZSTD(1)),
`api_id` UInt64 CODEC(ZSTD(1)),
`timestamp` DateTime64(9) CODEC(ZSTD(1)),
INDEX timestamp_index(timestamp) TYPE minmax GRANULARITY 4
)
ENGINE = ReplicatedMergeTree('/root/test_local', '{replica}')
PARTITION BY toStartOfHour(`timestamp`)
ORDER BY (toUnixTimestamp64Nano(`timestamp`), `api_id`)
TTL toStartOfHour(`timestamp`) + INTERVAL 7 DAY,toStartOfHour(`timestamp`) + INTERVAL 2 DAY
SETTINGS execute_merges_on_single_replica_time_threshold=1200, index_granularity=16384, max_bytes_to_merge_at_max_space_in_pool=64424509440, storage_policy='main', ttl_only_drop_parts=1;


-- Format SQL:
CREATE TABLE IF NOT EXISTS test_local
(
`id` UInt64 CODEC(Delta, ZSTD(1)),
`api_id` UInt64 CODEC(ZSTD(1)),
`timestamp` DateTime64(9) CODEC(ZSTD(1)),
INDEX timestamp_index(timestamp) TYPE minmax GRANULARITY 4
)
ENGINE = ReplicatedMergeTree('/root/test_local', '{replica}')
PARTITION BY toStartOfHour(`timestamp`)
TTL toStartOfHour(`timestamp`) + INTERVAL 7 DAY,toStartOfHour(`timestamp`) + INTERVAL 2 DAY
SETTINGS execute_merges_on_single_replica_time_threshold=1200, index_granularity=16384, max_bytes_to_merge_at_max_space_in_pool=64424509440, storage_policy='main', ttl_only_drop_parts=1
ORDER BY (toUnixTimestamp64Nano(`timestamp`), `api_id`);
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
"Codec": {
"CodecPos": 198,
"RightParenPos": 212,
"Type": null,
"Name": {
"Name": "ZSTD",
"QuoteType": 1,
Expand Down
Loading

0 comments on commit fa2bec7

Please sign in to comment.