
Commit a6a953b

Merge branch '248-extract-correct-dsa' into 'master'
fix: extract DSA from sync instance and use it as fallback value (#248)

See merge request postgres-ai/database-lab!279
2 parents (3e5da9b + 380b5bd), commit a6a953b
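In short, the change makes checkSyncInstance return the dataStateAt (DSA) of the sync instance alongside any check error, carries both in the new syncState value, and lets markDSA fall back to that DSA when extraction from the promoted instance fails. Below is a condensed, self-contained sketch of that fallback decision only; the helper resolveDSA and the sample values are illustrative and not part of the actual module (the real functions take a context, Docker client, and container IDs, as the diff shows).

// Sketch (not part of the diff): simplified fallback logic for dataStateAt.
package main

import (
	"errors"
	"fmt"
)

// syncState mirrors the struct added in the diff: the DSA captured from the
// sync instance and the error from the pre-snapshot check.
type syncState struct {
	DSA string
	Err error
}

// resolveDSA is an illustrative helper: prefer the value extracted from the
// promoted instance, and fall back to the sync instance value if extraction failed.
func resolveDSA(extracted string, extractErr error, sync syncState) (string, error) {
	if extractErr != nil {
		if sync.DSA == "" {
			return "", extractErr // nothing to fall back to
		}
		return sync.DSA, nil // use the DSA taken from the sync instance
	}
	return extracted, nil
}

func main() {
	state := syncState{DSA: "20210315142530"} // captured by checkSyncInstance
	dsa, err := resolveDSA("", errors.New("extraction failed"), state)
	fmt.Println(dsa, err) // 20210315142530 <nil>
}

In the real code this decision lives inside markDSA itself; the sketch only isolates it for readability.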

File tree

2 files changed: +58 -25 lines

pkg/retrieval/engine/postgres/snapshot/physical.go

+56 -25
@@ -134,6 +134,12 @@ type QueryPreprocessing struct {
 	MaxParallelWorkers int `yaml:"maxParallelWorkers"`
 }
 
+// syncState defines state of a sync instance.
+type syncState struct {
+	DSA string
+	Err error
+}
+
 // NewPhysicalInitialJob creates a new physical initial job.
 func NewPhysicalInitialJob(cfg config.JobConfig, global *dblabCfg.Global, cloneManager pool.FSManager) (*PhysicalInitial, error) {
 	p := &PhysicalInitial{
@@ -298,11 +304,14 @@ func (p *PhysicalInitial) run(ctx context.Context) (err error) {
 		}
 	}()
 
-	var syncErr error
+	var syState syncState
 
 	if p.options.Promotion.Enabled {
-		if syncErr = p.checkSyncInstance(ctx); syncErr != nil {
-			log.Dbg(fmt.Sprintf("failed to check the sync instance before snapshotting: %v", syncErr), "Changing the promotion strategy")
+		syState.DSA, syState.Err = p.checkSyncInstance(ctx)
+
+		if syState.Err != nil {
+			log.Dbg(fmt.Sprintf("failed to check the sync instance before snapshotting: %v", syState),
+				"Recovery configs will be applied on the promotion stage")
 		}
 	}
 
@@ -334,7 +343,7 @@ func (p *PhysicalInitial) run(ctx context.Context) (err error) {
 
 	// Promotion.
 	if p.options.Promotion.Enabled {
-		if err := p.promoteInstance(ctx, path.Join(p.fsPool.ClonesDir(), cloneName, p.fsPool.DataSubDir), syncErr); err != nil {
+		if err := p.promoteInstance(ctx, path.Join(p.fsPool.ClonesDir(), cloneName, p.fsPool.DataSubDir), syState); err != nil {
 			return errors.Wrap(err, "failed to promote instance")
 		}
 	}
@@ -361,23 +370,32 @@ func (p *PhysicalInitial) run(ctx context.Context) (err error) {
 	return nil
 }
 
-func (p *PhysicalInitial) checkSyncInstance(ctx context.Context) error {
+func (p *PhysicalInitial) checkSyncInstance(ctx context.Context) (string, error) {
+	log.Msg("Check the sync instance state: ", p.syncInstanceName())
+
 	syncContainer, err := p.dockerClient.ContainerInspect(ctx, p.syncInstanceName())
 	if err != nil {
-		return err
+		return "", err
 	}
 
 	if err := tools.CheckContainerReadiness(ctx, p.dockerClient, syncContainer.ID); err != nil {
-		return errors.Wrap(err, "failed to readiness check")
+		return "", errors.Wrap(err, "failed to readiness check")
 	}
 
 	log.Msg("Sync instance has been checked. It is running")
 
 	if err := p.checkpoint(ctx, syncContainer.ID); err != nil {
-		return errors.Wrap(err, "failed to make a checkpoint for sync instance")
+		return "", errors.Wrap(err, "failed to make a checkpoint for sync instance")
 	}
 
-	return nil
+	extractedDataStateAt, err := p.getLastXActReplayTimestamp(ctx, syncContainer.ID)
+	if err != nil {
+		return "", errors.Wrap(err, `failed to get last xact replay timestamp from the sync instance`)
+	}
+
+	log.Msg("Sync instance data state at: ", extractedDataStateAt)
+
+	return extractedDataStateAt, nil
 }
 
 func (p *PhysicalInitial) syncInstanceName() string {
@@ -440,7 +458,7 @@ func (p *PhysicalInitial) promoteContainerName() string {
 	return promoteContainerPrefix + p.globalCfg.InstanceID
 }
 
-func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string, syncErr error) (err error) {
+func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string, syState syncState) (err error) {
 	p.promotionMutex.Lock()
 	defer p.promotionMutex.Unlock()
 
@@ -470,7 +488,7 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
 	recoveryConfig := make(map[string]string)
 
 	// Item 5. Remove a recovery file: https://gitlab.com/postgres-ai/database-lab/-/issues/236#note_513401256
-	if syncErr != nil {
+	if syState.Err != nil {
 		recoveryConfig = buildRecoveryConfig(recoveryFileConfig, p.options.Promotion.Recovery)
 
 		if err := cfgManager.ApplyRecovery(recoveryFileConfig); err != nil {
@@ -563,7 +581,7 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
 		}
 	}
 
-	if err := p.markDSA(ctx, promoteCont.ID, clonePath, cfgManager.GetPgVersion()); err != nil {
+	if err := p.markDSA(ctx, syState.DSA, promoteCont.ID, clonePath, cfgManager.GetPgVersion()); err != nil {
 		return errors.Wrap(err, "failed to mark dataStateAt")
 	}
 
@@ -621,13 +639,18 @@ func buildRecoveryConfig(fileConfig, userRecoveryConfig map[string]string) map[s
 	return recoveryConf
 }
 
-func (p *PhysicalInitial) markDSA(ctx context.Context, containerID, dataDir string, pgVersion float64) error {
+func (p *PhysicalInitial) markDSA(ctx context.Context, defaultDSA, containerID, dataDir string, pgVersion float64) error {
 	extractedDataStateAt, err := p.extractDataStateAt(ctx, containerID, dataDir, pgVersion)
 	if err != nil {
-		return errors.Wrap(err, `failed to extract dataStateAt`)
+		if defaultDSA == "" {
+			return errors.Wrap(err, `failed to extract dataStateAt`)
+		}
+
+		log.Msg("failed to extract dataStateAt. Use value from the sync instance: ", defaultDSA)
+		extractedDataStateAt = defaultDSA
 	}
 
-	log.Msg("Extracted Data state at: ", extractedDataStateAt)
+	log.Msg("Data state at: ", extractedDataStateAt)
 
 	if p.dbMark.DataStateAt != "" && extractedDataStateAt == p.dbMark.DataStateAt {
 		return newSkipSnapshotErr(fmt.Sprintf(
@@ -637,7 +660,7 @@ func (p *PhysicalInitial) markDSA(ctx context.Context, containerID, dataDir stri
 
 	p.dbMark.DataStateAt = extractedDataStateAt
 
-	log.Msg("Data state at: ", p.dbMark.DataStateAt)
+	log.Msg("Mark data state at: ", p.dbMark.DataStateAt)
 
 	return nil
 }
@@ -726,15 +749,7 @@ func (p *PhysicalInitial) checkRecovery(ctx context.Context, containerID string)
 }
 
 func (p *PhysicalInitial) extractDataStateAt(ctx context.Context, containerID, dataDir string, pgVersion float64) (string, error) {
-	extractionCommand := []string{"psql", "-U", p.globalCfg.Database.User(), "-d", p.globalCfg.Database.Name(), "-XAtc",
-		"select to_char(pg_last_xact_replay_timestamp() at time zone 'UTC', 'YYYYMMDDHH24MISS')"}
-
-	log.Msg("Running dataStateAt command", extractionCommand)
-
-	output, err := tools.ExecCommandWithOutput(ctx, p.dockerClient, containerID, types.ExecConfig{
-		Cmd:  extractionCommand,
-		User: defaults.Username,
-	})
+	output, err := p.getLastXActReplayTimestamp(ctx, containerID)
 
 	if output == "" {
 		log.Msg("The last replay timestamp not found. Extract the last checkpoint timestamp")
@@ -755,6 +770,22 @@ func (p *PhysicalInitial) extractDataStateAt(ctx context.Context, containerID, d
 	return output, err
 }
 
+func (p *PhysicalInitial) getLastXActReplayTimestamp(ctx context.Context, containerID string) (string, error) {
+	extractionCommand := []string{"psql", "-U", p.globalCfg.Database.User(), "-d", p.globalCfg.Database.Name(), "-XAtc",
+		"select to_char(pg_last_xact_replay_timestamp() at time zone 'UTC', 'YYYYMMDDHH24MISS')"}
+
+	log.Msg("Running dataStateAt command", extractionCommand)
+
+	output, err := tools.ExecCommandWithOutput(ctx, p.dockerClient, containerID, types.ExecConfig{
+		Cmd:  extractionCommand,
+		User: defaults.Username,
+	})
+
+	log.Msg("Extracted last replay timestamp: ", output)
+
+	return output, err
+}
+
 func getCheckPointTimestamp(ctx context.Context, r io.Reader) (string, error) {
 	scanner := bufio.NewScanner(r)
 	checkpointTitleBytes := []byte(checkpointTimestampLabel)
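
For reference, the DSA string produced by the query above is the last replayed transaction timestamp rendered in UTC as YYYYMMDDHH24MISS (for example, 20210315142530). The short standalone example below is not part of the diff; it only illustrates that this format corresponds to Go's reference layout "20060102150405".

// Sketch (not part of the diff): parsing a DSA value produced by
// to_char(..., 'YYYYMMDDHH24MISS').
package main

import (
	"fmt"
	"time"
)

func main() {
	const dsaLayout = "20060102150405" // Go layout matching YYYYMMDDHH24MISS
	dsa := "20210315142530"            // example value: 2021-03-15 14:25:30 UTC

	ts, err := time.Parse(dsaLayout, dsa)
	if err != nil {
		panic(err)
	}
	fmt.Println(ts.UTC()) // 2021-03-15 14:25:30 +0000 UTC
}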

pkg/retrieval/engine/postgres/tools/tools.go

+2
@@ -182,6 +182,8 @@ func StopPostgres(ctx context.Context, dockerClient *client.Client, containerID,
 
 // CheckContainerReadiness checks health and reports if container is ready.
 func CheckContainerReadiness(ctx context.Context, dockerClient *client.Client, containerID string) (err error) {
+	log.Msg("Check container readiness: ", containerID)
+
 	for {
 		select {
 		case <-ctx.Done():
