Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New snapshot iterator and position format #132

Merged
merged 21 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@ test:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose.yml up --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose.yml down; \
docker compose -f test/docker-compose.yml down --volumes; \
exit $$ret

.PHONY: lint
lint:
golangci-lint run
golangci-lint run -v

.PHONY: generate
generate:
go generate ./...

.PHONY: fmt
fmt:
gofumpt -l -w .
gci write --skip-generated .

.PHONY: install-tools
install-tools:
@echo Installing tools from tools.go
Expand Down
121 changes: 61 additions & 60 deletions destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,75 +50,76 @@ func TestDestination_Write(t *testing.T) {
tests := []struct {
name string
record sdk.Record
}{{
name: "snapshot",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationSnapshot,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5000},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 123,
"column3": true,
}{
{
name: "snapshot",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationSnapshot,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5000},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 123,
"column3": true,
},
},
},
},
}, {
name: "create",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationCreate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 456,
"column3": false,
}, {
name: "create",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationCreate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 456,
"column3": false,
},
},
},
},
}, {
name: "insert on update (upsert)",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 6},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "bar",
"column2": 567,
"column3": true,
}, {
name: "insert on update (upsert)",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 6},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "bar",
"column2": 567,
"column3": true,
},
},
},
},
}, {
name: "update on conflict",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 1},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foobar",
"column2": 567,
"column3": true,
}, {
name: "update on conflict",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 1},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foobar",
"column2": 567,
"column3": true,
},
},
},
}, {
name: "delete",
record: sdk.Record{
Position: sdk.Position("foo"),
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Operation: sdk.OperationDelete,
Key: sdk.StructuredData{"id": 4},
},
},
}, {
name: "delete",
record: sdk.Record{
Position: sdk.Position("foo"),
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Operation: sdk.OperationDelete,
Key: sdk.StructuredData{"id": 4},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
module github.com/conduitio/conduit-connector-postgres

go 1.21
go 1.22

require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/Masterminds/squirrel v1.5.4
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/daixiang0/gci v0.12.3
github.com/golangci/golangci-lint v1.57.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/matryer/is v1.4.1
golang.org/x/tools v0.19.0
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
mvdan.cc/gofumpt v0.6.0
)

require (
Expand Down Expand Up @@ -50,7 +55,6 @@ require (
github.com/ckaznocha/intrange v0.1.1 // indirect
github.com/conduitio/conduit-connector-protocol v0.5.0 // indirect
github.com/curioswitch/go-reassign v0.2.0 // indirect
github.com/daixiang0/gci v0.12.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.5.0 // indirect
github.com/ettle/strcase v0.2.0 // indirect
Expand Down Expand Up @@ -80,7 +84,6 @@ require (
github.com/golangci/plugin-module-register v0.1.1 // indirect
github.com/golangci/revgrep v0.5.2 // indirect
github.com/golangci/unconvert v0.0.0-20240309020433-c5143eacb3ed // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gordonklaus/ineffassign v0.1.0 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect
Expand Down Expand Up @@ -209,15 +212,12 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.4.7 // indirect
mvdan.cc/gofumpt v0.6.0 // indirect
mvdan.cc/unparam v0.0.0-20240104100049-c549a3470d14 // indirect
)
1 change: 1 addition & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
}
return nil
}

func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
conn, err := pgx.Connect(ctx, s.config.URL)
if err != nil {
Expand Down
73 changes: 37 additions & 36 deletions source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,44 @@ func TestConfig_Validate(t *testing.T) {
name string
cfg Config
wantErr bool
}{{
name: "valid config",
cfg: Config{
URL: "postgresql://meroxauser:[email protected]:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
}{
{
name: "valid config",
cfg: Config{
URL: "postgresql://meroxauser:[email protected]:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: false,
}, {
name: "invalid postgres url",
cfg: Config{
URL: "postgresql",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
}, {
name: "invalid multiple tables for long polling",
cfg: Config{
URL: "postgresql://meroxauser:[email protected]:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLongPolling,
},
wantErr: true,
}, {
name: "invalid key list format",
cfg: Config{
URL: "postgresql://meroxauser:[email protected]:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"key1,key2"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
},
wantErr: false,
}, {
name: "invalid postgres url",
cfg: Config{
URL: "postgresql",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
}, {
name: "invalid multiple tables for long polling",
cfg: Config{
URL: "postgresql://meroxauser:[email protected]:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLongPolling,
},
wantErr: true,
}, {
name: "invalid key list format",
cfg: Config{
URL: "postgresql://meroxauser:[email protected]:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"key1,key2"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion source/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/conduitio/conduit-connector-postgres/source/logrepl"
"github.com/conduitio/conduit-connector-postgres/source/longpoll"

sdk "github.com/conduitio/conduit-connector-sdk"
)

Expand Down
3 changes: 2 additions & 1 deletion source/logrepl/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func createTestSnapshot(ctx context.Context, t *testing.T, pool *pgxpool.Pool) s

// creates a snapshot iterator for testing that hands its connection's cleanup.
func createTestSnapshotIterator(ctx context.Context, t *testing.T,
pool *pgxpool.Pool, cfg SnapshotConfig) *SnapshotIterator {
pool *pgxpool.Pool, cfg SnapshotConfig,
) *SnapshotIterator {
is := is.New(t)

conn, err := pool.Acquire(ctx)
Expand Down
Loading