Skip to content

Commit db77d66

Browse files
committed
Add support for [column] ? 'key' operator, fix Attio pagination
1 parent eb53fa4 commit db77d66

7 files changed

Lines changed: 36 additions & 11 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ SELECT * FROM [TABLE] WHERE [JSON_COLUMN]->>'[JSON_KEY]' = '[JSON_VALUE]';
316316
- [x] TablePlus
317317
- [x] DBeaver
318318
- [x] pgAdmin
319+
- [x] Grafana
319320
- [ ] Jupyter notebooks ([#27](https://github.com/BemiHQ/BemiDB/issues/27))
320321
- [x] Data syncing from other sources
321322
- [x] Amplitude (incremental)

src/common/iceberg_table_writer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ func (writer *IcebergTableWriter) jsonToDuckdbRowValues(rowValues map[string]int
675675
}
676676
tableColumnNames := make([]string, len(writer.IcebergSchemaColumns))
677677
for i, icebergSchemaColumn := range writer.IcebergSchemaColumns {
678-
tableColumnNames[i] = strings.ToLower(icebergSchemaColumn.ColumnName) // Debezium JSON keys are lowercase
678+
tableColumnNames[i] = icebergSchemaColumn.ColumnName
679679
}
680680
if len(rowColumnNames) != len(tableColumnNames) {
681681
Panic(writer.Config, "Row column names count doesn't match table column names count: "+strings.Join(rowColumnNames, ", ")+" (row) vs "+strings.Join(tableColumnNames, ", ")+" (table)")
@@ -691,7 +691,7 @@ func (writer *IcebergTableWriter) jsonToDuckdbRowValues(rowValues map[string]int
691691
// Convert row values to DuckDB values
692692
duckdbRowValues := make([]driver.Value, len(writer.IcebergSchemaColumns))
693693
for i, icebergSchemaColumn := range writer.IcebergSchemaColumns {
694-
columnName := strings.ToLower(icebergSchemaColumn.ColumnName) // Debezium JSON keys are lowercase
694+
columnName := icebergSchemaColumn.ColumnName
695695
value := rowValues[columnName]
696696
duckdbRowValues[i] = icebergSchemaColumn.DuckdbValueFromJson(value)
697697
}

src/server/parser_a_expr.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ func (parser *ParserAExpr) RemappedJsonExtract(node *pgQuery.Node) *pgQuery.Node
6262
)
6363
}
6464

65+
// [column] ? 'key' -> json_exists([column], 'key')
66+
func (parser *ParserAExpr) RemappedJsonExists(node *pgQuery.Node) *pgQuery.Node {
67+
aExpr := parser.AExpr(node)
68+
if aExpr == nil || parser.OperatorName(aExpr) != "?" {
69+
return node
70+
}
71+
72+
return pgQuery.MakeFuncCallNode(
73+
[]*pgQuery.Node{pgQuery.MakeStrNode("json_exists")},
74+
[]*pgQuery.Node{aExpr.Lexpr, aExpr.Rexpr},
75+
0,
76+
)
77+
}
78+
6579
func (parser *ParserAExpr) OperatorName(aExpr *pgQuery.A_Expr) string {
6680
if aExpr.Kind != pgQuery.A_Expr_Kind_AEXPR_OP || len(aExpr.Name) != 1 {
6781
return ""

src/server/query_handler_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,11 @@ func TestHandleQuery(t *testing.T) {
931931
"types": {uint32ToString(pgtype.TextOID)},
932932
"values": {"value"},
933933
},
934+
"SELECT json_column ? 'key' AS exists FROM postgres.test_table WHERE id = 1": {
935+
"description": {"exists"},
936+
"types": {uint32ToString(pgtype.BoolOID)},
937+
"values": {"t"},
938+
},
934939
"SELECT jsonb_column FROM postgres.test_table WHERE bool_column = FALSE": {
935940
"description": {"jsonb_column"},
936941
"types": {uint32ToString(pgtype.TextOID)},

src/server/query_remapper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ func (remapper *QueryRemapper) remappedExpressions(node *pgQuery.Node, remappedC
372372
remapper.remapSelectStatement(subSelect, permissions, indentLevel+1) // recursion
373373
}
374374

375-
// Comparison
375+
// Operator: =, ?, etc.
376376
aExpr := node.GetAExpr()
377377
if aExpr != nil {
378378
node = remapper.remapperExpression.RemappedExpression(node)

src/server/query_remapper_expression.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func NewQueryRemapperExpression(config *Config) *QueryRemapperExpression {
2525

2626
func (remapper *QueryRemapperExpression) RemappedExpression(node *pgQuery.Node) *pgQuery.Node {
2727
node = remapper.remappedTypeCast(node)
28-
node = remapper.remappedArithmeticExpression(node)
28+
node = remapper.remappedOperatorExpression(node)
2929
node = remapper.remappedCollateClause(node)
3030
node = remapper.remappedNullColumnExpression(node)
3131
remapper.remapColumnReference(node)
@@ -80,7 +80,7 @@ func (remapper *QueryRemapperExpression) remappedTypeCast(node *pgQuery.Node) *p
8080
return node
8181
}
8282

83-
func (remapper *QueryRemapperExpression) remappedArithmeticExpression(node *pgQuery.Node) *pgQuery.Node {
83+
func (remapper *QueryRemapperExpression) remappedOperatorExpression(node *pgQuery.Node) *pgQuery.Node {
8484
aExpr := remapper.parserAExpr.AExpr(node)
8585
if aExpr == nil {
8686
return node
@@ -98,6 +98,9 @@ func (remapper *QueryRemapperExpression) remappedArithmeticExpression(node *pgQu
9898
// [column]->'value' -> json_extract([column], 'value')
9999
node = remapper.parserAExpr.RemappedJsonExtract(node)
100100

101+
// [column] ? 'key' -> json_exists([column], 'key')
102+
node = remapper.parserAExpr.RemappedJsonExists(node)
103+
101104
return node
102105
}
103106

src/syncer-attio/lib/attio.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package attio
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"fmt"
67
"io"
@@ -40,23 +41,24 @@ type ListRecordsResponse struct {
4041
func (attio *Attio) Load(object string, jsonQueueWriter *common.JsonQueueWriter) error {
4142
offset := 0
4243
for {
43-
req, err := http.NewRequest("POST", ATTIO_API_URL+"/objects/"+object+"/records/query", nil)
44+
jsonBody, err := json.Marshal(map[string]interface{}{"limit": ATTIO_API_LIMIT, "offset": offset})
45+
if err != nil {
46+
return err
47+
}
48+
req, err := http.NewRequest("POST", ATTIO_API_URL+"/objects/"+object+"/records/query", bytes.NewBuffer(jsonBody))
4449
if err != nil {
4550
return err
4651
}
47-
q := req.URL.Query()
48-
q.Add("limit", common.IntToString(ATTIO_API_LIMIT))
49-
q.Add("offset", common.IntToString(offset))
50-
req.URL.RawQuery = q.Encode()
51-
5252
req.Header.Set("Authorization", "Bearer "+attio.Config.ApiAccessToken)
5353
req.Header.Set("Content-Type", "application/json")
5454

55+
common.LogInfo(attio.Config.CommonConfig, "Sending request to Attio:", req.URL.String(), "with body:", string(jsonBody))
5556
resp, err := attio.HttpClient.Do(req)
5657
if err != nil {
5758
return err
5859
}
5960

61+
common.LogDebug(attio.Config.CommonConfig, "Received response from Attio:", resp.Status)
6062
if resp.StatusCode != http.StatusOK {
6163
body, _ := io.ReadAll(resp.Body)
6264
resp.Body.Close()

0 commit comments

Comments
 (0)