Skip to content

Commit

Permalink
Merge pull request #56 from replicase/fix/fix-pg-source-restart-loss-tx
Browse files Browse the repository at this point in the history
  • Loading branch information
KennyChenFight authored Feb 3, 2024
2 parents dd3691e + d482ef7 commit eff18d5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
22 changes: 16 additions & 6 deletions pkg/source/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,23 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro
return nil, err
}

var confirmedFlushLSN pglogrepl.LSN
if err = p.setupConn.QueryRow(context.Background(), sql.QueryReplicationSlot, p.ReplSlot).Scan(&confirmedFlushLSN); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, errors.New("replication slot not found")
}
return nil, err
}

p.log = logrus.WithFields(logrus.Fields{"From": "PGXSource"})
p.log.WithFields(logrus.Fields{
"SystemID": ident.SystemID,
"Timeline": ident.Timeline,
"XLogPos": int64(ident.XLogPos),
"DBName": ident.DBName,
"Decoder": p.DecodePlugin,
"SystemID": ident.SystemID,
"Timeline": ident.Timeline,
"XLogPos": int64(ident.XLogPos),
"DBName": ident.DBName,
"Decoder": p.DecodePlugin,
"ReplSlot": p.ReplSlot,
"ReplSlotConfirmedFlushLSN": uint64(confirmedFlushLSN),
}).Info("retrieved current info of source database")

if cp.LSN != 0 {
Expand All @@ -131,7 +141,7 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro
}
p.currentLsn = uint64(startLsn)
} else {
p.currentLsn = uint64(ident.XLogPos)
p.currentLsn = uint64(confirmedFlushLSN)
}
p.currentSeq = 0
p.log.WithFields(logrus.Fields{
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ var CreatePublication = `CREATE PUBLICATION %s FOR ALL TABLES;`
var InstallExtension = `CREATE EXTENSION IF NOT EXISTS pgcapture;`

var ServerVersionNum = `SHOW server_version_num;`

var QueryReplicationSlot = `SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1;`

0 comments on commit eff18d5

Please sign in to comment.