Skip to content

Commit 9e951eb

Browse files
committed
Merge branch '248-parse-wals' into 'master'
feat: parse WAL files to retrieve dataStateAt (#248) See merge request postgres-ai/database-lab!280
2 parents a6a953b + 9bebfd8 commit 9e951eb

File tree

2 files changed

+168
-15
lines changed

2 files changed

+168
-15
lines changed

pkg/retrieval/engine/postgres/snapshot/physical.go

+135-15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"context"
1212
"fmt"
1313
"io"
14+
"io/ioutil"
1415
"path"
1516
"strings"
1617
"sync"
@@ -54,6 +55,9 @@ const (
5455
restoreCommandOption = "restore_command"
5556
targetActionOption = "recovery_target_action"
5657
promoteTargetAction = "promote"
58+
59+
// WAL parsing constants.
60+
walNameLen = 24
5761
)
5862

5963
var defaultRecoveryCfg = map[string]string{
@@ -390,7 +394,7 @@ func (p *PhysicalInitial) checkSyncInstance(ctx context.Context) (string, error)
390394

391395
extractedDataStateAt, err := p.getLastXActReplayTimestamp(ctx, syncContainer.ID)
392396
if err != nil {
393-
return "", errors.Wrap(err, `failed to get last xact replay timestamp from the sync instance`)
397+
return "", errors.Wrap(err, `failed to get last replay timestamp from the sync instance`)
394398
}
395399

396400
log.Msg("Sync instance data state at: ", extractedDataStateAt)
@@ -550,6 +554,19 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
550554
return errors.Wrap(err, "failed to start container")
551555
}
552556

557+
if syState.DSA == "" {
558+
dsa, err := p.getDSAFromWAL(ctx, cfgManager.GetPgVersion(), promoteCont.ID, clonePath)
559+
if err != nil {
560+
log.Dbg("cannot extract DSA form WAL files: ", err)
561+
}
562+
563+
if dsa != "" {
564+
log.Msg("DataStateAt extracted from WAL files: ", dsa)
565+
566+
syState.DSA = dsa
567+
}
568+
}
569+
553570
log.Msg("Starting PostgreSQL and waiting for readiness")
554571
log.Msg(fmt.Sprintf("View logs using the command: %s %s", tools.ViewLogsCmd, p.promoteContainerName()))
555572

@@ -623,6 +640,85 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
623640
return nil
624641
}
625642

643+
func (p *PhysicalInitial) getDSAFromWAL(ctx context.Context, pgVersion float64, containerID, cloneDir string) (string, error) {
644+
log.Dbg(cloneDir)
645+
646+
infos, err := ioutil.ReadDir(path.Join(cloneDir, "pg_wal"))
647+
if err != nil {
648+
return "", errors.Wrap(err, "failed to read the pg_wal dir")
649+
}
650+
651+
// Walk in the reverse order.
652+
for i := len(infos) - 1; i >= 0; i-- {
653+
fileName := infos[i].Name()
654+
walFilePath := path.Join(cloneDir, "pg_wal", fileName)
655+
656+
log.Dbg("Look up into file: ", walFilePath)
657+
658+
if len(fileName) != walNameLen {
659+
continue
660+
}
661+
662+
dateTime := p.parseWAL(ctx, containerID, pgVersion, walFilePath)
663+
if dateTime != "" {
664+
return dateTime, nil
665+
}
666+
}
667+
668+
log.Dbg("no found dataStateAt in WAL files")
669+
670+
return "", nil
671+
}
672+
673+
func (p *PhysicalInitial) parseWAL(ctx context.Context, containerID string, pgVersion float64, walFilePath string) string {
674+
cmd := fmt.Sprintf("/usr/lib/postgresql/%g/bin/pg_waldump %s -r Transaction | tail -1", pgVersion, walFilePath)
675+
676+
output, err := tools.ExecCommandWithOutput(ctx, p.dockerClient, containerID, types.ExecConfig{
677+
Cmd: []string{"sh", "-c", cmd},
678+
})
679+
if err != nil {
680+
log.Dbg("failed to parse WAL: ", err)
681+
return ""
682+
}
683+
684+
if output == "" {
685+
log.Dbg("empty timestamp output given")
686+
return ""
687+
}
688+
689+
log.Dbg("Parse the line from a WAL file", output)
690+
691+
return parseWALLine(output)
692+
}
693+
694+
func parseWALLine(line string) string {
695+
const (
696+
commitToken = "COMMIT"
697+
tokenLen = len(commitToken)
698+
layout = "2006-01-02 15:04:05.000000 MST"
699+
)
700+
701+
commitIndex := strings.LastIndex(line, commitToken)
702+
if commitIndex == -1 {
703+
log.Dbg("timestamp not found", line)
704+
return ""
705+
}
706+
707+
dateTimeString := strings.TrimSpace(line[commitIndex+tokenLen:])
708+
709+
if idx := strings.IndexByte(dateTimeString, ';'); idx > 0 {
710+
dateTimeString = dateTimeString[:idx]
711+
}
712+
713+
parsedDate, err := time.Parse(layout, dateTimeString)
714+
if err != nil {
715+
log.Dbg("failed to parse WAL time: ", dateTimeString)
716+
return ""
717+
}
718+
719+
return parsedDate.Format(tools.DataStateAtFormat)
720+
}
721+
626722
func buildRecoveryConfig(fileConfig, userRecoveryConfig map[string]string) map[string]string {
627723
recoveryConf := fileConfig
628724

@@ -640,7 +736,7 @@ func buildRecoveryConfig(fileConfig, userRecoveryConfig map[string]string) map[s
640736
}
641737

642738
func (p *PhysicalInitial) markDSA(ctx context.Context, defaultDSA, containerID, dataDir string, pgVersion float64) error {
643-
extractedDataStateAt, err := p.extractDataStateAt(ctx, containerID, dataDir, pgVersion)
739+
extractedDataStateAt, err := p.extractDataStateAt(ctx, containerID, dataDir, pgVersion, defaultDSA)
644740
if err != nil {
645741
if defaultDSA == "" {
646742
return errors.Wrap(err, `failed to extract dataStateAt`)
@@ -748,26 +844,50 @@ func (p *PhysicalInitial) checkRecovery(ctx context.Context, containerID string)
748844
return output, err
749845
}
750846

751-
func (p *PhysicalInitial) extractDataStateAt(ctx context.Context, containerID, dataDir string, pgVersion float64) (string, error) {
847+
func (p *PhysicalInitial) extractDataStateAt(ctx context.Context, containerID, dataDir string, pgVersion float64,
848+
defaultDSA string) (string, error) {
752849
output, err := p.getLastXActReplayTimestamp(ctx, containerID)
850+
if err != nil {
851+
log.Dbg("unable to get last replay timestamp from the promotion container: ", err)
852+
}
753853

754-
if output == "" {
755-
log.Msg("The last replay timestamp not found. Extract the last checkpoint timestamp")
854+
if output != "" && err == nil {
855+
return output, nil
856+
}
756857

757-
response, err := pgtool.ReadControlData(ctx, p.dockerClient, containerID, dataDir, pgVersion)
758-
if err != nil {
759-
return "", errors.Wrap(err, "failed to read control data")
760-
}
858+
if defaultDSA != "" {
859+
log.Msg("failed to extract dataStateAt. Use value from the sync instance: ", defaultDSA)
761860

762-
defer response.Close()
861+
return defaultDSA, nil
862+
}
763863

764-
output, err = getCheckPointTimestamp(ctx, response.Reader)
765-
if err != nil {
766-
return "", errors.Wrap(err, "failed to read control data")
767-
}
864+
// If the sync instance has not yet downloaded WAL when retrieving the default DSA, run it again.
865+
dsa, err := p.getDSAFromWAL(ctx, pgVersion, containerID, dataDir)
866+
if err != nil {
867+
log.Dbg("cannot extract DSA from WAL files in the promotion container: ", err)
768868
}
769869

770-
return output, err
870+
if dsa != "" {
871+
log.Msg("Use dataStateAt value from the promotion WAL files: ", defaultDSA)
872+
873+
return dsa, nil
874+
}
875+
876+
log.Msg("The last replay timestamp and dataStateAt from the sync instance are not found. Extract the last checkpoint timestamp")
877+
878+
response, err := pgtool.ReadControlData(ctx, p.dockerClient, containerID, dataDir, pgVersion)
879+
if err != nil {
880+
return "", errors.Wrap(err, "failed to read control data")
881+
}
882+
883+
defer response.Close()
884+
885+
output, err = getCheckPointTimestamp(ctx, response.Reader)
886+
if err != nil {
887+
return "", errors.Wrap(err, "failed to read control data")
888+
}
889+
890+
return output, nil
771891
}
772892

773893
func (p *PhysicalInitial) getLastXActReplayTimestamp(ctx context.Context, containerID string) (string, error) {

pkg/retrieval/engine/postgres/snapshot/physical_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
14+
15+
"gitlab.com/postgres-ai/database-lab/v2/pkg/log"
1416
)
1517

1618
func TestInitParamsExtraction(t *testing.T) {
@@ -45,3 +47,34 @@ Minimum recovery ending location: 0/0
4547
assert.EqualValues(t, tc.expectedDataStateAt, dsa)
4648
}
4749
}
50+
51+
func TestParsingWalLine(t *testing.T) {
52+
log.DEBUG = false
53+
54+
testCases := []struct {
55+
line string
56+
expectedDataStateAt string
57+
}{
58+
{
59+
line: "",
60+
expectedDataStateAt: "",
61+
},
62+
{
63+
line: "COMMIT",
64+
expectedDataStateAt: "",
65+
},
66+
{
67+
line: `Transaction len (rec/tot): 34/ 34, tx: 62566, lsn: C8/3E013E78, prev C8/3E013E40, desc: COMMIT 2021-05-23 02:50:59.993820 UTC`,
68+
expectedDataStateAt: "20210523025059",
69+
},
70+
{
71+
line: "rmgr: Transaction len (rec/tot): 82/ 82, tx: 62559, lsn: C8/370012E0, prev C8/37001290, desc: COMMIT 2021-05-23 01:17:21.531705 UTC; inval msgs: catcache 11 catcache 10",
72+
expectedDataStateAt: "20210523011721",
73+
},
74+
}
75+
76+
for _, tc := range testCases {
77+
dsa := parseWALLine(tc.line)
78+
assert.EqualValues(t, tc.expectedDataStateAt, dsa)
79+
}
80+
}

0 commit comments

Comments
 (0)