Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add connector logs to tests #220

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func TestDestination_Write(t *testing.T) {
is := is.New(t)
ctx := context.Background()
ctx := test.Context(t)
conn := test.ConnectSimple(ctx, t, test.RegularConnString)
tableName := test.SetupTestTable(ctx, t, conn)

Expand Down Expand Up @@ -144,7 +144,7 @@ func TestDestination_Write(t *testing.T) {

func TestDestination_Batch(t *testing.T) {
is := is.New(t)
ctx := context.Background()
ctx := test.Context(t)
conn := test.ConnectSimple(ctx, t, test.RegularConnString)
tableName := test.SetupTestTable(ctx, t, conn)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.7.1
github.com/matryer/is v1.4.1
github.com/rs/zerolog v1.33.0
golang.org/x/tools v0.26.0
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
mvdan.cc/gofumpt v0.7.0
Expand Down Expand Up @@ -163,7 +164,6 @@ require (
github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/ryancurrah/gomodguard v1.3.5 // indirect
github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
Expand Down
15 changes: 8 additions & 7 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
)

func TestCDCIterator_New(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)

tests := []struct {
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestCDCIterator_New(t *testing.T) {
}

func TestCDCIterator_Next(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
is := is.New(t)

pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestCDCIterator_Next(t *testing.T) {
}

func TestCDCIterator_Next_Fail(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)

pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
table := test.SetupTestTable(ctx, t, pool)
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestCDCIterator_Next_Fail(t *testing.T) {
}

func TestCDCIterator_EnsureLSN(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
is := is.New(t)

pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
Expand Down Expand Up @@ -416,7 +416,7 @@ func TestCDCIterator_EnsureLSN(t *testing.T) {
}

func TestCDCIterator_Ack(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)

tests := []struct {
name string
Expand Down Expand Up @@ -501,7 +501,8 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LSN, pglogrepl.LSN, error) {
t.Helper()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
ctx := test.Context(t)
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
defer cancel()

var writeLSN, flushLSN pglogrepl.LSN
Expand All @@ -522,7 +523,7 @@ func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LS
}

func TestCDCIterator_Schema(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)

pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
table := test.SetupTestTable(ctx, t, pool)
Expand Down
5 changes: 3 additions & 2 deletions source/logrepl/combined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestConfig_Validate(t *testing.T) {
}

func TestCombinedIterator_New(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
table := test.SetupTestTable(ctx, t, pool)

Expand Down Expand Up @@ -137,7 +137,8 @@ func TestCombinedIterator_New(t *testing.T) {
}

func TestCombinedIterator_Next(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
ctx := test.Context(t)
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

is := is.New(t)
Expand Down
7 changes: 3 additions & 4 deletions source/logrepl/internal/publication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package internal

import (
"context"
"fmt"
"strings"
"testing"
Expand All @@ -25,7 +24,7 @@ import (
)

func TestCreatePublication(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
pool := test.ConnectPool(ctx, t, test.RegularConnString)

pubNames := []string{"testpub", "123", "test-hyphen", "test=equal"}
Expand Down Expand Up @@ -71,7 +70,7 @@ func TestCreatePublication(t *testing.T) {
}

func TestCreatePublicationForTables(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
pub := test.RandomIdentifier(t)
pool := test.ConnectPool(ctx, t, test.RegularConnString)

Expand Down Expand Up @@ -100,7 +99,7 @@ func TestCreatePublicationForTables(t *testing.T) {
}

func TestDropPublication(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
is := is.New(t)
pub := test.RandomIdentifier(t)

Expand Down
2 changes: 1 addition & 1 deletion source/logrepl/internal/relationset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestRelationSetAllTypes(t *testing.T) {
// any machine (CI or local)
time.Local = nil

ctx := context.Background()
ctx := test.Context(t)
is := is.New(t)

pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
Expand Down
3 changes: 1 addition & 2 deletions source/logrepl/internal/replication_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package internal

import (
"context"
"errors"
"fmt"
"testing"
Expand All @@ -26,7 +25,7 @@ import (

func Test_ReadReplicationSlot(t *testing.T) {
var (
ctx = context.Background()
ctx = test.Context(t)
pool = test.ConnectPool(ctx, t, test.RepmgrConnString)
slotName = test.RandomIdentifier(t)
)
Expand Down
7 changes: 4 additions & 3 deletions source/logrepl/internal/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func TestSubscription_Create(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
is := is.New(t)
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
pool.Close()
Expand All @@ -40,7 +40,7 @@ func TestSubscription_Create(t *testing.T) {

func TestSubscription_WithRepmgr(t *testing.T) {
var (
ctx = context.Background()
ctx = test.Context(t)
pool = test.ConnectPool(ctx, t, test.RepmgrConnString)
table1 = test.SetupTestTable(ctx, t, pool)
table2 = test.SetupTestTable(ctx, t, pool)
Expand Down Expand Up @@ -150,7 +150,8 @@ func TestSubscription_WithRepmgr(t *testing.T) {
}

func TestSubscription_ClosedContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx := test.Context(t)
ctx, cancel := context.WithCancel(ctx)

var (
is = is.New(t)
Expand Down
2 changes: 1 addition & 1 deletion source/schema/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

func Test_AvroExtract(t *testing.T) {
ctx := context.Background()
ctx := test.Context(t)
is := is.New(t)

c := test.ConnectSimple(ctx, t, test.RegularConnString)
Expand Down
15 changes: 8 additions & 7 deletions source/snapshot/fetch_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func Test_FetchConfigValidate(t *testing.T) {

func Test_FetcherValidate(t *testing.T) {
var (
ctx = context.Background()
ctx = test.Context(t)
pool = test.ConnectPool(ctx, t, test.RegularConnString)
table = test.SetupTestTable(ctx, t, pool)
)
Expand Down Expand Up @@ -207,7 +207,7 @@ func Test_FetcherRun_Initial(t *testing.T) {
table = test.SetupTestTable(context.Background(), t, pool)
is = is.New(t)
out = make(chan FetchData)
ctx = context.Background()
ctx = test.Context(t)
tt = &tomb.Tomb{}
)

Expand Down Expand Up @@ -262,7 +262,7 @@ func Test_FetcherRun_Resume(t *testing.T) {
table = test.SetupTestTable(context.Background(), t, pool)
is = is.New(t)
out = make(chan FetchData)
ctx = context.Background()
ctx = test.Context(t)
tt = &tomb.Tomb{}
)

Expand Down Expand Up @@ -320,7 +320,7 @@ func Test_FetcherRun_Resume(t *testing.T) {
func Test_withSnapshot(t *testing.T) {
var (
is = is.New(t)
ctx = context.Background()
ctx = test.Context(t)
pool = test.ConnectPool(ctx, t, test.RegularConnString)
)

Expand Down Expand Up @@ -388,7 +388,8 @@ func Test_withSnapshot(t *testing.T) {
func Test_send(t *testing.T) {
is := is.New(t)

ctx, cancel := context.WithCancel(context.Background())
ctx := test.Context(t)
ctx, cancel := context.WithCancel(ctx)
f := FetchWorker{conf: FetchConfig{}}

cancel()
Expand Down Expand Up @@ -426,7 +427,7 @@ func Test_FetchWorker_buildRecordData(t *testing.T) {
func Test_FetchWorker_updateSnapshotEnd(t *testing.T) {
var (
is = is.New(t)
ctx = context.Background()
ctx = test.Context(t)
pool = test.ConnectPool(ctx, t, test.RegularConnString)
table = test.SetupTestTable(ctx, t, pool)
)
Expand Down Expand Up @@ -485,7 +486,7 @@ func Test_FetchWorker_createCursor(t *testing.T) {
pool = test.ConnectPool(context.Background(), t, test.RegularConnString)
table = test.SetupTestTable(context.Background(), t, pool)
is = is.New(t)
ctx = context.Background()
ctx = test.Context(t)
)

f := FetchWorker{
Expand Down
2 changes: 1 addition & 1 deletion source/snapshot/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func Test_Iterator_Next(t *testing.T) {
var (
ctx = context.Background()
ctx = test.Context(t)
pool = test.ConnectPool(ctx, t, test.RegularConnString)
table = test.SetupTestTable(ctx, t, pool)
)
Expand Down
2 changes: 1 addition & 1 deletion source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func TestSource_Open(t *testing.T) {
is := is.New(t)
ctx := context.Background()
ctx := test.Context(t)
conn := test.ConnectSimple(ctx, t, test.RepmgrConnString)
tableName := test.SetupTestTable(ctx, t, conn)
slotName := "conduitslot1"
Expand Down
7 changes: 7 additions & 0 deletions test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/matryer/is"
"github.com/rs/zerolog"
)

// RepmgrConnString is a replication user connection string for the test postgres.
Expand Down Expand Up @@ -250,3 +251,9 @@ func IsPgError(is *is.I, err error, wantCode string) {
is.True(ok) // expected err to be a *pgconn.PgError
is.Equal(pgerr.Code, wantCode)
}

func Context(t *testing.T) context.Context {
writer := zerolog.NewTestWriter(t)
logger := zerolog.New(writer).Level(zerolog.InfoLevel)
return logger.WithContext(context.Background())
}