@@ -149,8 +149,10 @@ type syncState struct {
149
149
}
150
150
151
151
// NewPhysicalInitialJob creates a new physical initial job.
152
- func NewPhysicalInitialJob (cfg config.JobConfig , global * global.Config , engineProps global.EngineProps , cloneManager pool.FSManager ,
153
- tm * telemetry.Agent ) (* PhysicalInitial , error ) {
152
+ func NewPhysicalInitialJob (
153
+ cfg config.JobConfig , global * global.Config , engineProps global.EngineProps , cloneManager pool.FSManager ,
154
+ tm * telemetry.Agent ,
155
+ ) (* PhysicalInitial , error ) {
154
156
p := & PhysicalInitial {
155
157
name : cfg .Spec .Name ,
156
158
cloneManager : cloneManager ,
@@ -397,7 +399,13 @@ func (p *PhysicalInitial) checkSyncInstance(ctx context.Context) (string, error)
397
399
398
400
log .Msg ("Sync instance has been checked. It is running" )
399
401
400
- if err := p .checkpoint (ctx , syncContainer .ID ); err != nil {
402
+ if err := tools .RunCheckpoint (
403
+ ctx ,
404
+ p .dockerClient ,
405
+ syncContainer .ID ,
406
+ p .globalCfg .Database .User (),
407
+ p .globalCfg .Database .Name (),
408
+ ); err != nil {
401
409
return "" , errors .Wrap (err , "failed to make a checkpoint for sync instance" )
402
410
}
403
411
@@ -616,9 +624,8 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
616
624
}
617
625
}
618
626
619
- // Checkpoint.
620
- if err := p .checkpoint (ctx , containerID ); err != nil {
621
- return err
627
+ if err := tools .RunCheckpoint (ctx , p .dockerClient , containerID , p .globalCfg .Database .User (), p .globalCfg .Database .Name ()); err != nil {
628
+ return errors .Wrap (err , "failed to run checkpoint" )
622
629
}
623
630
624
631
if err := cfgManager .RemoveRecoveryConfig (); err != nil {
@@ -646,7 +653,10 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
646
653
return nil
647
654
}
648
655
649
- func (p * PhysicalInitial ) getDSAFromWAL (ctx context.Context , pgVersion float64 , containerID , cloneDir string ) (string , error ) {
656
+ func (p * PhysicalInitial ) getDSAFromWAL (ctx context.Context , pgVersion float64 , containerID , cloneDir string ) (
657
+ string ,
658
+ error ,
659
+ ) {
650
660
log .Dbg (cloneDir )
651
661
652
662
walDirectory := walDir (cloneDir , pgVersion )
@@ -692,7 +702,12 @@ func walDir(cloneDir string, pgVersion float64) string {
692
702
return path .Join (cloneDir , dir )
693
703
}
694
704
695
- func (p * PhysicalInitial ) parseWAL (ctx context.Context , containerID string , pgVersion float64 , walFilePath string ) string {
705
+ func (p * PhysicalInitial ) parseWAL (
706
+ ctx context.Context ,
707
+ containerID string ,
708
+ pgVersion float64 ,
709
+ walFilePath string ,
710
+ ) string {
696
711
cmd := walCommand (pgVersion , walFilePath )
697
712
698
713
output , err := tools .ExecCommandWithOutput (ctx , p .dockerClient , containerID , types.ExecConfig {
@@ -768,7 +783,11 @@ func buildRecoveryConfig(fileConfig, userRecoveryConfig map[string]string) map[s
768
783
return recoveryConf
769
784
}
770
785
771
- func (p * PhysicalInitial ) markDSA (ctx context.Context , defaultDSA , containerID , dataDir string , pgVersion float64 ) error {
786
+ func (p * PhysicalInitial ) markDSA (
787
+ ctx context.Context ,
788
+ defaultDSA , containerID , dataDir string ,
789
+ pgVersion float64 ,
790
+ ) error {
772
791
extractedDataStateAt , err := p .extractDataStateAt (ctx , containerID , dataDir , pgVersion , defaultDSA )
773
792
if err != nil {
774
793
if defaultDSA == "" {
@@ -895,8 +914,10 @@ and the source doesn't have enough activity.
895
914
Step 3. Use the timestamp of the latest checkpoint. This is extracted from PGDATA using the
896
915
pg_controldata utility. Note that this is not an exact value of the latest activity in the source
897
916
before we took a copy of PGDATA, but we suppose it is not far from it. */
898
- func (p * PhysicalInitial ) extractDataStateAt (ctx context.Context , containerID , dataDir string , pgVersion float64 ,
899
- defaultDSA string ) (string , error ) {
917
+ func (p * PhysicalInitial ) extractDataStateAt (
918
+ ctx context.Context , containerID , dataDir string , pgVersion float64 ,
919
+ defaultDSA string ,
920
+ ) (string , error ) {
900
921
output , err := p .getLastXActReplayTimestamp (ctx , containerID )
901
922
if err != nil {
902
923
log .Dbg ("unable to get last replay timestamp from the promotion container: " , err )
@@ -1002,20 +1023,6 @@ func (p *PhysicalInitial) runPromoteCommand(ctx context.Context, containerID, cl
1002
1023
return nil
1003
1024
}
1004
1025
1005
- func (p * PhysicalInitial ) checkpoint (ctx context.Context , containerID string ) error {
1006
- commandCheckpoint := []string {"psql" , "-U" , p .globalCfg .Database .User (), "-d" , p .globalCfg .Database .Name (), "-XAtc" , "checkpoint" }
1007
- log .Msg ("Run checkpoint command" , commandCheckpoint )
1008
-
1009
- output , err := tools .ExecCommandWithOutput (ctx , p .dockerClient , containerID , types.ExecConfig {Cmd : commandCheckpoint })
1010
- if err != nil {
1011
- return errors .Wrap (err , "failed to make checkpoint" )
1012
- }
1013
-
1014
- log .Msg ("Checkpoint result: " , output )
1015
-
1016
- return nil
1017
- }
1018
-
1019
1026
func (p * PhysicalInitial ) markDatabaseData () error {
1020
1027
if err := p .dbMarker .CreateConfig (); err != nil {
1021
1028
return errors .Wrap (err , "failed to create a DBMarker config of the database" )
0 commit comments