From d482ef7900199976283df90c3be3b35c55aa5650 Mon Sep 17 00:00:00 2001 From: kenny Date: Sat, 3 Feb 2024 09:31:24 +0800 Subject: [PATCH] fix: should replay tx from latest repl slot confirmed flush lsn --- pkg/source/postgres.go | 22 ++++++++++++++++------ pkg/sql/source.go | 2 ++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/source/postgres.go b/pkg/source/postgres.go index 88d668d..6c5859f 100644 --- a/pkg/source/postgres.go +++ b/pkg/source/postgres.go @@ -107,13 +107,23 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro return nil, err } + var confirmedFlushLSN pglogrepl.LSN + if err = p.setupConn.QueryRow(context.Background(), sql.QueryReplicationSlot, p.ReplSlot).Scan(&confirmedFlushLSN); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, errors.New("replication slot not found") + } + return nil, err + } + p.log = logrus.WithFields(logrus.Fields{"From": "PGXSource"}) p.log.WithFields(logrus.Fields{ - "SystemID": ident.SystemID, - "Timeline": ident.Timeline, - "XLogPos": int64(ident.XLogPos), - "DBName": ident.DBName, - "Decoder": p.DecodePlugin, + "SystemID": ident.SystemID, + "Timeline": ident.Timeline, + "XLogPos": int64(ident.XLogPos), + "DBName": ident.DBName, + "Decoder": p.DecodePlugin, + "ReplSlot": p.ReplSlot, + "ReplSlotConfirmedFlushLSN": uint64(confirmedFlushLSN), }).Info("retrieved current info of source database") if cp.LSN != 0 { @@ -131,7 +141,7 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro } p.currentLsn = uint64(startLsn) } else { - p.currentLsn = uint64(ident.XLogPos) + p.currentLsn = uint64(confirmedFlushLSN) } p.currentSeq = 0 p.log.WithFields(logrus.Fields{ diff --git a/pkg/sql/source.go b/pkg/sql/source.go index ec9627d..e5b0f1a 100644 --- a/pkg/sql/source.go +++ b/pkg/sql/source.go @@ -24,3 +24,5 @@ var CreatePublication = `CREATE PUBLICATION %s FOR ALL TABLES;` var InstallExtension = `CREATE EXTENSION IF NOT EXISTS pgcapture;` var ServerVersionNum = `SHOW server_version_num;` + +var QueryReplicationSlot = `SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1;`