diff --git a/pkg/decode/decoder.go b/pkg/decode/decoder.go index 2ef9dcf..9882115 100644 --- a/pkg/decode/decoder.go +++ b/pkg/decode/decoder.go @@ -1,6 +1,8 @@ package decode -import "github.com/replicase/pgcapture/pkg/pb" +import ( + "github.com/replicase/pgcapture/pkg/pb" +) const ( ExtensionSchema = "pgcapture" @@ -50,3 +52,67 @@ func IsDDL(m *pb.Change) bool { func Ignore(m *pb.Change) bool { return m.Schema == ExtensionSchema && m.Table == ExtensionSources } + +func makeOldPBTuple(schema *PGXSchemaLoader, rel Relation, src []Field, noNull bool) (fields []*pb.Field) { + if src == nil { + return nil + } + fields = make([]*pb.Field, 0, len(src)) + for i, s := range src { + if noNull && s.Datum == nil { + continue + } + typeInfo, err := schema.GetTypeInfo(rel.NspName, rel.RelName, rel.Fields[i]) + if err != nil { + // TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view + continue + } + switch s.Format { + case 'b': + fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: &pb.Field_Binary{Binary: s.Datum}}) + case 'n': + fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: nil}) + case 't': + fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: &pb.Field_Text{Text: string(s.Datum)}}) + case 'u': + continue // unchanged toast field should be excluded + } + } + return fields +} + +func makeNewPBTuple(schema *PGXSchemaLoader, rel Relation, old, new []Field, noNull bool) (fields []*pb.Field) { + if new == nil { + return nil + } + fields = make([]*pb.Field, 0, len(new)) + for i, s := range new { + if noNull && s.Datum == nil { + continue + } + typeInfo, err := schema.GetTypeInfo(rel.NspName, rel.RelName, rel.Fields[i]) + if err != nil { + // TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view + continue + } + ReAppend: + switch s.Format { + case 'b': + fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: &pb.Field_Binary{Binary: s.Datum}}) + case 'n': + fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: nil}) + case 't': + fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: &pb.Field_Text{Text: string(s.Datum)}}) + case 'u': + // fill the unchanged field with old value when ReplicaIdentity is full + // otherwise, skip the unchanged field + if typeInfo.ReplicaIdentity == ReplicaIdentityFull && old[i].Format != 'u' { + s.Format = old[i].Format + s.Datum = old[i].Datum + goto ReAppend + } + continue + } + } + return fields +} diff --git a/pkg/decode/main_test.go b/pkg/decode/main_test.go index bd1b9f7..11ac44a 100644 --- a/pkg/decode/main_test.go +++ b/pkg/decode/main_test.go @@ -3,6 +3,7 @@ package decode import ( "bytes" "context" + "fmt" "strings" "github.com/jackc/pgx/v5" @@ -27,9 +28,10 @@ func (c *change) Apply(ctx context.Context, conn *pgx.Conn) (err error) { fmts[i] = 1 } + table := c.Expect.Table switch c.Expect.Op { case pb.Change_INSERT: - _, err = conn.PgConn().ExecParams(ctx, "insert into t values ($1,$2,$3,$4,$5,$6)", vals, oids, fmts, fmts).Close() + _, err = conn.PgConn().ExecParams(ctx, fmt.Sprintf("insert into %s values ($1,$2,$3,$4,$5,$6)", table), vals, oids, fmts, fmts).Close() case pb.Change_UPDATE: if c.Expect.Old != nil { vals[5] = c.Expect.Old[0].GetBinary() @@ -39,12 +41,12 @@ func (c *change) Apply(ctx context.Context, conn *pgx.Conn) (err error) { oids[5] = c.Expect.New[0].Oid } fmts[5] = 1 - _, err = conn.PgConn().ExecParams(ctx, "update t set id=$1,uid=$2,txt=$3,js=$4,ts=$5 where id=$6", vals, oids, fmts, fmts).Close() + _, err = conn.PgConn().ExecParams(ctx, fmt.Sprintf("update %s set id=$1,uid=$2,txt=$3,js=$4,ts=$5 where id=$6", table), vals, oids, fmts, fmts).Close() case pb.Change_DELETE: vals[0] = c.Expect.Old[0].GetBinary() oids[0] = c.Expect.Old[0].Oid fmts[0] = 1 - _, err = conn.PgConn().ExecParams(ctx, "delete from t where id=$1", vals[:1], oids[:1], fmts[:1], fmts[:1]).Close() + _, err = conn.PgConn().ExecParams(ctx, fmt.Sprintf("delete from %s where id=$1", table), vals[:1], oids[:1], fmts[:1], fmts[:1]).Close() } return err } diff --git a/pkg/decode/pglogical.go b/pkg/decode/pglogical.go index b2c0686..b6ec17b 100644 --- a/pkg/decode/pglogical.go +++ b/pkg/decode/pglogical.go @@ -62,8 +62,8 @@ func (p *PGLogicalDecoder) Decode(in []byte) (m *pb.Message, err error) { } c := &pb.Change{Schema: rel.NspName, Table: rel.RelName, Op: OpMap[in[0]]} - c.Old = p.makePBTuple(rel, r.Old, true) - c.New = p.makePBTuple(rel, r.New, false) + c.Old = makeOldPBTuple(p.schema, rel, r.Old, true) + c.New = makeNewPBTuple(p.schema, rel, r.Old, r.New, false) if len(c.Old) != 0 || len(c.New) != 0 { return &pb.Message{Type: &pb.Message_Change{Change: c}}, nil @@ -78,34 +78,6 @@ func (p *PGLogicalDecoder) GetPluginArgs() []string { return p.pluginArgs } -func (p *PGLogicalDecoder) makePBTuple(rel Relation, src []Field, noNull bool) (fields []*pb.Field) { - if src == nil { - return nil - } - fields = make([]*pb.Field, 0, len(src)) - for i, s := range src { - if noNull && s.Datum == nil { - continue - } - oid, err := p.schema.GetTypeOID(rel.NspName, rel.RelName, rel.Fields[i]) - if err != nil { - // TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view - continue - } - switch s.Format { - case 'b': - fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Binary{Binary: s.Datum}}) - case 'n': - fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: nil}) - case 't': - fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Text{Text: string(s.Datum)}}) - case 'u': - continue // unchanged toast field should be excluded - } - } - return fields -} - func (p *PGLogicalDecoder) ReadBegin(in []byte) (*pb.Message, error) { if len(in) != 1+1+8+8+4 { return nil, errors.New("begin wrong length") diff --git a/pkg/decode/pglogical_test.go b/pkg/decode/pglogical_test.go index 776a7de..1aa09d5 100644 --- a/pkg/decode/pglogical_test.go +++ b/pkg/decode/pglogical_test.go @@ -29,7 +29,9 @@ func TestPGLogicalDecoder(t *testing.T) { defer conn.Close(ctx) conn.Exec(ctx, "DROP SCHEMA public CASCADE; CREATE SCHEMA public; CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";") - conn.Exec(ctx, "CREATE TABLE t (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)") + conn.Exec(ctx, "CREATE TABLE t1 (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)") + conn.Exec(ctx, "CREATE TABLE t2 (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)") + conn.Exec(ctx, "ALTER TABLE t2 REPLICA IDENTITY FULL") conn.Exec(ctx, fmt.Sprintf("SELECT pg_drop_replication_slot('%s')", TestSlot)) conn.Exec(ctx, sql.CreateLogicalSlot, TestSlot, PGLogicalOutputPlugin) @@ -45,7 +47,7 @@ func TestPGLogicalDecoder(t *testing.T) { now := time.Now() changes := []*change{ { - Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t", + Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t1", New: []*pb.Field{ {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}}, @@ -57,7 +59,7 @@ func TestPGLogicalDecoder(t *testing.T) { }, }, { - Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t", + Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t1", New: []*pb.Field{ {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(2, pgtype.Int8OID)}}, {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("3e89ee8c-3657-4103-99a7-680292a0c22c"), pgtype.UUIDOID)}}, @@ -69,7 +71,7 @@ func TestPGLogicalDecoder(t *testing.T) { }, }, { - Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t", + Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t1", New: []*pb.Field{ {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}}, @@ -80,7 +82,7 @@ func TestPGLogicalDecoder(t *testing.T) { }, }, { - Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t", + Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t1", New: []*pb.Field{ {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(3, pgtype.Int8OID)}}, {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("f0d3ad8e-709f-4f67-9860-e149c671d82a"), pgtype.UUIDOID)}}, @@ -94,19 +96,62 @@ func TestPGLogicalDecoder(t *testing.T) { }, }, { - Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t", + Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t1", Old: []*pb.Field{ {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(3, pgtype.Int8OID)}}, }, }, }, { - Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t", + Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t1", Old: []*pb.Field{ {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, }, }, }, + { + Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t2", + New: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}}, + {Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + }, + }, + { + Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t2", + New: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}}, + {Name: "txt", Oid: 25, Value: nil}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + Old: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}}, + {Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + }, + }, + { + Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t2", + Old: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + }, + }, } for _, change := range changes { if err = change.Apply(ctx, conn); err != nil { @@ -165,7 +210,8 @@ recv: } case 2: if c := m.GetChange(); c == nil || !proto.Equal(c, changes[(count-2)/3].Expect) { - t.Fatalf("unexpected %v", m.String()) + fmt.Println(count) + t.Fatalf("unexpected %v\n %v", m.String(), changes[(count-2)/3].Expect.String()) } } } diff --git a/pkg/decode/pgoutput.go b/pkg/decode/pgoutput.go index 4ba045d..0484e72 100644 --- a/pkg/decode/pgoutput.go +++ b/pkg/decode/pgoutput.go @@ -52,8 +52,8 @@ func (p *PGOutputDecoder) Decode(in []byte) (m *pb.Message, err error) { } c := &pb.Change{Schema: rel.NspName, Table: rel.RelName, Op: OpMap[in[0]]} - c.Old = p.makePBTuple(rel, r.Old, true) - c.New = p.makePBTuple(rel, r.New, false) + c.Old = makeOldPBTuple(p.schema, rel, r.Old, true) + c.New = makeNewPBTuple(p.schema, rel, r.Old, r.New, false) if len(c.Old) != 0 || len(c.New) != 0 { return &pb.Message{Type: &pb.Message_Change{Change: c}}, nil @@ -68,34 +68,6 @@ func (p *PGOutputDecoder) GetPluginArgs() []string { return p.pluginArgs } -func (p *PGOutputDecoder) makePBTuple(rel Relation, src []Field, noNull bool) (fields []*pb.Field) { - if src == nil { - return nil - } - fields = make([]*pb.Field, 0, len(src)) - for i, s := range src { - if noNull && s.Datum == nil { - continue - } - oid, err := p.schema.GetTypeOID(rel.NspName, rel.RelName, rel.Fields[i]) - if err != nil { - // TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view - continue - } - switch s.Format { - case 'b': - fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Binary{Binary: s.Datum}}) - case 'n': - fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: nil}) - case 't': - fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Text{Text: string(s.Datum)}}) - case 'u': - continue // unchanged toast field should be excluded - } - } - return fields -} - func (p *PGOutputDecoder) ReadBegin(in []byte) (*pb.Message, error) { if len(in) != 1+1+8+8+3 { return nil, errors.New("begin wrong length") diff --git a/pkg/decode/pgoutput_test.go b/pkg/decode/pgoutput_test.go index 25d0386..f37c239 100644 --- a/pkg/decode/pgoutput_test.go +++ b/pkg/decode/pgoutput_test.go @@ -30,6 +30,8 @@ func TestPGOutputDecoder(t *testing.T) { conn.Exec(ctx, "DROP SCHEMA public CASCADE; CREATE SCHEMA public; CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";") conn.Exec(ctx, "CREATE TABLE t (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)") + conn.Exec(ctx, "CREATE TABLE t2 (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)") + conn.Exec(ctx, "ALTER TABLE t2 REPLICA IDENTITY FULL") conn.Exec(ctx, fmt.Sprintf("SELECT pg_drop_replication_slot('%s')", TestSlot)) conn.Exec(ctx, fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES;", TestSlot)) conn.Exec(ctx, sql.CreateLogicalSlot, TestSlot, PGOutputPlugin) @@ -106,6 +108,49 @@ func TestPGOutputDecoder(t *testing.T) { }, }, }, + { + Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t2", + New: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}}, + {Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + }, + }, + { + Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t2", + New: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}}, + {Name: "txt", Oid: 25, Value: nil}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + Old: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}}, + {Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + }, + }, + { + Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t2", + Old: []*pb.Field{ + {Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}}, + {Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}}, + {Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}}, + {Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}}, + {Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}}, + }, + }, + }, } for _, change := range changes { if err = change.Apply(ctx, conn); err != nil { diff --git a/pkg/decode/schema.go b/pkg/decode/schema.go index 7b71cfc..ef54b90 100644 --- a/pkg/decode/schema.go +++ b/pkg/decode/schema.go @@ -97,7 +97,21 @@ func (i ColumnInfo) Filter(fields []*pb.Field, fieldSelector fieldSelector) (fie return set, fFields } -type TypeCache map[string]map[string]map[string]uint32 +type ReplicaIdentity rune + +const ( + ReplicaIdentityDefault ReplicaIdentity = 'd' + ReplicaIdentityFull ReplicaIdentity = 'f' + ReplicaIdentityIndex ReplicaIdentity = 'i' + ReplicaIdentityNothing ReplicaIdentity = 'n' +) + +type TypeInfo struct { + OID uint32 + ReplicaIdentity ReplicaIdentity +} + +type TypeCache map[string]map[string]map[string]TypeInfo type KeysCache map[string]map[string]ColumnInfo func NewPGXSchemaLoader(conn *pgx.Conn) *PGXSchemaLoader { @@ -117,23 +131,29 @@ func (p *PGXSchemaLoader) RefreshType() error { } defer rows.Close() - var nspname, relname, attname string - var atttypid uint32 + var ( + nspname, relname, attname string + atttypid uint32 + relreplident ReplicaIdentity + ) for rows.Next() { - if err := rows.Scan(&nspname, &relname, &attname, &atttypid); err != nil { + if err := rows.Scan(&nspname, &relname, &attname, &atttypid, &relreplident); err != nil { return err } tbls, ok := p.types[nspname] if !ok { - tbls = make(map[string]map[string]uint32) + tbls = make(map[string]map[string]TypeInfo) p.types[nspname] = tbls } cols, ok := tbls[relname] if !ok { - cols = make(map[string]uint32) + cols = make(map[string]TypeInfo) tbls[relname] = cols } - cols[attname] = atttypid + cols[attname] = TypeInfo{ + OID: atttypid, + ReplicaIdentity: relreplident, + } } return nil } @@ -170,15 +190,16 @@ func (p *PGXSchemaLoader) RefreshColumnInfo() error { return nil } -func (p *PGXSchemaLoader) GetTypeOID(namespace, table, field string) (oid uint32, err error) { +func (p *PGXSchemaLoader) GetTypeInfo(namespace, table, field string) (*TypeInfo, error) { if tbls, ok := p.types[namespace]; !ok { - return 0, fmt.Errorf("%s.%s %w", namespace, table, ErrSchemaTableMissing) + return nil, fmt.Errorf("%s.%s %w", namespace, table, ErrSchemaNamespaceMissing) } else if cols, ok := tbls[table]; !ok { - return 0, fmt.Errorf("%s.%s %w", namespace, table, ErrSchemaTableMissing) - } else if oid, ok = cols[field]; !ok { - return 0, fmt.Errorf("%s.%s.%s %w", namespace, table, field, ErrSchemaColumnMissing) + return nil, fmt.Errorf("%s.%s %w", namespace, table, ErrSchemaTableMissing) + } else if typeInfo, ok := cols[field]; !ok { + return nil, fmt.Errorf("%s.%s.%s %w", namespace, table, field, ErrSchemaColumnMissing) + } else { + return &typeInfo, nil } - return oid, nil } func (p *PGXSchemaLoader) GetColumnInfo(namespace, table string) (*ColumnInfo, error) { @@ -214,7 +235,8 @@ func (p *PGXSchemaLoader) GetVersion() (version int64, err error) { } var ( - ErrSchemaTableMissing = errors.New("table missing") - ErrSchemaColumnMissing = errors.New("column missing") - ErrSchemaIdentityMissing = errors.New("table identity keys missing") + ErrSchemaNamespaceMissing = errors.New("namespace missing") + ErrSchemaTableMissing = errors.New("table missing") + ErrSchemaColumnMissing = errors.New("column missing") + ErrSchemaIdentityMissing = errors.New("table identity keys missing") ) diff --git a/pkg/decode/schema_test.go b/pkg/decode/schema_test.go index 8eb8133..30a9e4d 100644 --- a/pkg/decode/schema_test.go +++ b/pkg/decode/schema_test.go @@ -24,7 +24,7 @@ func TestSchemaLoader(t *testing.T) { schema := NewPGXSchemaLoader(conn) - t.Run("GetTypeOID", func(t *testing.T) { + t.Run("GetTypeInfo", func(t *testing.T) { var columns []string for _, name := range pgTypeNames { if strings.HasPrefix("name", "_") { @@ -46,20 +46,22 @@ func TestSchemaLoader(t *testing.T) { if !ok { t.Fatalf("%s type missing from pgx", name) } - if oid, err := schema.GetTypeOID("public", "t", name); err != nil { - t.Fatalf("GetTypeOID fail: %v", err) - } else if oid != dt.OID { - t.Fatalf("GetTypeOID OID mismatch: %s %v %v", name, oid, dt.OID) + if typeInfo, err := schema.GetTypeInfo("public", "t", name); err != nil { + t.Fatalf("GetTypeInfo fail: %v", err) + } else if typeInfo.OID != dt.OID { + t.Fatalf("GetTypeInfo OID mismatch: %s %v %v", name, typeInfo.OID, dt.OID) + } else if typeInfo.ReplicaIdentity != 'd' { + t.Fatalf("GetTypeInfo ReplicaIdentity mismatch: %s %v %v", name, typeInfo.ReplicaIdentity, 'd') } } - if _, err = schema.GetTypeOID("other", "other", "other"); !errors.Is(err, ErrSchemaTableMissing) { + if _, err = schema.GetTypeInfo("other", "other", "other"); !errors.Is(err, ErrSchemaNamespaceMissing) { t.Fatalf("unexpected %v", err) } - if _, err = schema.GetTypeOID("public", "other", "other"); !errors.Is(err, ErrSchemaTableMissing) { + if _, err = schema.GetTypeInfo("public", "other", "other"); !errors.Is(err, ErrSchemaTableMissing) { t.Fatalf("unexpected %v", err) } - if _, err = schema.GetTypeOID("public", "t", "other"); !errors.Is(err, ErrSchemaColumnMissing) { + if _, err = schema.GetTypeInfo("public", "t", "other"); !errors.Is(err, ErrSchemaColumnMissing) { t.Fatalf("unexpected %v", err) } }) diff --git a/pkg/sql/source.go b/pkg/sql/source.go index 4db0314..ec9627d 100644 --- a/pkg/sql/source.go +++ b/pkg/sql/source.go @@ -1,6 +1,6 @@ package sql -var QueryAttrTypeOID = `SELECT nspname, relname, attname, atttypid +var QueryAttrTypeOID = `SELECT nspname, relname, attname, atttypid, relreplident FROM pg_catalog.pg_namespace n JOIN pg_catalog.pg_class c ON c.relnamespace = n.oid AND c.relkind = 'r' JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum > 0 and a.attisdropped = false