Skip to content

Commit a268cb3

Browse files
committed
Add support for materialized views
1 parent 58e8320 commit a268cb3

54 files changed

Lines changed: 3623 additions & 2239 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Dockerfile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ RUN \
3636
# Compile syncers and server
3737

3838
COPY --chown=app:app src/common/go.mod src/common/go.sum /app/src/common/
39-
COPY --chown=app:app src/syncer-common/go.mod src/syncer-common/go.sum /app/src/syncer-common/
4039

4140
COPY --chown=app:app src/syncer-postgres/go.mod src/syncer-postgres/go.sum /app/src/syncer-postgres/
4241
RUN cd /app/src/syncer-postgres && go mod download
@@ -48,7 +47,6 @@ COPY --chown=app:app src/server/go.mod src/server/go.sum /app/src/server/
4847
RUN cd /app/src/server && go mod download
4948

5049
COPY --chown=app:app src/common /app/src/common
51-
COPY --chown=app:app src/syncer-common /app/src/syncer-common
5250
COPY --chown=app:app src/syncer-postgres /app/src/syncer-postgres
5351
COPY --chown=app:app src/syncer-amplitude /app/src/syncer-amplitude
5452
COPY --chown=app:app src/server /app/src/server

Dockerfile.test

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ RUN \
5656
# Prepare syncers and server
5757

5858
COPY --chown=app:app src/common/go.mod src/common/go.sum /app/src/common/
59-
COPY --chown=app:app src/syncer-common/go.mod src/syncer-common/go.sum /app/src/syncer-common/
6059

6160
COPY --chown=app:app src/syncer-postgres/go.mod src/syncer-postgres/go.sum /app/src/syncer-postgres/
6261
RUN cd /app/src/syncer-postgres && go mod download
@@ -65,7 +64,6 @@ COPY --chown=app:app src/server/go.mod src/server/go.sum /app/src/server/
6564
RUN cd /app/src/server && go mod download
6665

6766
COPY --chown=app:app src/common /app/src/common
68-
COPY --chown=app:app src/syncer-common /app/src/syncer-common
6967
COPY --chown=app:app src/syncer-postgres /app/src/syncer-postgres
7068
COPY --chown=app:app src/server /app/src/server
7169

Makefile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@ sh:
33

44
install:
55
devbox run "cd src/common && go mod tidy && \
6-
cd ../syncer-common && go mod tidy && \
76
cd ../syncer-postgres && go mod tidy && \
87
cd ../syncer-amplitude && go mod tidy && \
98
cd ../server && go mod tidy"
109

1110
lint:
1211
devbox run "cd src/common && go fmt && staticcheck . && \
13-
cd ../syncer-common && go fmt && staticcheck . && \
1412
cd ../syncer-postgres && go fmt && deadcode . && staticcheck . && \
1513
cd ../syncer-amplitude && go fmt && deadcode . && staticcheck . && \
1614
cd ../server && go fmt && deadcode . && staticcheck ."

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ SELECT * FROM [TABLE] WHERE [JSON_COLUMN]->>'[JSON_KEY]' = '[JSON_VALUE]';
323323
- [x] Iceberg tables compaction
324324
- [x] Packaging in a Docker image
325325
- [x] Table compaction without Trino as a dependency
326+
- [x] Materialized views
326327
- [ ] Partitioned tables ([#15](https://github.com/BemiHQ/BemiDB/issues/15))
327328
- [ ] Transformations with dbt ([#25](https://github.com/BemiHQ/BemiDB/issues/25))
328329

docker/bin-test/test.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ SOURCE_POSTGRES_SYNC_MODE=FULL_REFRESH \
2828
go test -v ./...
2929

3030
# Run tests
31-
cd /app/src/syncer-common
31+
cd /app/src/common
3232
go test -v -count=1 ./...
3333
cd /app/src/server
3434
BEMIDB_USER=user \

scripts/catalog.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,11 @@ CREATE TABLE IF NOT EXISTS iceberg_tables (
66
previous_metadata_location VARCHAR(1000),
77
PRIMARY KEY (catalog_name, table_namespace, table_name)
88
);
9+
10+
CREATE TABLE IF NOT EXISTS iceberg_materialized_views (
11+
schema_name VARCHAR(255) NOT NULL,
12+
table_name VARCHAR(255) NOT NULL,
13+
definition TEXT NOT NULL
14+
);
15+
16+
CREATE UNIQUE INDEX IF NOT EXISTS idx_materialized_views ON iceberg_materialized_views (schema_name, table_name);
Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
package syncerCommon
1+
package common
22

33
import (
44
"errors"
55
"io"
66
"sync"
7-
8-
"github.com/BemiHQ/BemiDB/src/common"
97
)
108

119
type CappedBuffer struct {
12-
Config *common.CommonConfig
10+
Config *CommonConfig
1311
MaxSizeBytes int
1412

1513
buffer []byte
@@ -20,7 +18,7 @@ type CappedBuffer struct {
2018
closed bool
2119
}
2220

23-
func NewCappedBuffer(config *common.CommonConfig, maxSizeBytes int) *CappedBuffer {
21+
func NewCappedBuffer(config *CommonConfig, maxSizeBytes int) *CappedBuffer {
2422
sizedBuffer := &CappedBuffer{
2523
Config: config,
2624
buffer: make([]byte, 0, maxSizeBytes),
@@ -44,7 +42,7 @@ func (buf *CappedBuffer) Write(payload []byte) (writtenBytes int, err error) {
4442
}
4543

4644
for len(buf.buffer)+len(payload) > buf.MaxSizeBytes && !buf.closed {
47-
common.LogTrace(buf.Config, ">> Waiting for more space in capped buffer...")
45+
LogTrace(buf.Config, ">> Waiting for more space in capped buffer...")
4846
buf.conditionalSync.Wait() // Wait for the reader
4947
}
5048

@@ -55,7 +53,7 @@ func (buf *CappedBuffer) Write(payload []byte) (writtenBytes int, err error) {
5553

5654
writtenBytes = len(payload)
5755
buf.buffer = append(buf.buffer, payload...)
58-
common.LogTrace(buf.Config, ">> Writing", writtenBytes, "bytes to capped buffer...")
56+
LogTrace(buf.Config, ">> Writing", writtenBytes, "bytes to capped buffer...")
5957

6058
buf.conditionalSync.Broadcast() // Notify the reader that new data is available
6159

@@ -72,7 +70,7 @@ func (buf *CappedBuffer) Read(payload []byte) (readBytes int, err error) {
7270
defer buf.mutex.Unlock()
7371

7472
for len(buf.buffer) == 0 && !buf.closed {
75-
common.LogTrace(buf.Config, "<< Waiting for more data in capped buffer...")
73+
LogTrace(buf.Config, "<< Waiting for more data in capped buffer...")
7674
buf.conditionalSync.Wait() // Wait for the writer
7775
}
7876

@@ -83,7 +81,7 @@ func (buf *CappedBuffer) Read(payload []byte) (readBytes int, err error) {
8381
maxReadBytes := len(payload)
8482
readBytes = copy(payload, buf.buffer)
8583
buf.buffer = buf.buffer[readBytes:]
86-
common.LogTrace(buf.Config, "<< Reading "+common.IntToString(readBytes)+"/"+common.IntToString(maxReadBytes)+" bytes from capped buffer...")
84+
LogTrace(buf.Config, "<< Reading "+IntToString(readBytes)+"/"+IntToString(maxReadBytes)+" bytes from capped buffer...")
8785

8886
buf.conditionalSync.Broadcast() // Notify the writer that space is now available
8987

@@ -94,7 +92,7 @@ func (buf *CappedBuffer) Close() error {
9492
buf.closeOnceSync.Do(func() {
9593
buf.mutex.Lock()
9694

97-
common.LogTrace(buf.Config, "== Closing capped buffer...")
95+
LogTrace(buf.Config, "== Closing capped buffer...")
9896
buf.closed = true
9997

10098
buf.conditionalSync.Broadcast() // Wake up any waiting writers/readers
Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
package syncerCommon
1+
package common
22

33
import (
44
"bytes"
55
"io"
66
"sync"
77
"testing"
88
"time"
9-
10-
"github.com/BemiHQ/BemiDB/src/common"
119
)
1210

13-
func initTestConfig() *common.CommonConfig {
14-
return &common.CommonConfig{
15-
LogLevel: common.LOG_LEVEL_INFO, // Use INFO to avoid excessive logging during tests
11+
func initTestConfig() *CommonConfig {
12+
return &CommonConfig{
13+
LogLevel: LOG_LEVEL_INFO, // Use INFO to avoid excessive logging during tests
1614
}
1715
}
1816

src/common/common_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package common
22

33
const (
4-
VERSION = "1.0.1"
4+
VERSION = "1.1.0"
55

66
ENV_LOG_LEVEL = "BEMIDB_LOG_LEVEL"
77
ENV_DISABLE_ANONYMOUS_ANALYTICS = "BEMIDB_DISABLE_ANONYMOUS_ANALYTICS"

src/common/duckdb_client.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ import (
88
"github.com/marcboeker/go-duckdb/v2"
99
)
1010

11+
var SYNCER_DUCKDB_BOOT_QUERIES = []string{
12+
"SET memory_limit='2GB'",
13+
"SET threads=2",
14+
}
15+
1116
type DuckdbClient struct {
1217
Config *CommonConfig
1318
Db *sql.DB
@@ -88,16 +93,21 @@ func (client *DuckdbClient) ExecContext(ctx context.Context, query string, args
8893
return client.Db.ExecContext(ctx, replaceNamedStringArgs(query, args[0]))
8994
}
9095

91-
func (client *DuckdbClient) ExecTransactionContext(ctx context.Context, queries []string) error {
96+
func (client *DuckdbClient) ExecTransactionContext(ctx context.Context, queries []string, args ...[]map[string]string) error {
9297
tx, err := client.Db.Begin()
9398
LogDebug(client.Config, "Querying DuckDBClient: BEGIN")
9499
if err != nil {
95100
return err
96101
}
97102

98-
for _, query := range queries {
103+
for i, query := range queries {
99104
LogDebug(client.Config, "Querying DuckDBClient:", query)
100-
_, err := tx.ExecContext(ctx, query)
105+
var err error
106+
if len(args) == 0 {
107+
_, err = tx.ExecContext(ctx, query)
108+
} else {
109+
_, err = tx.ExecContext(ctx, replaceNamedStringArgs(query, args[0][i]))
110+
}
101111
if err != nil {
102112
tx.Rollback()
103113
return err
@@ -135,7 +145,11 @@ func (client *DuckdbClient) setExplicitAwsCredentials(ctx context.Context) {
135145

136146
func replaceNamedStringArgs(query string, args map[string]string) string {
137147
for key, value := range args {
138-
query = strings.ReplaceAll(query, "$"+key, value)
148+
query = strings.ReplaceAll(
149+
query,
150+
"$"+key,
151+
strings.ReplaceAll(value, "'", "''"),
152+
)
139153
}
140154
return query
141155
}

0 commit comments

Comments
 (0)