diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java index 5123f952ad..50ac8b131b 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java @@ -29,7 +29,7 @@ public void onAction(KeeperInfoStatsActionContext context) { long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue(); deleteKeeper(info); Map keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new); - keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow); + keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.isKeeperActive(), info.getHostPort().getPort()), keeperFlow); } catch (Throwable throwable) { logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java index 42f2d3e737..f6dae5cb3d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java @@ -92,16 +92,25 @@ public List getOverloadKeeperContainerMigra @RequestMapping(value = "/keepercontainer/overload/migration/begin", method = RequestMethod.POST) public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List keeperContainerDetailModels) { - logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels); try { - keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels); + if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) { + return RetMessage.createFailMessage("[beginToMigrateOverloadKeeperContainers][fail] has unfinished migration task!"); + } } catch (Throwable th) { - logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage()); + logger.warn("[beginToMigrateOverloadKeeperContainers][fail] {}", keeperContainerDetailModels, th); return RetMessage.createFailMessage(th.getMessage()); } return RetMessage.createSuccessMessage(); } + @RequestMapping(value = "/keepercontainer/overload/migration/terminate", method = RequestMethod.POST) + public RetMessage migrateKeeperTaskTerminate() { + if(keeperContainerMigrationService.stopMigrate()){ + return RetMessage.createSuccessMessage(); + } + return RetMessage.createFailMessage("Migrate tasks has finished"); + } + @RequestMapping(value = "/keepercontainer/overload/info/lasted", method = RequestMethod.GET) public List getLastedAllReadyMigrateKeeperContainers() { return analyzer.getAllDcKeeperContainerUsedInfoModelsList(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java index 189916e888..7cea77edf2 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java @@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) { .getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName()); for (ShardModel shardModel : allShardModel) { - if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(), + if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(), (model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null) ? null : model.getTargetKeeperContainer().getAddr().getHost())) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java index 38d6222bb7..9791c81567 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java @@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt private final List alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS); + public final static String KEEPER_MIGRATION = "keeper_migration"; + public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success"; public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail"; @@ -47,10 +49,6 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail"; - public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success"; - - public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail"; - public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success"; public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail"; @@ -87,7 +85,7 @@ void migrateAllKeepers(List readyToMigratio ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), false, null); - if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) { logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard); alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp, diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java index 341d5835f1..ede11fdd53 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java @@ -1,18 +1,14 @@ package com.ctrip.xpipe.redis.console.keeper.Command; import com.ctrip.framework.xpipe.redis.ProxyRegistry; -import com.ctrip.xpipe.api.command.Command; -import com.ctrip.xpipe.api.command.CommandFuture; -import com.ctrip.xpipe.api.command.CommandFutureListener; import com.ctrip.xpipe.api.endpoint.Endpoint; import com.ctrip.xpipe.api.pool.SimpleObjectPool; import com.ctrip.xpipe.command.AbstractCommand; import com.ctrip.xpipe.netty.commands.NettyClient; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; -import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import com.ctrip.xpipe.utils.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,37 +29,9 @@ protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, this.scheduled = scheduled; } - protected InfoCommand generteInfoCommand(Endpoint key) { - if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) { - commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; - } + protected InfoCommand generateInfoReplicationCommand(Endpoint key) { SimpleObjectPool keyPool = keyedObjectPool.getKeyPool(key); return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut); } - protected void addHookAndExecute(Command command, Callbackable callback) { - logger.info("[zyfTest][addHookAndExecute] start execute"); - CommandFuture future = command.execute(); - logger.info("[zyfTest][addHookAndExecute] start addListener"); - future.addListener(new CommandFutureListener() { - @Override - public void operationComplete(CommandFuture commandFuture) throws Exception { - if(!commandFuture.isSuccess()) { - logger.info("[zyfTest][addHookAndExecute] listener fail"); - callback.fail(commandFuture.cause()); - } else { - logger.info("[zyfTest][addHookAndExecute] listener success"); - callback.success(commandFuture.get()); - } - } - }); - try { - logger.info("[zyfTest][addHookAndExecute] before get"); - future.get(); - logger.info("[zyfTest][addHookAndExecute] get over"); - } catch (Exception e){ - throw new RuntimeException(e); - } - } - } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java new file mode 100644 index 0000000000..3f6c2422c7 --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java @@ -0,0 +1,41 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; +import com.ctrip.xpipe.utils.VisibleForTesting; + +import java.util.concurrent.ScheduledExecutorService; + +public class CheckKeeperActiveCommand extends AbstractKeeperCommand{ + + private Endpoint keeper; + + private boolean expectedActive; + + public CheckKeeperActiveCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedActive) { + super(keyedObjectPool, scheduled); + this.keeper = keeper; + this.expectedActive = expectedActive; + } + + @Override + public String getName() { + return "CheckKeeperActiveCommand"; + } + + @Override + protected void doExecute() throws Throwable { + InfoCommand infoCommand = generateInfoReplicationCommand(keeper); + if (new InfoResultExtractor(infoCommand.execute().get()).isKeeperActive() == expectedActive) { + this.future().setSuccess(); + } + this.future().setFailure(new Exception(String.format("keeper: %s is not %s", keeper, expectedActive))); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java new file mode 100644 index 0000000000..9e318582ac --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java @@ -0,0 +1,39 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand; +import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole; + +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.redis.core.protocal.MASTER_STATE.REDIS_REPL_CONNECTED; + +public class CheckKeeperConnectedCommand extends AbstractKeeperCommand { + + private Endpoint keeper; + + public CheckKeeperConnectedCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) { + super(keyedObjectPool, scheduled); + this.keeper = keeper; + } + + @Override + public String getName() { + return "CheckKeeperConnectedCommand"; + } + + @Override + protected void doExecute() throws Throwable { + SlaveRole role = (SlaveRole)new RoleCommand(keyedObjectPool.getKeyPool(keeper), scheduled).execute().get(); + if (REDIS_REPL_CONNECTED == role.getMasterState()) { + this.future().setSuccess(); + } + this.future().setFailure(new Exception(String.format("ping %s has no pong response", keeper))); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java index 01f5407176..902e0f5896 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java @@ -1,33 +1,24 @@ package com.ctrip.xpipe.redis.console.keeper.Command; -import com.ctrip.xpipe.api.command.Command; import com.ctrip.xpipe.api.endpoint.Endpoint; -import com.ctrip.xpipe.api.pool.ObjectPoolException; -import com.ctrip.xpipe.command.DefaultRetryCommandFactory; -import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import java.util.concurrent.ScheduledExecutorService; public class FullSyncJudgeCommand extends AbstractKeeperCommand { - private Endpoint active; + private Endpoint activeInstance; - private Endpoint backUp; - - private long intervalTime; + private Endpoint backUpInstance; private long activeMasterReplOffset; - private long backupMasterReplOffset; - - public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) { + public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance, long activeMasterReplOffset) { super(keyedObjectPool, scheduled); - this.active = active; - this.backUp = backUp; - this.intervalTime = intervalTime; + this.activeInstance = activeInstance; + this.backUpInstance = backUpInstance; + this.activeMasterReplOffset = activeMasterReplOffset; } @Override @@ -37,51 +28,15 @@ public String getName() { @Override protected void doExecute() throws Throwable { - try { - RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); - Command activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active)); - Command backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp)); - addHookAndExecute(activeRetryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable); - } - }); - - try { - Thread.sleep(intervalTime); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - addHookAndExecute(backUpRetryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable); - } - }); + long backupMasterReplOffset; + backupMasterReplOffset = new InfoResultExtractor(generateInfoReplicationCommand(backUpInstance).execute().get()).getMasterReplOffset(); - if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) { - this.future().setSuccess(); - } - } finally { - try { - keyedObjectPool.clear(active); - keyedObjectPool.clear(backUp); - } catch (ObjectPoolException e) { - logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e); - } + logger.debug("[FullSyncJudgeCommand] activeMasterReplOffset: {}:{}, backupMasterReplOffset: {}:{}", + activeInstance, activeMasterReplOffset, backUpInstance, backupMasterReplOffset); + if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) { + this.future().setSuccess(); } + this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance))); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java new file mode 100644 index 0000000000..ba5df5238c --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java @@ -0,0 +1,32 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; + +import java.util.concurrent.ScheduledExecutorService; + +public class KeeperContainerReplOffsetGetCommand extends AbstractKeeperCommand{ + + private Endpoint keeper; + + public KeeperContainerReplOffsetGetCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) { + super(keyedObjectPool, scheduled); + this.keeper = keeper; + } + + @Override + public String getName() { + return "KeeperContainerReplOffsetGetCommand"; + } + + @Override + protected void doExecute() throws Throwable { + this.future().setSuccess(new InfoResultExtractor(generateInfoReplicationCommand(keeper).execute().get()).getMasterReplOffset()); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java new file mode 100644 index 0000000000..d973957768 --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java @@ -0,0 +1,35 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.command.AbstractCommand; +import com.ctrip.xpipe.redis.console.service.KeeperContainerService; + +public class KeeperResetCommand extends AbstractCommand { + + private String activeKeeperIp; + + private long shardId; + + private KeeperContainerService keeperContainerService; + + public KeeperResetCommand(String activeKeeperIp, long shardId, KeeperContainerService keeperContainerService) { + this.activeKeeperIp = activeKeeperIp; + this.shardId = shardId; + this.keeperContainerService = keeperContainerService; + } + + @Override + public String getName() { + return "KeeperResetCommand"; + } + + @Override + protected void doExecute() throws Throwable { + keeperContainerService.resetKeepers(activeKeeperIp, shardId); + this.future().setSuccess(); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java deleted file mode 100644 index a2478a9a2d..0000000000 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java +++ /dev/null @@ -1,115 +0,0 @@ -package com.ctrip.xpipe.redis.console.keeper.Command; - -import com.ctrip.xpipe.api.command.Command; -import com.ctrip.xpipe.command.DefaultRetryCommandFactory; -import com.ctrip.xpipe.command.RetryCommandFactory; -import com.ctrip.xpipe.endpoint.DefaultEndPoint; -import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; -import com.ctrip.xpipe.redis.console.model.RedisTbl; -import com.ctrip.xpipe.redis.console.service.KeeperContainerService; -import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; - -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; - -public class SwitchMasterCommand extends AbstractKeeperCommand{ - - private String activeIp; - - private String backupIp; - - private List keepers; - - private KeeperContainerService keeperContainerService; - - public SwitchMasterCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, String activeIp, String backupIp, List keepers, KeeperContainerService keeperContainerService) { - super(keyedObjectPool, scheduled); - this.activeIp = activeIp; - this.backupIp = backupIp; - this.keepers = keepers; - this.keeperContainerService = keeperContainerService; - } - - @Override - public String getName() { - return "SwitchMasterCommand"; - } - - @Override - protected void doExecute() throws Throwable { - try { - logger.info("[zyfTest][SwitchMasterCommand] start"); - if (keepers.size() != 2) { - logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModelKeepers: {}", activeIp, backupIp, keepers); - return; - } - int activeKeeperPort = -1; - String backUpKeeperIp = null; - for (RedisTbl keeper : keepers) { - if (keeper.getRedisIp().equals(activeIp)) { - activeKeeperPort = keeper.getRedisPort(); - } else { - backUpKeeperIp = keeper.getRedisIp(); - } - } - - if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) { - logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModelKeepers: {}, activeKeeperPort: {}, backUpKeeperIp: {}" - , activeIp, backupIp, keepers, activeKeeperPort, backUpKeeperIp); - return; - } - - KeeperTransMeta keeperInstanceMeta = null; - logger.info("[zyfTest][SwitchMasterCommand] start getAllKeepers"); - List allKeepers = keeperContainerService.getAllKeepers(activeIp); - logger.info("[zyfTest][SwitchMasterCommand] over getAllKeepers"); - for (KeeperInstanceMeta keeper : allKeepers) { - if (keeper.getKeeperMeta().getPort() == activeKeeperPort) { - keeperInstanceMeta = keeper; - break; - } - } - - if (keeperInstanceMeta == null) { - logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort); - return; - } - logger.info("[zyfTest][SwitchMasterCommand] start resetKeepers"); - keeperContainerService.resetKeepers(keeperInstanceMeta); - logger.info("[zyfTest][SwitchMasterCommand] over resetKeepers"); - RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000); - Command retryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(new DefaultEndPoint(activeIp, activeKeeperPort))); - logger.info("[zyfTest][SwitchMasterCommand] get retryInfoCommand"); - int finalActiveKeeperPort = activeKeeperPort; - addHookAndExecute(retryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand success"); - if (!new InfoResultExtractor(message).getKeeperActive()) { - future().setSuccess(); - } - } - - @Override - public void fail(Throwable throwable) { - logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand fail"); - logger.error("[SwitchMasterCommand] info keeper: {}:{}", activeIp, finalActiveKeeperPort, throwable); - } - }); - if (retryInfoCommand.future().isSuccess()) { - future().setSuccess(); - logger.info("[zyfTest][SwitchMasterCommand] over success"); - } - } catch (Exception e) { - logger.error("[SwitchMasterCommand] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); - } - } - - @Override - protected void doReset() { - - } -} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java deleted file mode 100644 index 212b2a8391..0000000000 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.ctrip.xpipe.redis.console.keeper.impl; - -public class DefaultKeeperContainerAvailablePool { -} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java index 03a398ba6a..e7c494d74d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java @@ -168,16 +168,7 @@ public int hashCode() { @Override public String toString() { - return "MigrationKeeperContainerDetailModel{" + - "srcKeeperContainer=" + srcKeeperContainer + - ", targetKeeperContainer=" + targetKeeperContainer + - ", migrateKeeperCount=" + migrateKeeperCount + - ", migrateKeeperCompleteCount=" + migrateKeeperCompleteCount + - ", switchActive=" + switchActive + - ", keeperPairOverload=" + keeperPairOverload + - ", cause='" + cause + '\'' + - ", migrateShards=" + migrateShards + - ", updateTime=" + updateTime + - '}'; + String type = switchActive ? "switchActiveKeeper" : (keeperPairOverload ? "migrateBackupKeeper" : "migrateActiveKeeper"); + return String.format("[%s]%s->%s:%s", type, srcKeeperContainer.getKeeperIp(), targetKeeperContainer.getKeeperIp(), migrateShards); } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java index 4fd2f8353f..14c98ba09b 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java @@ -5,7 +5,9 @@ import java.util.List; public interface KeeperContainerMigrationService { - void beginMigrateKeeperContainers(List keeperContainerDetailModels); + boolean beginMigrateKeeperContainers(List keeperContainerDetailModels); List getMigrationProcess(); + + boolean stopMigrate(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java index 3c84923aa5..6a1d80cec0 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java @@ -42,7 +42,7 @@ public interface KeeperContainerService { List getAllKeepers(String keeperContainerIp); - void resetKeepers(KeeperTransMeta keeperInstanceMeta); + void resetKeepers(String activeKeeperIp, Long replId); Map keeperContainerIdDcMap(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java index 1f116dde43..1833186d75 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java @@ -12,9 +12,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.*; @@ -32,67 +30,69 @@ public class DefaultKeeperContainerMigrationService implements KeeperContainerMi private volatile AtomicBoolean isBegin = new AtomicBoolean(false); @Override - public void beginMigrateKeeperContainers(List keeperContainerDetailModels) { + public boolean beginMigrateKeeperContainers(List keeperContainerDetailModels) { if (!isBegin.compareAndSet(false, true)) { logger.info("[beginMigrateKeeperContainers] has already begin!!"); - return; + return false; } - logger.debug("[beginMigrateKeeperContainers] begin migrate keeper containers {}", keeperContainerDetailModels); - readyToMigrationKeeperContainers = keeperContainerDetailModels; - Set alreadyMigrateShards = new HashSet<>(); - for (MigrationKeeperContainerDetailModel keeperContainer : readyToMigrationKeeperContainers) { - List migrateShards = keeperContainer.getMigrateShards(); - if (CollectionUtils.isEmpty(migrateShards)) continue; + try { + readyToMigrationKeeperContainers = keeperContainerDetailModels; + for (MigrationKeeperContainerDetailModel keeperContainer : readyToMigrationKeeperContainers) { + if(!isBegin.get()) break; + List migrateShards = keeperContainer.getMigrateShards(); + if (CollectionUtils.isEmpty(migrateShards)) continue; - String srcKeeperContainerIp = keeperContainer.getSrcKeeperContainer().getKeeperIp(); - String targetKeeperContainerIp = keeperContainer.getTargetKeeperContainer().getKeeperIp(); - for (DcClusterShard migrateShard : migrateShards) { - ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), - migrateShard.getClusterId(), migrateShard.getShardId(), false, null); - if (!alreadyMigrateShards.add(migrateShard)) { - logger.info("[beginMigrateKeeperContainers] shard {} has already migrated, should not migrate in the same time", migrateShard); - continue; - } - logger.debug("[beginMigrateKeeperContainers] begin migrate shard {} from srcKeeperContainer:{} to targetKeeperContainer:{}", - migrateShard, srcKeeperContainerIp, targetKeeperContainerIp); - String event; - if (keeperContainer.isSwitchActive()) { - logger.info("[zyfTest] start switchMaster"); - if (shardModelService.switchMaster(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) { - logger.info("[zyfTest] switchMaster success"); - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_SWITCH_MASTER_SUCCESS; - } else { - logger.info("[zyfTest] switchMaster fail"); - event = KEEPER_SWITCH_MASTER_FAIL; - } - }else if (keeperContainer.isKeeperPairOverload()) { - if (shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, - srcKeeperContainerIp, targetKeeperContainerIp)) { - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_MIGRATION_BACKUP_SUCCESS; - } else { - event = KEEPER_MIGRATION_BACKUP_FAIL; - } - }else { - if (shardModelService.migrateAutoBalanceKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, - srcKeeperContainerIp, targetKeeperContainerIp)) { - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_MIGRATION_ACTIVE_START_SUCCESS; - } else { - event = KEEPER_MIGRATION_ACTIVE_START_FAIL; + String srcKeeperContainerIp = keeperContainer.getSrcKeeperContainer().getKeeperIp(); + String targetKeeperContainerIp = keeperContainer.getTargetKeeperContainer().getKeeperIp(); + for (DcClusterShard migrateShard : migrateShards) { + if(!isBegin.get()) break; + ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), + migrateShard.getClusterId(), migrateShard.getShardId(), false, null); + logger.debug("[beginMigrateKeeperContainers][{}-{}-{}][{}->{}]", + migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, targetKeeperContainerIp); + String event; + if (keeperContainer.isSwitchActive()) { + if (shardModelService.switchActiveKeeper(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_SWITCH_MASTER_SUCCESS; + } else { + event = KEEPER_SWITCH_MASTER_FAIL; + } + }else if (keeperContainer.isKeeperPairOverload()) { + if (shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + srcKeeperContainerIp, targetKeeperContainerIp)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_MIGRATION_BACKUP_SUCCESS; + } else { + event = KEEPER_MIGRATION_BACKUP_FAIL; + } + }else { + if (shardModelService.migrateActiveKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + srcKeeperContainerIp, targetKeeperContainerIp)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_MIGRATION_ACTIVE_SUCCESS; + } else { + event = KEEPER_MIGRATION_ACTIVE_FAIL; + } } + CatEventMonitor.DEFAULT.logEvent(KEEPER_MIGRATION, event); + logger.info("[migrateKeeperContainers][{}-{}-{}][{}->{}] {}", + migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, targetKeeperContainerIp, event); } - CatEventMonitor.DEFAULT.logEvent(event, String.format("dc:%s, cluster:%s, shard:%s, src:%s, target:%s", - migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, - targetKeeperContainerIp)); } + return true; + } finally { + isBegin.set(false); } - isBegin.set(false); } @Override public List getMigrationProcess() { return readyToMigrationKeeperContainers; } + + @Override + public boolean stopMigrate() { + return isBegin.compareAndSet(true, false); + } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java index 2a2a83f87f..583132ace4 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java @@ -455,12 +455,14 @@ public List getAllKeepers(String keeperContainerIp) { } @Override - public void resetKeepers(KeeperTransMeta keeperInstanceMeta) { + public void resetKeepers(String activeKeeperIp, Long replId) { + KeeperTransMeta keeperInstanceMeta = new KeeperTransMeta(); + keeperInstanceMeta.setReplId(replId); getOrCreateRestTemplate(); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity requestEntity = new HttpEntity<>(keeperInstanceMeta, headers); - restTemplate.exchange(String.format("http://%s:8080/keepers/election/reset", keeperInstanceMeta.getKeeperMeta().getIp()), + restTemplate.exchange(String.format("http://%s:8080/keepers/election/reset", activeKeeperIp), HttpMethod.POST, requestEntity, Void.class); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java index 2becee4d9b..c645d620c8 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java @@ -18,11 +18,11 @@ ShardModel getShardModel(String dcName, String clusterName, String shardName, ShardModel getSourceShardModel(String clusterName, String srcDcName, String toDcName, String shardName); - boolean migrateShardKeepers(String dcName, String clusterName, ShardModel shardModel, + boolean migrateBackupKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp); - boolean switchMaster(String srcIp, String targetIp, ShardModel shardModel); + boolean switchActiveKeeper(String srcIp, String targetIp, ShardModel shardModel); - boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, - String srcKeeperContainerIp, String targetKeeperContainerIp); + boolean migrateActiveKeeper(String dcName, String clusterName, ShardModel shardModel, + String srcKeeperContainerIp, String targetKeeperContainerIp); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index 18de8817ae..eac07b5510 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -1,35 +1,22 @@ package com.ctrip.xpipe.redis.console.service.model.impl; -import com.ctrip.framework.xpipe.redis.ProxyRegistry; import com.ctrip.xpipe.api.command.Command; -import com.ctrip.xpipe.api.command.CommandFuture; -import com.ctrip.xpipe.api.command.CommandFutureListener; import com.ctrip.xpipe.api.endpoint.Endpoint; import com.ctrip.xpipe.api.pool.ObjectPoolException; -import com.ctrip.xpipe.api.pool.SimpleObjectPool; import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.command.DefaultRetryCommandFactory; import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.command.SequenceCommandChain; import com.ctrip.xpipe.endpoint.DefaultEndPoint; -import com.ctrip.xpipe.netty.commands.NettyClient; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; import com.ctrip.xpipe.redis.console.constant.XPipeConsoleConstant; import com.ctrip.xpipe.redis.console.exception.DataNotFoundException; import com.ctrip.xpipe.redis.console.exception.ServerException; -import com.ctrip.xpipe.redis.console.keeper.Command.FullSyncJudgeCommand; -import com.ctrip.xpipe.redis.console.keeper.Command.SwitchMasterCommand; +import com.ctrip.xpipe.redis.console.keeper.Command.*; import com.ctrip.xpipe.redis.console.model.*; import com.ctrip.xpipe.redis.console.repository.AzGroupClusterRepository; import com.ctrip.xpipe.redis.console.service.*; import com.ctrip.xpipe.redis.console.service.model.ShardModelService; -import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; -import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import com.ctrip.xpipe.utils.ObjectUtils; import com.ctrip.xpipe.utils.VisibleForTesting; import com.ctrip.xpipe.utils.XpipeThreadFactory; @@ -83,9 +70,11 @@ public class ShardModelServiceImpl implements ShardModelService{ @Resource(name = MIGRATE_KEEPER_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; - private RetryCommandFactory switchMasterCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 3, 1000); + private RetryCommandFactory longResultRetryCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 3, 500); - private RetryCommandFactory fullSyncCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); + private RetryCommandFactory retryCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 3, 500); + + private RetryCommandFactory retryLongCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000); @Override public List getAllShardModel(String dcName, String clusterName) { @@ -255,7 +244,7 @@ private void addRedisesAndKeepersToNormalShard(ShardModel shardModel, long dcClu } @Override - public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel shardModel, + public boolean migrateBackupKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp); @@ -263,43 +252,95 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel } @Override - public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) { - Command switchMasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeIp, backupIp, shardModel.getKeepers(), keeperContainerService)); + public boolean switchActiveKeeper(String activeIp, String backupIp, ShardModel shardModel) { + List keepers = shardModel.getKeepers(); + if (keepers.size() != 2) { + logger.warn("[switchMaster][keeperSizeMissMatch][{}:{}->{}] {}", + shardModel.getShardTbl().getShardName(), activeIp, backupIp, keepers.size()); + return false; + } + Endpoint activeKeeper = null, backUpKeeper = null; + for (RedisTbl keeper : keepers) { + if (keeper.getRedisIp().equals(activeIp)) { + activeKeeper = new DefaultEndPoint(keeper.getRedisIp(), keeper.getRedisPort()); + } else { + backUpKeeper = new DefaultEndPoint(keeper.getRedisIp(), keeper.getRedisPort()); + } + } + + if (activeKeeper == null || backUpKeeper == null || !backupIp.equals(backUpKeeper.getHost())) { + logger.warn("[switchMaster][keeperActiveMissMatch][{}:{}->{}]keepers1:{}:{},keepers2:{}:{}" + , shardModel.getShardTbl().getShardName(), activeIp, backupIp, + keepers.get(0).getRedisIp(), keepers.get(0).getRedisPort(), keepers.get(1).getRedisIp(), keepers.get(1).getRedisPort()); + return false; + } + Command switchMasterCommand = retryCommandFactory.createRetryCommand(new KeeperResetCommand<>(activeKeeper.getHost(), shardModel.getShardTbl().getId(), keeperContainerService)); + Command checkKeeperRoleCommand = retryCommandFactory.createRetryCommand(new CheckKeeperActiveCommand<>(keyedObjectPool, scheduled, backUpKeeper, true)); + SequenceCommandChain chain = new SequenceCommandChain(false, false); + chain.add(switchMasterCommand); + chain.add(checkKeeperRoleCommand); try { - logger.info("[zyfTest] start switchMasterCommand execute"); - switchMasterCommand.execute().get(); - logger.info("[zyfTest] start switchMasterCommand execute over"); - logger.info("[zyfTest] start switchMasterCommand execute success?:{}",switchMasterCommand.future().isSuccess()); - return switchMasterCommand.future().isSuccess(); + chain.execute().get(); + return chain.future().isSuccess(); } catch (Exception e) { - logger.error("[switchMaster] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); + logger.error("[switchMaster][commandChainError][{}:{}->{}]", shardModel.getShardTbl().getShardName(), activeKeeper, backUpKeeper, e); return false; + } finally { + try { + keyedObjectPool.clear(activeKeeper); + keyedObjectPool.clear(backUpKeeper); + } catch (ObjectPoolException e) { + logger.error("[switchMaster][keyedObjectPoolClearError][{}:{}->{}]", shardModel.getShardTbl().getShardName(), activeKeeper, backUpKeeper, e); + } } } @Override - public boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { + public boolean migrateActiveKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { List oldKeepers = shardModel.getKeepers(); List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp); if (!doMigrateKeepers(dcName, clusterName, shardModel, newKeepers)) { - throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); + logger.error("[migrateActiveKeeper][doMigrateKeepersFailed]{}:{}:{}", dcName, clusterName, shardModel); + return false; } - RedisTbl active = newKeepers.get(0); - RedisTbl backup = newKeepers.get(1); - DefaultEndPoint activeKey = new DefaultEndPoint(active.getRedisIp(), active.getRedisPort()); - DefaultEndPoint backupKey = new DefaultEndPoint(backup.getRedisIp(), backup.getRedisPort()); - Command fullSyncJudgeRetryCommand = fullSyncCommandFactory.createRetryCommand(new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, activeKey, backupKey, 1000)); - Command switchmasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeKey.getHost(), backupKey.getHost(), newKeepers, keeperContainerService)); - SequenceCommandChain chain = new SequenceCommandChain(false, false); - chain.add(fullSyncJudgeRetryCommand); - chain.add(switchmasterCommand); + Endpoint activeKeeper, backUpKeeper; + if (newKeepers.get(0).getRedisIp().equals(targetKeeperContainerIp)) { + activeKeeper = new DefaultEndPoint(newKeepers.get(1).getRedisIp(), newKeepers.get(1).getRedisPort()); + backUpKeeper = new DefaultEndPoint(newKeepers.get(0).getRedisIp(), newKeepers.get(0).getRedisPort()); + } else { + backUpKeeper = new DefaultEndPoint(newKeepers.get(1).getRedisIp(), newKeepers.get(1).getRedisPort()); + activeKeeper = new DefaultEndPoint(newKeepers.get(0).getRedisIp(), newKeepers.get(0).getRedisPort()); + } + SequenceCommandChain chain = null; try { + Command pingNewKeeperCommand = retryLongCommandFactory.createRetryCommand(new CheckKeeperConnectedCommand<>(keyedObjectPool, scheduled, backUpKeeper)); + pingNewKeeperCommand.execute().get(); + if (!pingNewKeeperCommand.future().isSuccess()) return false; + Command replOffsetGetCommand = longResultRetryCommandFactory.createRetryCommand(new KeeperContainerReplOffsetGetCommand<>(keyedObjectPool, scheduled, activeKeeper)); + long activeMasterReplOffset = replOffsetGetCommand.execute().get(); + if (!replOffsetGetCommand.future().isSuccess()) return false; + Command fullSyncJudgeRetryCommand = retryCommandFactory.createRetryCommand(new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, activeKeeper, backUpKeeper, activeMasterReplOffset)); + Command switchmasterCommand = retryCommandFactory.createRetryCommand(new KeeperResetCommand<>(activeKeeper.getHost(), shardModel.getShardTbl().getId(), keeperContainerService)); + Command checkKeeperRoleCommand = retryCommandFactory.createRetryCommand(new CheckKeeperActiveCommand<>(keyedObjectPool, scheduled, backUpKeeper, true)); + chain = new SequenceCommandChain(false, false); + chain.add(fullSyncJudgeRetryCommand); + chain.add(switchmasterCommand); + chain.add(checkKeeperRoleCommand); chain.execute().get(); - return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); - } catch (InterruptedException | ExecutionException e) { - logger.error("[fullSyncJudge] execute fullSyncJudgeRetryCommand fail", e); - return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); + return chain.future().isSuccess(); + } catch (ExecutionException | InterruptedException e) { + return false; + } finally { + if (chain == null || !chain.future().isSuccess()) { + getAutoBalanceResult(false, dcName, clusterName, shardModel, oldKeepers); + } + try { + keyedObjectPool.clear(activeKeeper); + keyedObjectPool.clear(backUpKeeper); + } catch (ObjectPoolException e) { + logger.error("[migrateActiveKeeper][keyedObjectPoolClearError][{}, {}]", activeKeeper, backUpKeeper, e); + } } } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java index 8894ae9f40..62eaf9f949 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java @@ -40,6 +40,8 @@ import com.ctrip.xpipe.redis.console.healthcheck.nonredis.sentinelconfig.SentinelConfigCheckTest; import com.ctrip.xpipe.redis.console.healthcheck.nonredis.unhealthycluster.UnhealthyClusterCheckerTest; import com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerActionTest; +import com.ctrip.xpipe.redis.console.keeper.impl.AbstractKeeperCommandTest; +import com.ctrip.xpipe.redis.console.keeper.impl.CheckerKeeperActiveCommandTest; import com.ctrip.xpipe.redis.console.keeper.impl.DefaultKeeperUsedInfoAnalyzerTest; import com.ctrip.xpipe.redis.console.keeper.impl.GetAllDcCommandTest; import com.ctrip.xpipe.redis.console.migration.MigrationShardRollbackTest; @@ -235,6 +237,8 @@ AutoMigrateOverloadKeeperContainerActionTest.class, DefaultKeeperUsedInfoAnalyzerTest.class, GetAllDcCommandTest.class, + AbstractKeeperCommandTest.class, + CheckerKeeperActiveCommandTest.class, RouteInfoControllerTest.class, RedisControllerTest.class, diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java index ac2c89345d..d5f6e1cd19 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java @@ -40,7 +40,7 @@ public void beforeAutoMigrateOverloadKeeperContainerActionTest() { ShardModel shardModel = new ShardModel(); Mockito.when(shardModelService.getShardModel(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyObject())) .thenReturn(shardModel); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) .thenReturn(true); } @@ -100,7 +100,7 @@ public void testMigrateAllKeepersFail() { .setSrcKeeperContainer(model2).setTargetKeeperContainer(model4).setMigrateKeeperCount(4).setMigrateShards(migrationShards2); readyToMigrationKeeperContainers.add(migrationKeeperContainerDetailModel2); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())).thenReturn(false); + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())).thenReturn(false); action.migrateAllKeepers(readyToMigrationKeeperContainers); Assert.assertEquals(0, migrationKeeperContainerDetailModel1.getMigrateKeeperCompleteCount()); diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/AbstractKeeperCommandTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/AbstractKeeperCommandTest.java new file mode 100644 index 0000000000..28b7fa47fc --- /dev/null +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/AbstractKeeperCommandTest.java @@ -0,0 +1,76 @@ +package com.ctrip.xpipe.redis.console.keeper.impl; + +import com.ctrip.xpipe.api.command.CommandFuture; +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.console.keeper.Command.AbstractKeeperCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; + +import javax.annotation.PostConstruct; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; + +import static org.mockito.Mockito.when; + + +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) +public class AbstractKeeperCommandTest { + + @Mock + XpipeNettyClientKeyedObjectPool keyedObjectPool; + + @Mock + ScheduledExecutorService scheduled; + + @Mock + SimpleObjectPool keyPool; + + @Mock + InfoCommand infoCommand; + + @Mock + CommandFuture infoCommandFuture; + + static Endpoint key = new DefaultEndPoint("10.10.10.10", 6379); + + @PostConstruct + public void post() { + when(infoCommand.execute()).thenReturn(infoCommandFuture); + when(keyedObjectPool.getKeyPool(key)).thenReturn(keyPool); + } + + @Test + public void testGetKeeperCommandName() throws Throwable { + TestAbstractKeeperCommandTest test = new TestAbstractKeeperCommandTest(keyedObjectPool, scheduled); + test.doExecute(); + } + + private static class TestAbstractKeeperCommandTest extends AbstractKeeperCommand{ + + protected TestAbstractKeeperCommandTest(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled) { + super(keyedObjectPool, scheduled); + } + + @Override + public String getName() { + return null; + } + + @Override + protected void doExecute() throws Throwable { + this.generateInfoReplicationCommand(key); + } + + @Override + protected void doReset() { + + } + } + +} diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/CheckerKeeperActiveCommandTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/CheckerKeeperActiveCommandTest.java new file mode 100644 index 0000000000..bff28f98be --- /dev/null +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/CheckerKeeperActiveCommandTest.java @@ -0,0 +1,26 @@ +package com.ctrip.xpipe.redis.console.keeper.impl; + +import com.ctrip.xpipe.redis.console.keeper.Command.CheckKeeperActiveCommand; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; + +import java.util.concurrent.ExecutionException; + +import static org.mockito.Mockito.when; + + +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) +public class CheckerKeeperActiveCommandTest extends AbstractKeeperCommandTest{ + + @Test + public void checkerKeeperActiveCommandTest() { + CheckKeeperActiveCommand command = new CheckKeeperActiveCommand(keyedObjectPool, scheduled, key, true); + command.execute(); + Assert.assertFalse(command.future().isSuccess()); + } + + +} diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java index fb0e585003..89e1e2a26d 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java @@ -1,6 +1,5 @@ package com.ctrip.xpipe.redis.console.service; -import com.ctrip.xpipe.command.DefaultCommandFuture; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; import com.ctrip.xpipe.redis.console.model.RedisTbl; @@ -8,7 +7,6 @@ import com.ctrip.xpipe.redis.console.model.ShardTbl; import com.ctrip.xpipe.redis.console.service.impl.DefaultKeeperAdvancedService; import com.ctrip.xpipe.redis.console.service.model.impl.ShardModelServiceImpl; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -73,7 +71,7 @@ public void testMigrateAutoBalanceKeepers() throws Exception { Mockito.when(executor.scheduleWithFixedDelay(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenReturn(future); Mockito.when(future.get()).thenReturn(null); try { - shardModelService.migrateAutoBalanceKeepers(dcName, clusterName, shardModel, srcIp, targetIp); + shardModelService.migrateActiveKeeper(dcName, clusterName, shardModel, srcIp, targetIp); } catch (Exception e) { Assert.assertEquals(e.getClass(), RuntimeException.class); } @@ -90,4 +88,9 @@ public void testGetSwitchMaterNewKeepers() { Assert.assertFalse(switchMaterNewKeepers.get(0).isMaster()); } + @Test + public void testSwitchActiveKeeper() { + + } + } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java index 8bb7ad0821..fac19b8b41 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java @@ -9,7 +9,6 @@ import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -40,9 +39,9 @@ public void before() { ShardModel shardModel = new ShardModel(); Mockito.when(shardModelService.getShardModel(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyObject())) .thenReturn(shardModel); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) .thenReturn(true); - Mockito.when(shardModelService.switchMaster(Mockito.anyString(), Mockito.anyString(), Mockito.any())) + Mockito.when(shardModelService.switchActiveKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any())) .thenReturn(true); } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java index 7dcb8e6d10..e5dd837520 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java @@ -325,7 +325,7 @@ public void resetKeepersTest() { HttpEntity requestEntity = new HttpEntity<>(keeperInstanceMeta, headers); Mockito.when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), eq(requestEntity), eq(Void.class))).thenReturn(null); - keeperContainerService.resetKeepers(keeperInstanceMeta); + keeperContainerService.resetKeepers(keeperInstanceMeta.getKeeperMeta().getIp(), keeperInstanceMeta.getReplId()); } @Test diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java index 3d8043d9d5..bb556846f5 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java @@ -120,7 +120,8 @@ public long getSyncPartialErr() { public Long getSwapUsedDbSize() { return extractAsLong(KEY_SWAP_USED_DB_SIZE);} - public boolean getKeeperActive() { return "ACTIVE".equals(extract(KEY_KEEPER_ACTIVE)); } + //Todo @yifuzhou + public boolean isKeeperActive() { return "ACTIVE".equals(extract(KEY_KEEPER_ACTIVE)); } public long getMasterReplOffset() { Long result = extractAsLong(KEY_MASTER_REPL_OFFSET);