Skip to content

Commit 423374f

Browse files
committed
Add support for ::JSONB type casting and WHERE 'val' = ANY(column)
1 parent 19e1453 commit 423374f

41 files changed

Lines changed: 3885 additions & 568 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,15 @@ RUN \
3737

3838
COPY --chown=app:app src/common/go.mod src/common/go.sum /app/src/common/
3939

40+
COPY --chown=app:app src/syncer-postgres/lib/go.mod src/syncer-postgres/lib/go.sum /app/src/syncer-postgres/lib/
4041
COPY --chown=app:app src/syncer-postgres/go.mod src/syncer-postgres/go.sum /app/src/syncer-postgres/
4142
RUN cd /app/src/syncer-postgres && go mod download
4243

44+
COPY --chown=app:app src/syncer-amplitude/lib/go.mod src/syncer-amplitude/lib/go.sum /app/src/syncer-amplitude/lib/
4345
COPY --chown=app:app src/syncer-amplitude/go.mod src/syncer-amplitude/go.sum /app/src/syncer-amplitude/
4446
RUN cd /app/src/syncer-amplitude && go mod download
4547

48+
COPY --chown=app:app src/syncer-attio/lib/go.mod src/syncer-attio/lib/go.sum /app/src/syncer-attio/lib/
4649
COPY --chown=app:app src/syncer-attio/go.mod src/syncer-attio/go.sum /app/src/syncer-attio/
4750
RUN cd /app/src/syncer-attio && go mod download
4851

Dockerfile.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ RUN \
5757

5858
COPY --chown=app:app src/common/go.mod src/common/go.sum /app/src/common/
5959

60+
COPY --chown=app:app src/syncer-postgres/lib/go.mod src/syncer-postgres/lib/go.sum /app/src/syncer-postgres/lib/
6061
COPY --chown=app:app src/syncer-postgres/go.mod src/syncer-postgres/go.sum /app/src/syncer-postgres/
6162
RUN cd /app/src/syncer-postgres && go mod download
6263

Makefile

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@ sh:
33

44
install:
55
devbox run "cd src/common && go mod tidy && \
6-
cd ../syncer-postgres && go mod tidy && \
7-
cd ../syncer-amplitude && go mod tidy && \
8-
cd ../syncer-attio && go mod tidy && \
9-
cd ../server && go mod tidy"
6+
cd ../syncer-postgres && go mod tidy && cd ../lib && go mod tidy && \
7+
cd ../../syncer-amplitude && go mod tidy && cd ./lib && go mod tidy && \
8+
cd ../../syncer-attio && go mod tidy && cd ./lib && go mod tidy && \
9+
cd ../../server && go mod tidy"
1010

1111
lint:
1212
devbox run "cd src/common && go fmt && staticcheck . && \
13-
cd ../syncer-postgres && go fmt && deadcode . && staticcheck . && \
14-
cd ../syncer-amplitude && go fmt && deadcode . && staticcheck . && \
15-
cd ../syncer-attio && go fmt && deadcode . && staticcheck . && \
16-
cd ../server && go fmt && deadcode . && staticcheck ."
13+
cd ../syncer-postgres && go fmt && deadcode . && staticcheck . && cd ./lib && go fmt && staticcheck . && \
14+
cd ../../syncer-amplitude && go fmt && deadcode . && staticcheck . && cd ./lib && go fmt && staticcheck . && \
15+
cd ../../syncer-attio && go fmt && deadcode . && staticcheck . && cd ./lib && go fmt && staticcheck . && \
16+
cd ../../server && go fmt && deadcode . && staticcheck ."
1717

1818
build:
1919
./scripts/build-docker.sh

src/server/parser_a_expr.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,18 @@ func (parser *ParserAExpr) RightAConstValue(aExpr *pgQuery.A_Expr) string {
8686
return aExpr.Rexpr.GetAConst().GetSval().Sval
8787
}
8888

89-
// = ANY({schema_information}) -> IN (schema_information)
90-
func (parser *ParserAExpr) ConvertedRightAnyToIn(node *pgQuery.Node) *pgQuery.Node {
89+
// = ANY('{information_schema, ...}') -> IN ('information_schema', ...)
90+
//
91+
// DuckDB error: UNNEST() for correlated expressions is not supported yet
92+
func (parser *ParserAExpr) ConvertedRightAnyStringConstantToIn(node *pgQuery.Node) *pgQuery.Node {
9193
aExpr := parser.AExpr(node)
9294

9395
if aExpr.Kind != pgQuery.A_Expr_Kind_AEXPR_OP_ANY {
9496
return node
9597
}
9698

9799
if aExpr.Rexpr.GetAConst() == nil {
98-
// NOTE: ... = ANY() on non-constants is not fully supported yet
99-
return parser.utils.MakeNullNode()
100+
return node
100101
}
101102

102103
arrayStr := aExpr.Rexpr.GetAConst().GetSval().Sval

src/server/parser_type_cast.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ func (parser *ParserTypeCast) TypeName(typeCast *pgQuery.TypeCast) string {
4949
return typeName
5050
}
5151

52+
func (parser *ParserTypeCast) SetTypeName(typeCast *pgQuery.TypeCast, typeName string) {
53+
if typeCast == nil || len(typeCast.TypeName.Names) != 1 {
54+
return
55+
}
56+
57+
typeCast.TypeName.Names[0] = pgQuery.MakeStrNode(typeName)
58+
}
59+
5260
func (parser *ParserTypeCast) NestedTypeCast(typeCast *pgQuery.TypeCast) *pgQuery.TypeCast {
5361
return parser.TypeCast(typeCast.Arg)
5462
}

src/server/query_handler_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,11 @@ func TestHandleQuery(t *testing.T) {
11261126
"types": {uint32ToString(pgtype.IntervalOID)},
11271127
"values": {"0 months 7 days 0 microseconds"},
11281128
},
1129+
"SELECT '{\"key\": \"value\"}'::JSONB AS jsonb": {
1130+
"description": {"jsonb"},
1131+
"types": {uint32ToString(pgtype.JSONOID)},
1132+
"values": {"{\"key\":\"value\"}"},
1133+
},
11291134
"SELECT date_trunc('month', '2025-02-24 15:58:23-05'::timestamptz + '-1 month'::interval) AS date": {
11301135
"description": {"date"},
11311136
"types": {uint32ToString(pgtype.TimestamptzOID)},
@@ -1285,9 +1290,10 @@ func TestHandleQuery(t *testing.T) {
12851290

12861291
t.Run("WHERE ANY(column reference)", func(t *testing.T) {
12871292
testResponseByQuery(t, queryHandler, map[string]map[string][]string{
1288-
"SELECT id FROM postgres.test_table WHERE id = ANY(id)": { // NOTE: ... = ANY() on non-constants is not fully supported yet
1293+
"SELECT id FROM postgres.test_table WHERE 'one' = ANY(array_text_column)": {
12891294
"description": {"id"},
12901295
"types": {uint32ToString(pgtype.Int4OID)},
1296+
"values": {"1"},
12911297
},
12921298
})
12931299
})

src/server/query_remapper_expression.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ func (remapper *QueryRemapperExpression) remappedTypeCast(node *pgQuery.Node) *p
6363
return node
6464
}
6565
return remapper.parserTypeCast.MakeSubselectOidBySchemaTableArg(nestedTypeCast.Arg)
66+
case "jsonb":
67+
// value::jsonb -> value::json
68+
remapper.parserTypeCast.SetTypeName(typeCast, "json")
6669
case "text":
6770
// value::(regtype|regnamespace|regclass)::text -> value::text
6871
nestedTypeCast := remapper.parserTypeCast.NestedTypeCast(typeCast)
@@ -83,8 +86,8 @@ func (remapper *QueryRemapperExpression) remappedArithmeticExpression(node *pgQu
8386
return node
8487
}
8588

86-
// = ANY({schema_information}) -> IN (schema_information)
87-
node = remapper.parserAExpr.ConvertedRightAnyToIn(node)
89+
// = ANY('{information_schema, ...}') -> IN ('information_schema', ...)
90+
node = remapper.parserAExpr.ConvertedRightAnyStringConstantToIn(node)
8891

8992
// pg_catalog.[operator] -> [operator]
9093
remapper.parserAExpr.RemovePgCatalog(node)

src/syncer-amplitude/go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@ module github.com/BemiHQ/BemiDB/src/syncer-amplitude
22

33
go 1.24.4
44

5-
require github.com/BemiHQ/BemiDB/src/common v0.0.0-00010101000000-000000000000
5+
require (
6+
github.com/BemiHQ/BemiDB/src/common v0.0.0-00010101000000-000000000000
7+
github.com/BemiHQ/BemiDB/src/syncer-amplitude/lib v0.0.0-00010101000000-000000000000
8+
)
69

710
replace github.com/BemiHQ/BemiDB/src/common => ../common
811

12+
replace github.com/BemiHQ/BemiDB/src/syncer-amplitude/lib => ./lib
13+
914
require (
1015
github.com/apache/arrow-go/v18 v18.4.0 // indirect
1116
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package main
1+
package amplitude
22

33
import (
44
"archive/zip"
Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package main
1+
package amplitude
22

33
import (
44
"flag"
@@ -35,11 +35,7 @@ type configParseValues struct {
3535
var _config Config
3636
var _configParseValues configParseValues
3737

38-
func init() {
39-
registerFlags()
40-
}
41-
42-
func registerFlags() {
38+
func RegisterFlags() {
4339
_config.CommonConfig = &common.CommonConfig{}
4440

4541
flag.StringVar(&_config.CommonConfig.LogLevel, "log-level", os.Getenv(common.ENV_LOG_LEVEL), `Log level: "ERROR", "WARN", "INFO", "DEBUG", "TRACE". Default: "`+common.DEFAULT_LOG_LEVEL+`"`)
@@ -57,6 +53,11 @@ func registerFlags() {
5753
flag.StringVar(&_configParseValues.StartDate, "start-date", os.Getenv(ENV_START_DATE), "Amplitude start date in YYYY-MM-DD format")
5854
}
5955

56+
func LoadConfig() *Config {
57+
parseFlags()
58+
return &_config
59+
}
60+
6061
func parseFlags() {
6162
flag.Parse()
6263

@@ -103,8 +104,3 @@ func parseFlags() {
103104
}
104105
_config.StartDate = parsedStartDate
105106
}
106-
107-
func LoadConfig() *Config {
108-
parseFlags()
109-
return &_config
110-
}

0 commit comments

Comments
 (0)