Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions gpMgmt/bin/gppylib/commands/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,33 @@ def create_slot(self):
format(self.name, self.host, self.port))
return True

class PgReplicationSlotCopy:
def __init__(self, src_slot, dst_slot_name):
self.src_slot = src_slot
self.dst_slot_name = dst_slot_name

def do_copy(self):
if not self.src_slot.slot_exists():
logger.exception("PgReplicationSlotCopy: src slot {} doesn't exist for host:{}, port:{}".format(self.src_slot.name, self.src_slot.host, self.src_slot.port))
return False
else:
logger.debug("Copy slot {} to {} for host:{}, port:{}".format(self.src_slot.name, self.dst_slot_name, self.src_slot.host, self.src_slot.port))
sql = "SELECT pg_copy_physical_replication_slot('{}', '{}', false);".format(self.src_slot.name, self.dst_slot_name)
try:
dburl = dbconn.DbURL(hostname=self.src_slot.host, port=self.src_slot.port)
with closing(dbconn.connect(dburl, utility=True, encoding='UTF8')) as conn:
dbconn.query(conn, sql)
except DatabaseError as e:
logger.exception("Failed to query pg_copy_physical_replication_slot for host:{}, port:{}: {}".
format(self.src_slot.host, self.src_slot.port, str(e)))
return False
except Exception as ex:
raise Exception("Failed to create a copy of replication slot for host:{}, port:{} : {}".
format(self.src_slot.host, self.src_slot.port, str(ex)))

logger.debug("Successfully created a copy {} of replication slot {} for host:{}, port:{}".
format(self.dst_slot_name, self.src_slot.name, self.src_slot.host, self.src_slot.port))
return True

class PgControlData(Command):
def __init__(self, name, datadir, ctxt=LOCAL, remoteHost=None):
Expand Down
169 changes: 159 additions & 10 deletions gpMgmt/bin/gppylib/operations/buildMirrorSegments.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ def _cleanup_before_recovery(self, gpArray, gpEnv):
self._clean_up_failed_segments()
self._set_seg_status_in_gparray()

def _cleanup_before_recovery_2(self, gpArray, gpEnv):
self.__logger.info("[RELOG] _stop_failed_segments - start")
self._stop_failed_segments(gpEnv)
self.__logger.info("[RELOG] _stop_failed_segments - end")
self.__logger.info("[RELOG] _wait_fts_to_mark_down_segments - start")
self._wait_fts_to_mark_down_segments(gpEnv, self._get_segments_to_mark_down())
self.__logger.info("[RELOG] _wait_fts_to_mark_down_segments - end")
if not self.__forceoverwrite:
self._clean_up_failed_segments()
self._set_seg_status_in_gparray()

def _get_segments_to_mark_down(self):
segments_to_mark_down = []
for toRecover in self.__mirrorsToBuild:
Expand Down Expand Up @@ -275,32 +286,94 @@ def __build_mirrors(self, actionName, gpEnv, gpArray):
from the mirrorsToBuild must be present in gpArray.

"""
self.__logger.info("[RELOG] __build_mirrors - start")
if len(self.__mirrorsToBuild) == 0:
self.__logger.info("No segments to {}".format(actionName))
return True

is_full_sync = True
for mirror in self.__mirrorsToBuild:
if not mirror.isFullSynchronization():
is_full_sync = False
break

is_dir_conflict = False
for mirror in self.__mirrorsToBuild:
if is_dir_conflict:
break;
failedSegment = mirror.getFailedSegment()
if failedSegment is None:
continue
for segmentPair in gpArray.getSegmentList():
primaryDB = segmentPair.primaryDB
mirrorDB = segmentPair.mirrorDB
if primaryDB is not None:
if (primaryDB.hostname == failedSegment.hostname or primaryDB.address == failedSegment.address) and \
primaryDB.datadir == failedSegment.datadir:
is_dir_conflict = True
break;
if mirrorDB is not None:
if (mirrorDB.hostname == failedSegment.hostname or mirrorDB.address == failedSegment.address) and \
mirrorDB.datadir == failedSegment.datadir:
is_dir_conflict = True
break;

if actionName not in [GpMirrorListToBuild.Action.ADDMIRRORS, GpMirrorListToBuild.Action.RECOVERMIRRORS]:
raise Exception('Invalid action. Valid values are {} and {}'.format(GpMirrorListToBuild.Action.RECOVERMIRRORS,
GpMirrorListToBuild.Action.ADDMIRRORS))

self.__logger.info("%s segment(s) to %s" % (len(self.__mirrorsToBuild), actionName))

self._cleanup_before_recovery(gpArray, gpEnv)
self._validate_gparray(gpArray)
recovery_result = False

if is_full_sync and not is_dir_conflict:
self.checkForPortAndDirectoryConflicts(gpArray)

recovery_info_by_host = recoveryinfo.build_recovery_info(self.__mirrorsToBuild)
self._validate_gparray(gpArray)

self._run_setup_recovery(actionName, recovery_info_by_host)
recovery_info_by_host = recoveryinfo.build_recovery_info(self.__mirrorsToBuild)

self._run_setup_recovery(actionName, recovery_info_by_host)

# 1 - do pg_pasebackup with slot name = 'internal_wal_replication_slot_temp'
recovery_results_stage_1 = self._run_recovery_stage_1_basebackup(actionName, recovery_info_by_host, gpEnv)

# 2 - Stop old mirrors
self._cleanup_before_recovery_2(gpArray, gpEnv)

backout_map = self._update_config(recovery_info_by_host, gpArray)

# 5 - Handle replication slots and start new mirrors
recovery_results_stage_2 = self._run_recovery_stage_2_start_segments(actionName, recovery_info_by_host, gpEnv)

if actionName == GpMirrorListToBuild.Action.RECOVERMIRRORS:
self._revert_config_update(recovery_results_stage_1, backout_map)

self._trigger_fts_probe(port=gpEnv.getCoordinatorPort())

self.__logger.info("[RELOG] __build_mirrors - end (segments are up)")

recovery_result = recovery_results_stage_1.recovery_successful() and recovery_results_stage_2.recovery_successful()

else:
self._cleanup_before_recovery(gpArray, gpEnv)
self._validate_gparray(gpArray)

backout_map = self._update_config(recovery_info_by_host, gpArray)
recovery_info_by_host = recoveryinfo.build_recovery_info(self.__mirrorsToBuild)

recovery_results = self._run_recovery(actionName, recovery_info_by_host, gpEnv)
if actionName == GpMirrorListToBuild.Action.RECOVERMIRRORS:
self._revert_config_update(recovery_results, backout_map)
self._run_setup_recovery(actionName, recovery_info_by_host)

self._trigger_fts_probe(port=gpEnv.getCoordinatorPort())
backout_map = self._update_config(recovery_info_by_host, gpArray)

return recovery_results.recovery_successful()
recovery_results = self._run_recovery(actionName, recovery_info_by_host, gpEnv)
if actionName == GpMirrorListToBuild.Action.RECOVERMIRRORS:
self._revert_config_update(recovery_results, backout_map)

self._trigger_fts_probe(port=gpEnv.getCoordinatorPort())

recovery_result = recovery_results.recovery_successful()

return recovery_result

def _trigger_fts_probe(self, port=0):
self.__logger.info('Triggering FTS probe')
Expand Down Expand Up @@ -587,6 +660,23 @@ def _run_recovery(self, action_name, recovery_info_by_host, gpEnv):
self._remove_progress_files(recovery_info_by_host, recovery_results)
return recovery_results

def _run_recovery_stage_1_basebackup(self, action_name, recovery_info_by_host, gpEnv):
completed_recovery_results = self._do_recovery_stage_1_pg_basebackup(recovery_info_by_host, gpEnv)
recovery_results = RecoveryResult(action_name, completed_recovery_results, self.__logger)
recovery_results.print_bb_rewind_differential_update_and_start_errors()

self._remove_progress_files(recovery_info_by_host, recovery_results)
return recovery_results

def _run_recovery_stage_2_start_segments(self, action_name, recovery_info_by_host, gpEnv):
completed_recovery_results = self._do_recovery_stage_2_start_segments(recovery_info_by_host, gpEnv)
recovery_results = RecoveryResult(action_name, completed_recovery_results, self.__logger)
recovery_results.print_bb_rewind_differential_update_and_start_errors()

self._remove_progress_files(recovery_info_by_host, recovery_results)
return recovery_results


def _do_recovery(self, recovery_info_by_host, gpEnv):
"""
# Recover and start segments using gpsegrecovery, which will internally call either
Expand Down Expand Up @@ -622,6 +712,65 @@ def _do_recovery(self, recovery_info_by_host, gpEnv):
progressCmds=progress_cmds)
return completed_recovery_results

def _do_recovery_stage_1_pg_basebackup(self, recovery_info_by_host, gpEnv):
"""
# do only pg_basebackup
"""
self.__logger.info('Initiating segment recovery - pg_basebackup')
cmds = []
progress_cmds = []
era = read_era(gpEnv.getCoordinatorDataDir(), logger=self.__logger)
for hostName, recovery_info_list in recovery_info_by_host.items():
for ri in recovery_info_list:
ri.is_only_basebackup = True
ri.is_only_start = False
progressCmd = self._get_progress_cmd(ri.progress_file, ri.target_segment_dbid, hostName, ri.is_differential_recovery)
if progressCmd:
progress_cmds.append(progressCmd)

cmds.append(gp.GpSegRecovery('Recover segments - Stage 1 - pg_basebackup',
recoveryinfo.serialize_list(recovery_info_list),
gplog.get_logger_dir(),
verbose=gplog.logging_is_verbose(),
batchSize=self.__parallelPerHost,
remoteHost=hostName,
era=era,
maxRate=self.__maxRate,
forceoverwrite=self.__forceoverwrite))
completed_recovery_results = self.__runWaitAndCheckWorkerPoolForErrorsAndClear(cmds, suppressErrorCheck=True,
progressCmds=progress_cmds)
return completed_recovery_results

def _do_recovery_stage_2_start_segments(self, recovery_info_by_host, gpEnv):
"""
# do only start of segments
"""
self.__logger.info('Initiating segment recovery - start segments')
cmds = []
progress_cmds = []
era = read_era(gpEnv.getCoordinatorDataDir(), logger=self.__logger)
for hostName, recovery_info_list in recovery_info_by_host.items():
for ri in recovery_info_list:
ri.is_only_basebackup = False
ri.is_only_start = True
progressCmd = self._get_progress_cmd(ri.progress_file, ri.target_segment_dbid, hostName, ri.is_differential_recovery)
if progressCmd:
progress_cmds.append(progressCmd)

cmds.append(gp.GpSegRecovery('Recover segments - Stage 2 - start segments',
recoveryinfo.serialize_list(recovery_info_list),
gplog.get_logger_dir(),
verbose=gplog.logging_is_verbose(),
batchSize=self.__parallelPerHost,
remoteHost=hostName,
era=era,
maxRate=self.__maxRate,
forceoverwrite=self.__forceoverwrite))
completed_recovery_results = self.__runWaitAndCheckWorkerPoolForErrorsAndClear(cmds, suppressErrorCheck=True,
progressCmds=progress_cmds)
return completed_recovery_results


def _do_setup_for_recovery(self, recovery_info_by_host):
self.__logger.info('Setting up the required segments for recovery')
cmds = []
Expand Down
6 changes: 4 additions & 2 deletions gpMgmt/bin/gppylib/recoveryinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class RecoveryInfo(object):
Note: we don't have target hostname, since an object of this class will be accessed by the target host directly
"""
def __init__(self, target_datadir, target_port, target_segment_dbid, source_hostname, source_port,
source_datadir, is_full_recovery, is_differential_recovery, progress_file):
source_datadir, is_full_recovery, is_differential_recovery, progress_file, is_only_basebackup, is_only_start):
self.target_datadir = target_datadir
self.target_port = target_port
self.target_segment_dbid = target_segment_dbid
Expand All @@ -29,6 +29,8 @@ def __init__(self, target_datadir, target_port, target_segment_dbid, source_host
self.is_full_recovery = is_full_recovery
self.is_differential_recovery = is_differential_recovery
self.progress_file = progress_file
self.is_only_basebackup = is_only_basebackup
self.is_only_start = is_only_start

def __str__(self):
return json.dumps(self, default=lambda o: o.__dict__)
Expand Down Expand Up @@ -71,7 +73,7 @@ def build_recovery_info(mirrors_to_build):
target_segment.getSegmentDataDirectory(), target_segment.getSegmentPort(),
target_segment.getSegmentDbId(), source_segment.getSegmentHostName(),
source_segment.getSegmentPort(), source_segment.getSegmentDataDirectory(),
to_recover.isFullSynchronization(), to_recover.isDifferentialSynchronization(), progress_file))
to_recover.isFullSynchronization(), to_recover.isDifferentialSynchronization(), progress_file, False, False))
return recovery_info_by_host


Expand Down
Loading