Skip to content

Commit

Permalink
Merge pull request #53 from replicase/feat/handle-unchanged-toast-field
Browse files Browse the repository at this point in the history
feat: handle unchanged toast field
  • Loading branch information
rueian authored Oct 24, 2023
2 parents 65209c2 + d0da98a commit fb405dc
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 97 deletions.
68 changes: 67 additions & 1 deletion pkg/decode/decoder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package decode

import "github.com/replicase/pgcapture/pkg/pb"
import (
"github.com/replicase/pgcapture/pkg/pb"
)

const (
ExtensionSchema = "pgcapture"
Expand Down Expand Up @@ -50,3 +52,67 @@ func IsDDL(m *pb.Change) bool {
// Ignore reports whether the change targets the table named ExtensionSources
// inside the ExtensionSchema schema.
func Ignore(m *pb.Change) bool {
	if m.Schema != ExtensionSchema {
		return false
	}
	return m.Table == ExtensionSources
}

// makeOldPBTuple converts the columns of an old tuple into protobuf fields.
//
// It returns nil when src is nil. For every column, the name is taken from
// rel.Fields at the same index and the type OID is resolved through
// schema.GetTypeInfo. A column is left out of the result when:
//   - noNull is true and the column has a nil Datum,
//   - the type lookup returns an error,
//   - the column format is 'u' (unchanged toast field), or
//   - the column format is none of 'b', 'n', 't'.
func makeOldPBTuple(schema *PGXSchemaLoader, rel Relation, src []Field, noNull bool) (fields []*pb.Field) {
	if src == nil {
		return nil
	}

	fields = make([]*pb.Field, 0, len(src))
	for idx := range src {
		col := src[idx]
		if col.Datum == nil && noNull {
			continue
		}

		name := rel.Fields[idx]
		info, err := schema.GetTypeInfo(rel.NspName, rel.RelName, name)
		if err != nil {
			// TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view
			continue
		}

		field := &pb.Field{Name: name, Oid: info.OID}
		switch col.Format {
		case 'b':
			field.Value = &pb.Field_Binary{Binary: col.Datum}
		case 't':
			field.Value = &pb.Field_Text{Text: string(col.Datum)}
		case 'n':
			// null column: keep Value nil
		case 'u':
			continue // unchanged toast field should be excluded
		default:
			continue
		}
		fields = append(fields, field)
	}
	return fields
}

// makeNewPBTuple converts the columns of a new tuple into protobuf fields.
//
// It returns nil when new is nil. For every column, the name is taken from
// rel.Fields at the same index and the type information is resolved through
// schema.GetTypeInfo; columns whose lookup fails are skipped, as are columns
// with a nil Datum when noNull is true.
//
// A column with format 'u' (unchanged toast field) carries no value of its
// own. When the type information reports ReplicaIdentityFull and the old
// tuple holds a value for the same index, that old value is emitted instead;
// otherwise the column is skipped. The old tuple is only consulted when it
// actually has an entry at that index, so a missing or shorter old tuple
// results in the column being skipped rather than an index-out-of-range panic.
func makeNewPBTuple(schema *PGXSchemaLoader, rel Relation, old, new []Field, noNull bool) (fields []*pb.Field) {
	if new == nil {
		return nil
	}
	fields = make([]*pb.Field, 0, len(new))
	for i, s := range new {
		if noNull && s.Datum == nil {
			continue
		}
		typeInfo, err := schema.GetTypeInfo(rel.NspName, rel.RelName, rel.Fields[i])
		if err != nil {
			// TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view
			continue
		}

		if s.Format == 'u' {
			// fill the unchanged field with old value when ReplicaIdentity is full
			// otherwise, skip the unchanged field
			if typeInfo.ReplicaIdentity != ReplicaIdentityFull || i >= len(old) || old[i].Format == 'u' {
				continue
			}
			// s is a per-iteration copy, so the source slices are not modified.
			s.Format = old[i].Format
			s.Datum = old[i].Datum
		}

		switch s.Format {
		case 'b':
			fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: &pb.Field_Binary{Binary: s.Datum}})
		case 'n':
			fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: nil})
		case 't':
			fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: typeInfo.OID, Value: &pb.Field_Text{Text: string(s.Datum)}})
		}
	}
	return fields
}
8 changes: 5 additions & 3 deletions pkg/decode/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package decode
import (
"bytes"
"context"
"fmt"
"strings"

"github.com/jackc/pgx/v5"
Expand All @@ -27,9 +28,10 @@ func (c *change) Apply(ctx context.Context, conn *pgx.Conn) (err error) {
fmts[i] = 1
}

table := c.Expect.Table
switch c.Expect.Op {
case pb.Change_INSERT:
_, err = conn.PgConn().ExecParams(ctx, "insert into t values ($1,$2,$3,$4,$5,$6)", vals, oids, fmts, fmts).Close()
_, err = conn.PgConn().ExecParams(ctx, fmt.Sprintf("insert into %s values ($1,$2,$3,$4,$5,$6)", table), vals, oids, fmts, fmts).Close()
case pb.Change_UPDATE:
if c.Expect.Old != nil {
vals[5] = c.Expect.Old[0].GetBinary()
Expand All @@ -39,12 +41,12 @@ func (c *change) Apply(ctx context.Context, conn *pgx.Conn) (err error) {
oids[5] = c.Expect.New[0].Oid
}
fmts[5] = 1
_, err = conn.PgConn().ExecParams(ctx, "update t set id=$1,uid=$2,txt=$3,js=$4,ts=$5 where id=$6", vals, oids, fmts, fmts).Close()
_, err = conn.PgConn().ExecParams(ctx, fmt.Sprintf("update %s set id=$1,uid=$2,txt=$3,js=$4,ts=$5 where id=$6", table), vals, oids, fmts, fmts).Close()
case pb.Change_DELETE:
vals[0] = c.Expect.Old[0].GetBinary()
oids[0] = c.Expect.Old[0].Oid
fmts[0] = 1
_, err = conn.PgConn().ExecParams(ctx, "delete from t where id=$1", vals[:1], oids[:1], fmts[:1], fmts[:1]).Close()
_, err = conn.PgConn().ExecParams(ctx, fmt.Sprintf("delete from %s where id=$1", table), vals[:1], oids[:1], fmts[:1], fmts[:1]).Close()
}
return err
}
Expand Down
32 changes: 2 additions & 30 deletions pkg/decode/pglogical.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func (p *PGLogicalDecoder) Decode(in []byte) (m *pb.Message, err error) {
}

c := &pb.Change{Schema: rel.NspName, Table: rel.RelName, Op: OpMap[in[0]]}
c.Old = p.makePBTuple(rel, r.Old, true)
c.New = p.makePBTuple(rel, r.New, false)
c.Old = makeOldPBTuple(p.schema, rel, r.Old, true)
c.New = makeNewPBTuple(p.schema, rel, r.Old, r.New, false)

if len(c.Old) != 0 || len(c.New) != 0 {
return &pb.Message{Type: &pb.Message_Change{Change: c}}, nil
Expand All @@ -78,34 +78,6 @@ func (p *PGLogicalDecoder) GetPluginArgs() []string {
return p.pluginArgs
}

// makePBTuple converts the columns in src into protobuf fields.
//
// It returns nil when src is nil. Column names come from rel.Fields at the
// same index as the source column, and each OID is resolved through
// p.schema.GetTypeOID. When noNull is true, columns with a nil Datum are
// left out of the result.
func (p *PGLogicalDecoder) makePBTuple(rel Relation, src []Field, noNull bool) (fields []*pb.Field) {
if src == nil {
return nil
}
// Capacity is the upper bound; skipped columns make the result shorter.
fields = make([]*pb.Field, 0, len(src))
for i, s := range src {
if noNull && s.Datum == nil {
continue
}
oid, err := p.schema.GetTypeOID(rel.NspName, rel.RelName, rel.Fields[i])
if err != nil {
// The column is dropped silently when its OID cannot be resolved.
// TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view
continue
}
// Format selects how the column is emitted: 'b' as binary, 'n' with a nil
// Value, 't' as text. Any other format appends nothing.
switch s.Format {
case 'b':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Binary{Binary: s.Datum}})
case 'n':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: nil})
case 't':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Text{Text: string(s.Datum)}})
case 'u':
continue // unchanged toast field should be excluded
}
}
return fields
}

func (p *PGLogicalDecoder) ReadBegin(in []byte) (*pb.Message, error) {
if len(in) != 1+1+8+8+4 {
return nil, errors.New("begin wrong length")
Expand Down
62 changes: 54 additions & 8 deletions pkg/decode/pglogical_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func TestPGLogicalDecoder(t *testing.T) {
defer conn.Close(ctx)

conn.Exec(ctx, "DROP SCHEMA public CASCADE; CREATE SCHEMA public; CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";")
conn.Exec(ctx, "CREATE TABLE t (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)")
conn.Exec(ctx, "CREATE TABLE t1 (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)")
conn.Exec(ctx, "CREATE TABLE t2 (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)")
conn.Exec(ctx, "ALTER TABLE t2 REPLICA IDENTITY FULL")
conn.Exec(ctx, fmt.Sprintf("SELECT pg_drop_replication_slot('%s')", TestSlot))
conn.Exec(ctx, sql.CreateLogicalSlot, TestSlot, PGLogicalOutputPlugin)

Expand All @@ -45,7 +47,7 @@ func TestPGLogicalDecoder(t *testing.T) {
now := time.Now()
changes := []*change{
{
Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t",
Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t1",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}},
Expand All @@ -57,7 +59,7 @@ func TestPGLogicalDecoder(t *testing.T) {
},
},
{
Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t",
Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t1",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(2, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("3e89ee8c-3657-4103-99a7-680292a0c22c"), pgtype.UUIDOID)}},
Expand All @@ -69,7 +71,7 @@ func TestPGLogicalDecoder(t *testing.T) {
},
},
{
Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t",
Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t1",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}},
Expand All @@ -80,7 +82,7 @@ func TestPGLogicalDecoder(t *testing.T) {
},
},
{
Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t",
Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t1",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(3, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("f0d3ad8e-709f-4f67-9860-e149c671d82a"), pgtype.UUIDOID)}},
Expand All @@ -94,19 +96,62 @@ func TestPGLogicalDecoder(t *testing.T) {
},
},
{
Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t",
Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t1",
Old: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(3, pgtype.Int8OID)}},
},
},
},
{
Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t",
Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t1",
Old: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
},
},
},
{
Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t2",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}},
{Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
},
},
{
Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t2",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}},
{Name: "txt", Oid: 25, Value: nil},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
Old: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}},
{Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
},
},
{
Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t2",
Old: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
},
},
}
for _, change := range changes {
if err = change.Apply(ctx, conn); err != nil {
Expand Down Expand Up @@ -165,7 +210,8 @@ recv:
}
case 2:
if c := m.GetChange(); c == nil || !proto.Equal(c, changes[(count-2)/3].Expect) {
t.Fatalf("unexpected %v", m.String())
fmt.Println(count)
t.Fatalf("unexpected %v\n %v", m.String(), changes[(count-2)/3].Expect.String())
}
}
}
Expand Down
32 changes: 2 additions & 30 deletions pkg/decode/pgoutput.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (p *PGOutputDecoder) Decode(in []byte) (m *pb.Message, err error) {
}

c := &pb.Change{Schema: rel.NspName, Table: rel.RelName, Op: OpMap[in[0]]}
c.Old = p.makePBTuple(rel, r.Old, true)
c.New = p.makePBTuple(rel, r.New, false)
c.Old = makeOldPBTuple(p.schema, rel, r.Old, true)
c.New = makeNewPBTuple(p.schema, rel, r.Old, r.New, false)

if len(c.Old) != 0 || len(c.New) != 0 {
return &pb.Message{Type: &pb.Message_Change{Change: c}}, nil
Expand All @@ -68,34 +68,6 @@ func (p *PGOutputDecoder) GetPluginArgs() []string {
return p.pluginArgs
}

// makePBTuple converts the columns in src into protobuf fields.
//
// It returns nil when src is nil. Column names come from rel.Fields at the
// same index as the source column, and each OID is resolved through
// p.schema.GetTypeOID. When noNull is true, columns with a nil Datum are
// left out of the result.
func (p *PGOutputDecoder) makePBTuple(rel Relation, src []Field, noNull bool) (fields []*pb.Field) {
if src == nil {
return nil
}
// Capacity is the upper bound; skipped columns make the result shorter.
fields = make([]*pb.Field, 0, len(src))
for i, s := range src {
if noNull && s.Datum == nil {
continue
}
oid, err := p.schema.GetTypeOID(rel.NspName, rel.RelName, rel.Fields[i])
if err != nil {
// The column is dropped silently when its OID cannot be resolved.
// TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view
continue
}
// Format selects how the column is emitted: 'b' as binary, 'n' with a nil
// Value, 't' as text. Any other format appends nothing.
switch s.Format {
case 'b':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Binary{Binary: s.Datum}})
case 'n':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: nil})
case 't':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Text{Text: string(s.Datum)}})
case 'u':
continue // unchanged toast field should be excluded
}
}
return fields
}

func (p *PGOutputDecoder) ReadBegin(in []byte) (*pb.Message, error) {
if len(in) != 1+1+8+8+3 {
return nil, errors.New("begin wrong length")
Expand Down
45 changes: 45 additions & 0 deletions pkg/decode/pgoutput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func TestPGOutputDecoder(t *testing.T) {

conn.Exec(ctx, "DROP SCHEMA public CASCADE; CREATE SCHEMA public; CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";")
conn.Exec(ctx, "CREATE TABLE t (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)")
conn.Exec(ctx, "CREATE TABLE t2 (id bigint primary key, uid uuid, txt text, js jsonb, ts timestamptz, bs bytea)")
conn.Exec(ctx, "ALTER TABLE t2 REPLICA IDENTITY FULL")
conn.Exec(ctx, fmt.Sprintf("SELECT pg_drop_replication_slot('%s')", TestSlot))
conn.Exec(ctx, fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES;", TestSlot))
conn.Exec(ctx, sql.CreateLogicalSlot, TestSlot, PGOutputPlugin)
Expand Down Expand Up @@ -106,6 +108,49 @@ func TestPGOutputDecoder(t *testing.T) {
},
},
},
{
Expect: &pb.Change{Op: pb.Change_INSERT, Schema: "public", Table: "t2",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}},
{Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
},
},
{
Expect: &pb.Change{Op: pb.Change_UPDATE, Schema: "public", Table: "t2",
New: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}},
{Name: "txt", Oid: 25, Value: nil},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
Old: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("08d6af78-550c-4071-80be-2fece2db0474"), pgtype.UUIDOID)}},
{Name: "txt", Oid: 25, Value: &pb.Field_Binary{Binary: b(nT(5), pgtype.TextOID)}},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now, pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
},
},
{
Expect: &pb.Change{Op: pb.Change_DELETE, Schema: "public", Table: "t2",
Old: []*pb.Field{
{Name: "id", Oid: 20, Value: &pb.Field_Binary{Binary: b(1, pgtype.Int8OID)}},
{Name: "uid", Oid: 2950, Value: &pb.Field_Binary{Binary: b([]byte("782b2492-3e7c-431b-9238-c1136ea57190"), pgtype.UUIDOID)}},
{Name: "js", Oid: 3802, Value: &pb.Field_Binary{Binary: b([]byte(`{"a": {"b": {"c": {"d": null}}}}`), pgtype.JSONBOID)}},
{Name: "ts", Oid: 1184, Value: &pb.Field_Binary{Binary: b(now.Add(time.Second), pgtype.TimestamptzOID)}},
{Name: "bs", Oid: 17, Value: &pb.Field_Binary{Binary: b(nB(500000), pgtype.ByteaOID)}},
},
},
},
}
for _, change := range changes {
if err = change.Apply(ctx, conn); err != nil {
Expand Down
Loading

0 comments on commit fb405dc

Please sign in to comment.