Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support reading from multiple tables using logical replication slots #113

Merged
merged 10 commits into from
Jan 18, 2024
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ returned.

## Configuration Options

| name | description | required | default |
|---------------------------|-------------------------------------------------------------------------------------------------------------------------------------|----------|------------------------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | List of table names to read from, separated by comma. example: `"employees,offices,payments"` | true | |
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |
| name | description | required | default |
|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | List of table names to read from, separated by comma. example: `"employees,offices,payments"` | true | |
| `key` | List of key column names per table, separated by comma. Example: `"table1:key1,table2:key2"`. If not supplied, the table's primary key is used as the `Key` field of the records. | false | |
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |

# Destination

Expand Down
54 changes: 47 additions & 7 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package postgres
import (
"context"
"fmt"
"strings"

"github.com/conduitio/conduit-connector-postgres/source"
"github.com/conduitio/conduit-connector-postgres/source/logrepl"
Expand All @@ -29,9 +30,10 @@ import (
type Source struct {
sdk.UnimplementedSource

iterator source.Iterator
config source.Config
conn *pgx.Conn
iterator source.Iterator
config source.Config
conn *pgx.Conn
KeyColumnMp map[string]string
}

func NewSource() sdk.Source {
Expand All @@ -52,13 +54,30 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}
// todo: when cdcMode "auto" is implemented, change this check
if len(s.config.Table) != 1 && s.config.CDCMode == source.CDCModeLongPolling {
return fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table")
}
s.KeyColumnMp = make(map[string]string, len(s.config.Table))
for _, pair := range s.config.Key {
// Split each pair into key and value
parts := strings.Split(pair, ":")
if len(parts) != 2 {
return fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key")
}
s.KeyColumnMp[parts[0]] = parts[1]
}
return nil
}
func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
conn, err := pgx.Connect(ctx, s.config.URL)
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}
columns, err := s.getTableColumns(conn)
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}
s.conn = conn

switch s.config.CDCMode {
Expand All @@ -78,8 +97,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
SlotName: s.config.LogreplSlotName,
PublicationName: s.config.LogreplPublicationName,
Tables: s.config.Table,
KeyColumnName: s.config.Key,
Columns: s.config.Columns,
KeyColumnMp: s.KeyColumnMp,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand All @@ -97,8 +115,8 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
ctx,
s.conn,
s.config.Table[0], //todo: only the first table for now
s.config.Columns,
s.config.Key)
columns,
s.KeyColumnMp[s.config.Table[0]])
if err != nil {
return fmt.Errorf("failed to create long polling iterator: %w", err)
}
Expand Down Expand Up @@ -131,3 +149,25 @@ func (s *Source) Teardown(ctx context.Context) error {
}
return nil
}

// getTableColumns returns the column names of the first configured table
// (s.config.Table[0]) as listed in information_schema.columns.
//
// The table name is passed as a query parameter instead of being formatted
// into the SQL text, so a table name containing quotes can neither break the
// statement nor inject SQL. An error is returned when no table is configured,
// when the query or a row scan fails, or when row iteration ends with an
// error.
//
// NOTE(review): the lookup filters on table_name only, so same-named tables
// in different schemas would all contribute columns — confirm whether a
// table_schema filter is needed.
func (s *Source) getTableColumns(conn *pgx.Conn) ([]string, error) {
	// Guard the index below; an empty table list would otherwise panic.
	if len(s.config.Table) == 0 {
		return nil, fmt.Errorf("no table configured")
	}

	const query = "SELECT column_name FROM information_schema.columns WHERE table_name = $1"

	// The signature carries no context, so the query runs with a background
	// context and is not tied to the caller's cancellation.
	rows, err := conn.Query(context.Background(), query, s.config.Table[0])
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var columns []string
	for rows.Next() {
		var columnName string
		if err := rows.Scan(&columnName); err != nil {
			return nil, err
		}
		columns = append(columns, columnName)
	}
	// rows.Next returns false both at the end of the result set and on
	// failure; without this check a failed read would silently yield a
	// truncated column list.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return columns, nil
}
6 changes: 2 additions & 4 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ type Config struct {
URL string `json:"url" validate:"required"`
// List of table names to read from, separated by a comma.
Table []string `json:"table" validate:"required"`
// todo: reove param, Comma separated list of column names that should be included in each Record's payload.
Columns []string `json:"columns"`
// todo: remove param, Column name that records should use for their `Key` fields.
Key string `json:"key"`
// list of Key column names per table ex:"table1:key1,table2:key2", records should use the key values for their `Key` fields.
Key []string `json:"key"`

// Whether or not the plugin will take a snapshot of the entire table before starting cdc mode.
SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"`
Expand Down
24 changes: 13 additions & 11 deletions source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ type Config struct {
SlotName string
PublicationName string
Tables []string
KeyColumnName string
Columns []string
KeyColumnMp map[string]string
}

// CDCIterator asynchronously listens for events from the logical replication
Expand Down Expand Up @@ -155,12 +154,16 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er
}

var err error
keyColumnMp := make(map[string]string, len(i.config.Tables))
if i.config.KeyColumnMp == nil {
i.config.KeyColumnMp = make(map[string]string, len(i.config.Tables))
}
for _, tableName := range i.config.Tables {
// Call function and store the result in the map
keyColumnMp[tableName], err = i.getKeyColumn(ctx, conn, tableName)
if err != nil {
return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err)
// get unprovided table keys
if _, ok := i.config.KeyColumnMp[tableName]; !ok {
i.config.KeyColumnMp[tableName], err = i.getKeyColumn(ctx, conn, tableName)
if err != nil {
return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err)
}
}
}

Expand All @@ -172,8 +175,7 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er
lsn,
NewCDCHandler(
internal.NewRelationSet(conn.ConnInfo()),
keyColumnMp,
i.config.Columns, // todo, delete this option, use processors instead
i.config.KeyColumnMp,
i.records,
).Handle,
)
Expand All @@ -185,8 +187,8 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er
// getKeyColumn queries the db for the name of the primary key column for a
// table if one exists and returns it.
func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) {
if i.config.KeyColumnName != "" {
return i.config.KeyColumnName, nil
if i.config.KeyColumnMp[tableName] != "" {
return i.config.KeyColumnMp[tableName], nil
}

query := `SELECT column_name
Expand Down
17 changes: 2 additions & 15 deletions source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,17 @@ import (
// converting them to a record and sending them to a channel.
type CDCHandler struct {
// keyColumnMp maps a table name to the name of the column whose value is
// used as the key of the records produced for that table.
keyColumnMp map[string]string
columns map[string]bool // columns can be used to filter only specific columns
// relationSet is the relation set handed to NewCDCHandler.
relationSet *internal.RelationSet
// out is the send-only channel the handler delivers records to.
out chan<- sdk.Record
}

// NewCDCHandler returns a CDCHandler that uses rs as its relation set,
// keyColumnMp as its table-name-to-key-column map and out as the channel it
// sends records to. A non-empty columns slice is converted into a set stored
// on the handler; an empty or nil slice leaves that set nil.
func NewCDCHandler(
rs *internal.RelationSet,
keyColumnMp map[string]string,
columns []string,
out chan<- sdk.Record,
) *CDCHandler {
// Build the set only when columns were given, so a nil set can stand for
// "no column filter requested".
var columnSet map[string]bool
if len(columns) > 0 {
columnSet = make(map[string]bool)
for _, col := range columns {
columnSet[col] = true
}
}
return &CDCHandler{
keyColumnMp: keyColumnMp,
columns: columnSet,
relationSet: rs,
out: out,
}
Expand Down Expand Up @@ -210,11 +200,8 @@ func (h *CDCHandler) buildRecordPayload(values map[string]pgtype.Value) sdk.Data
}
payload := make(sdk.StructuredData)
for k, v := range values {
// filter columns if columns are specified
if h.columns == nil || h.columns[k] {
value := v.Get()
payload[k] = value
}
value := v.Get()
payload[k] = value
}
return payload
}
2 changes: 1 addition & 1 deletion source/longpoll/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type SnapshotIterator struct {
// * NewSnapshotIterator attempts to load the sql rows into the SnapshotIterator and will
// immediately begin to return them to subsequent Read calls.
// * It acquires a read only transaction lock before reading the table.
// * If Teardown is called while a snpashot is in progress, it will return an
// * If Teardown is called while a snapshot is in progress, it will return an
// ErrSnapshotInterrupt error.
func NewSnapshotIterator(ctx context.Context, conn *pgx.Conn, table string, columns []string, key string) (*SnapshotIterator, error) {
s := &SnapshotIterator{
Expand Down
6 changes: 0 additions & 6 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.