Skip to content

Commit 02575cd

Browse files
committed
Speed up full-refresh syncing and write to Iceberg w/o Trino as a dependency
1 parent 084a90d commit 02575cd

30 files changed

Lines changed: 5716 additions & 210 deletions

img/architecture.png

-1.46 KB
Loading

src/server/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
const (
11-
VERSION = "1.0.0-beta.1"
11+
VERSION = "1.0.0-beta.2"
1212

1313
ENV_PORT = "BEMIDB_PORT"
1414
ENV_DATABASE = "BEMIDB_DATABASE"

src/server/duckdb.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"context"
55
"database/sql"
6-
"regexp"
76
"slices"
87
"strings"
98

@@ -26,6 +25,7 @@ var DUCKDB_INIT_BOOT_QUERIES = []string{
2625
// Configure DuckDB
2726
"SET scalar_subquery_error_on_multiple_rows=false",
2827
"SET timezone='UTC'",
28+
"SET memory_limit='2GB'",
2929
}
3030

3131
type Duckdb struct {
@@ -146,10 +146,8 @@ func (duckdb *Duckdb) setExplicitAwsCredentials(ctx context.Context) {
146146
}
147147

148148
func replaceNamedStringArgs(query string, args map[string]string) string {
149-
re := regexp.MustCompile(`['";]`) // Escape single quotes, double quotes, and semicolons from args
150-
151149
for key, value := range args {
152-
query = strings.ReplaceAll(query, "$"+key, re.ReplaceAllString(value, ""))
150+
query = strings.ReplaceAll(query, "$"+key, value)
153151
}
154152
return query
155153
}

src/server/query_handler_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ func TestHandleQuery(t *testing.T) {
537537
"SELECT float4_column FROM postgres.test_table WHERE float4_column != 3.14": {
538538
"description": {"float4_column"},
539539
"types": {uint32ToString(pgtype.Float4OID)},
540-
"values": {"NaN"},
540+
"values": {"0"},
541541
},
542542
"SELECT float8_column FROM postgres.test_table WHERE bool_column = TRUE": {
543543
"description": {"float8_column"},
@@ -590,12 +590,12 @@ func TestHandleQuery(t *testing.T) {
590590
"values": {"12:00:00.123"},
591591
},
592592
"SELECT timeMsColumn FROM postgres.test_table WHERE timeMsColumn IS NOT NULL": {
593-
"description": {"timemscolumn"},
593+
"description": {"timeMsColumn"},
594594
"types": {uint32ToString(pgtype.TimeOID)},
595595
"values": {"12:00:00.123"},
596596
},
597597
"SELECT timeMsColumn FROM postgres.test_table WHERE timeMsColumn IS NULL": {
598-
"description": {"timemscolumn"},
598+
"description": {"timeMsColumn"},
599599
"types": {uint32ToString(pgtype.TimeOID)},
600600
"values": {""},
601601
},
@@ -672,7 +672,7 @@ func TestHandleQuery(t *testing.T) {
672672
"SELECT interval_column FROM postgres.test_table WHERE interval_column IS NOT NULL": {
673673
"description": {"interval_column"},
674674
"types": {uint32ToString(pgtype.TextOID)},
675-
"values": {"2806201000001us"},
675+
"values": {"1 mon 2 days 01:00:01.000001"},
676676
},
677677
"SELECT interval_column FROM postgres.test_table WHERE interval_column IS NULL": {
678678
"description": {"interval_column"},
@@ -737,7 +737,7 @@ func TestHandleQuery(t *testing.T) {
737737
"SELECT point_column FROM postgres.test_table WHERE point_column IS NOT NULL": {
738738
"description": {"point_column"},
739739
"types": {uint32ToString(pgtype.TextOID)},
740-
"values": {"0101000000D6FFFF5F74AC4240CDFFFFDF44804640"},
740+
"values": {"(37.347301483154,45.002101898193)"},
741741
},
742742
"SELECT point_column FROM postgres.test_table WHERE point_column IS NULL": {
743743
"description": {"point_column"},

src/server/storage_s3.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,49 +25,49 @@ type MetadataJson struct {
2525
}
2626

2727
type StorageS3 struct {
28-
s3Client *s3.Client
29-
config *Config
28+
S3Client *s3.Client
29+
Config *Config
3030
}
3131

32-
func NewS3Storage(config *Config) *StorageS3 {
32+
func NewS3Storage(Config *Config) *StorageS3 {
3333
var awsConfigOptions = []func(*awsConfig.LoadOptions) error{
34-
awsConfig.WithRegion(config.Aws.Region),
34+
awsConfig.WithRegion(Config.Aws.Region),
3535
}
3636

37-
if config.LogLevel == LOG_LEVEL_TRACE {
37+
if Config.LogLevel == LOG_LEVEL_TRACE {
3838
awsConfigOptions = append(awsConfigOptions, awsConfig.WithClientLogMode(aws.LogRequest))
3939
}
4040

41-
if IsLocalHost(config.Aws.S3Endpoint) {
42-
awsConfigOptions = append(awsConfigOptions, awsConfig.WithBaseEndpoint("http://"+config.Aws.S3Endpoint))
41+
if IsLocalHost(Config.Aws.S3Endpoint) {
42+
awsConfigOptions = append(awsConfigOptions, awsConfig.WithBaseEndpoint("http://"+Config.Aws.S3Endpoint))
4343
} else {
44-
awsConfigOptions = append(awsConfigOptions, awsConfig.WithBaseEndpoint("https://"+config.Aws.S3Endpoint))
44+
awsConfigOptions = append(awsConfigOptions, awsConfig.WithBaseEndpoint("https://"+Config.Aws.S3Endpoint))
4545
}
4646

4747
awsCredentials := credentials.NewStaticCredentialsProvider(
48-
config.Aws.AccessKeyId,
49-
config.Aws.SecretAccessKey,
48+
Config.Aws.AccessKeyId,
49+
Config.Aws.SecretAccessKey,
5050
"",
5151
)
5252
awsConfigOptions = append(awsConfigOptions, awsConfig.WithCredentialsProvider(awsCredentials))
5353

5454
loadedAwsConfig, err := awsConfig.LoadDefaultConfig(context.Background(), awsConfigOptions...)
55-
PanicIfError(config, err)
55+
PanicIfError(Config, err)
5656

57-
s3Client := s3.NewFromConfig(loadedAwsConfig, func(o *s3.Options) {
58-
if config.Aws.S3Endpoint != DEFAULT_AWS_S3_ENDPOINT {
57+
S3Client := s3.NewFromConfig(loadedAwsConfig, func(o *s3.Options) {
58+
if Config.Aws.S3Endpoint != DEFAULT_AWS_S3_ENDPOINT {
5959
o.UsePathStyle = true
6060
}
6161
})
6262

6363
return &StorageS3{
64-
s3Client: s3Client,
65-
config: config,
64+
S3Client: S3Client,
65+
Config: Config,
6666
}
6767
}
6868

6969
func (storage *StorageS3) IcebergTableFields(metadataPath string) ([]IcebergTableField, error) {
70-
metadataKey := strings.TrimPrefix(metadataPath, "s3://"+storage.config.Aws.S3Bucket+"/")
70+
metadataKey := strings.TrimPrefix(metadataPath, "s3://"+storage.Config.Aws.S3Bucket+"/")
7171
metadataContent, err := storage.readFileContent(metadataKey)
7272
if err != nil {
7373
return nil, err
@@ -78,8 +78,8 @@ func (storage *StorageS3) IcebergTableFields(metadataPath string) ([]IcebergTabl
7878

7979
func (storage *StorageS3) readFileContent(fileKey string) ([]byte, error) {
8080
ctx := context.Background()
81-
getObjectResponse, err := storage.s3Client.GetObject(ctx, &s3.GetObjectInput{
82-
Bucket: aws.String(storage.config.Aws.S3Bucket),
81+
getObjectResponse, err := storage.S3Client.GetObject(ctx, &s3.GetObjectInput{
82+
Bucket: aws.String(storage.Config.Aws.S3Bucket),
8383
Key: aws.String(fileKey),
8484
})
8585
if err != nil {

src/syncer-amplitude/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,15 @@ func registerFlags() {
4040
_config.BaseConfig = &common.BaseConfig{}
4141

4242
flag.StringVar(&_config.BaseConfig.LogLevel, "log-level", os.Getenv(common.ENV_LOG_LEVEL), `Log level: "ERROR", "WARN", "INFO", "DEBUG", "TRACE". Default: "`+common.DEFAULT_LOG_LEVEL+`"`)
43+
flag.StringVar(&_config.BaseConfig.CatalogDatabaseUrl, "catalog-database-url", os.Getenv(common.ENV_CATALOG_DATABASE_URL), "Catalog database URL")
4344
flag.StringVar(&_config.BaseConfig.DestinationSchemaName, "destination-schema-name", os.Getenv(common.ENV_DESTINATION_SCHEMA_NAME), "Destination schema name to store the synced data")
4445
flag.StringVar(&_config.BaseConfig.Trino.DatabaseUrl, "trino-database-url", os.Getenv(common.ENV_TRINO_DATABASE_URL), "Trino database URL to sync to")
4546
flag.StringVar(&_config.BaseConfig.Trino.CatalogName, "trino-catalog-name", os.Getenv(common.ENV_TRINO_CATALOG_NAME), "Trino catalog name")
47+
flag.StringVar(&_config.BaseConfig.Aws.Region, "aws-region", os.Getenv(common.ENV_AWS_REGION), "AWS region")
4648
flag.StringVar(&_config.BaseConfig.Aws.S3Endpoint, "aws-s3-endpoint", os.Getenv(common.ENV_AWS_S3_ENDPOINT), "AWS S3 endpoint. Default: \""+common.DEFAULT_AWS_S3_ENDPOINT+`"`)
4749
flag.StringVar(&_config.BaseConfig.Aws.S3Bucket, "aws-s3-bucket", os.Getenv(common.ENV_AWS_S3_BUCKET), "AWS S3 bucket name")
50+
flag.StringVar(&_config.BaseConfig.Aws.AccessKeyId, "aws-access-key-id", os.Getenv(common.ENV_AWS_ACCESS_KEY_ID), "AWS access key ID")
51+
flag.StringVar(&_config.BaseConfig.Aws.SecretAccessKey, "aws-secret-access-key", os.Getenv(common.ENV_AWS_SECRET_ACCESS_KEY), "AWS secret access key")
4852
flag.BoolVar(&_config.BaseConfig.DisableAnonymousAnalytics, "disable-anonymous-analytics", os.Getenv(common.ENV_DISABLE_ANONYMOUS_ANALYTICS) == "true", "Disable anonymous analytics collection")
4953

5054
flag.StringVar(&_config.ApiKey, "api-key", os.Getenv(ENV_API_KEY), "Amplitude API Key")
@@ -60,6 +64,9 @@ func parseFlags() {
6064
} else if !slices.Contains(common.LOG_LEVELS, _config.BaseConfig.LogLevel) {
6165
panic("Invalid log level " + _config.BaseConfig.LogLevel + ". Must be one of " + strings.Join(common.LOG_LEVELS, ", "))
6266
}
67+
if _config.BaseConfig.CatalogDatabaseUrl == "" {
68+
panic("Catalog database URL is required")
69+
}
6370
if _config.BaseConfig.DestinationSchemaName == "" {
6471
panic("Destination schema name is required")
6572
}
@@ -69,6 +76,22 @@ func parseFlags() {
6976
if _config.BaseConfig.Trino.CatalogName == "" {
7077
panic("Trino catalog name is required")
7178
}
79+
if _config.BaseConfig.Aws.Region == "" {
80+
panic("AWS region is required")
81+
}
82+
if _config.BaseConfig.Aws.S3Endpoint == "" {
83+
_config.BaseConfig.Aws.S3Endpoint = common.DEFAULT_AWS_S3_ENDPOINT
84+
}
85+
if _config.BaseConfig.Aws.S3Bucket == "" {
86+
panic("AWS S3 bucket name is required")
87+
}
88+
if _config.BaseConfig.Aws.AccessKeyId != "" && _config.BaseConfig.Aws.SecretAccessKey == "" {
89+
panic("AWS secret access key is required")
90+
}
91+
if _config.BaseConfig.Aws.AccessKeyId == "" && _config.BaseConfig.Aws.SecretAccessKey != "" {
92+
panic("AWS access key ID is required")
93+
}
94+
7295
if _config.ApiKey == "" {
7396
panic("Amplitude API key is required")
7497
}

src/syncer-amplitude/go.mod

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,67 @@ require github.com/BemiHQ/BemiDB/src/syncer-common v0.0.1
77
replace github.com/BemiHQ/BemiDB/src/syncer-common => ../syncer-common
88

99
require (
10+
github.com/apache/arrow-go/v18 v18.1.0 // indirect
11+
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
12+
github.com/apache/thrift v0.21.0 // indirect
13+
github.com/aws/aws-sdk-go-v2 v1.36.6 // indirect
14+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect
15+
github.com/aws/aws-sdk-go-v2/config v1.29.18 // indirect
16+
github.com/aws/aws-sdk-go-v2/credentials v1.17.71 // indirect
17+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.33 // indirect
18+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.85 // indirect
19+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.37 // indirect
20+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.37 // indirect
21+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
22+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.37 // indirect
23+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4 // indirect
24+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.5 // indirect
25+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.18 // indirect
26+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.18 // indirect
27+
github.com/aws/aws-sdk-go-v2/service/s3 v1.84.1 // indirect
28+
github.com/aws/aws-sdk-go-v2/service/sso v1.25.6 // indirect
29+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.4 // indirect
30+
github.com/aws/aws-sdk-go-v2/service/sts v1.34.1 // indirect
31+
github.com/aws/smithy-go v1.22.4 // indirect
32+
github.com/duckdb/duckdb-go-bindings v0.1.17 // indirect
33+
github.com/duckdb/duckdb-go-bindings/darwin-amd64 v0.1.12 // indirect
34+
github.com/duckdb/duckdb-go-bindings/darwin-arm64 v0.1.12 // indirect
35+
github.com/duckdb/duckdb-go-bindings/linux-amd64 v0.1.12 // indirect
36+
github.com/duckdb/duckdb-go-bindings/linux-arm64 v0.1.12 // indirect
37+
github.com/duckdb/duckdb-go-bindings/windows-amd64 v0.1.12 // indirect
38+
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
39+
github.com/goccy/go-json v0.10.5 // indirect
40+
github.com/golang/snappy v0.0.4 // indirect
41+
github.com/google/flatbuffers v25.1.24+incompatible // indirect
42+
github.com/google/uuid v1.6.0 // indirect
1043
github.com/hashicorp/go-uuid v1.0.3 // indirect
44+
github.com/jackc/pgpassfile v1.0.0 // indirect
45+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
46+
github.com/jackc/pgx/v5 v5.7.5 // indirect
1147
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
1248
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
1349
github.com/jcmturner/gofork v1.7.6 // indirect
1450
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
1551
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
1652
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
53+
github.com/klauspost/compress v1.17.11 // indirect
54+
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
55+
github.com/linkedin/goavro v2.1.0+incompatible // indirect
56+
github.com/marcboeker/go-duckdb/arrowmapping v0.0.10 // indirect
57+
github.com/marcboeker/go-duckdb/mapping v0.0.11 // indirect
58+
github.com/marcboeker/go-duckdb/v2 v2.3.3 // indirect
59+
github.com/pierrec/lz4/v4 v4.1.22 // indirect
1760
github.com/trinodb/trino-go-client v0.323.0 // indirect
18-
golang.org/x/crypto v0.37.0 // indirect
19-
golang.org/x/net v0.24.0 // indirect
61+
github.com/xitongsys/parquet-go v1.6.2 // indirect
62+
github.com/xitongsys/parquet-go-source v0.0.0-20241021075129-b732d2ac9c9b // indirect
63+
github.com/zeebo/xxh3 v1.0.2 // indirect
64+
golang.org/x/crypto v0.40.0 // indirect
65+
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 // indirect
66+
golang.org/x/mod v0.26.0 // indirect
67+
golang.org/x/net v0.42.0 // indirect
68+
golang.org/x/sync v0.16.0 // indirect
69+
golang.org/x/sys v0.34.0 // indirect
70+
golang.org/x/text v0.27.0 // indirect
71+
golang.org/x/tools v0.35.0 // indirect
72+
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
2073
)

0 commit comments

Comments
 (0)