Skip to content

Commit 8b9e56c

Browse files
author
yifuzhou
committed
执行迁移计划流程控制优化
1 parent 4ba3726 commit 8b9e56c

32 files changed

+624
-414
lines changed

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void onAction(KeeperInfoStatsActionContext context) {
2929
long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue();
3030
deleteKeeper(info);
3131
Map<DcClusterShardKeeper, Long> keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new);
32-
keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow);
32+
keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.isKeeperActive(), info.getHostPort().getPort()), keeperFlow);
3333
} catch (Throwable throwable) {
3434
logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable);
3535
}

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ctrip.xpipe.redis.console.controller.consoleportal;
22

33
import com.ctrip.xpipe.redis.checker.controller.result.RetMessage;
4+
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
45
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
56
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
67
import com.ctrip.xpipe.redis.console.keeper.KeeperContainerUsedInfoAnalyzer;
@@ -92,16 +93,25 @@ public List<MigrationKeeperContainerDetailModel> getOverloadKeeperContainerMigra
9293

9394
@RequestMapping(value = "/keepercontainer/overload/migration/begin", method = RequestMethod.POST)
9495
public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels) {
95-
logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels);
9696
try {
97-
keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
97+
if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) {
98+
return RetMessage.createFailMessage("The previous migration tasks are still in progress!");
99+
}
98100
} catch (Throwable th) {
99-
logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage());
101+
logger.warn("[beginToMigrateOverloadKeeperContainers][fail] {}", keeperContainerDetailModels, th);
100102
return RetMessage.createFailMessage(th.getMessage());
101103
}
102104
return RetMessage.createSuccessMessage();
103105
}
104106

107+
@RequestMapping(value = "/keepercontainer/overload/migration/terminate", method = RequestMethod.POST)
108+
public RetMessage migrateKeeperTaskTerminate() {
109+
if(keeperContainerMigrationService.stopMigrate()){
110+
return RetMessage.createSuccessMessage("All migration tasks have been completed");
111+
}
112+
return RetMessage.createSuccessMessage("No migration tasks in progress");
113+
}
114+
105115
@RequestMapping(value = "/keepercontainer/overload/info/lasted", method = RequestMethod.GET)
106116
public List<KeeperContainerUsedInfoModel> getLastedAllReadyMigrateKeeperContainers() {
107117
return analyzer.getAllDcKeeperContainerUsedInfoModelsList();

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) {
5959
.getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName());
6060

6161
for (ShardModel shardModel : allShardModel) {
62-
if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(),
62+
if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(),
6363
clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(),
6464
(model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null)
6565
? null : model.getTargetKeeperContainer().getAddr().getHost())) {

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt
3939

4040
private final List<ALERT_TYPE> alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS);
4141

42+
public final static String KEEPER_MIGRATION = "keeper_migration";
43+
4244
public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success";
4345

4446
public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail";
@@ -47,14 +49,12 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt
4749

4850
public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail";
4951

50-
public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success";
51-
52-
public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail";
53-
5452
public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success";
5553

5654
public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail";
5755

56+
public final static String KEEPER_MIGRATION_ACTIVE_ROLLBACK_ERROR = "keeper_migration_active_rollback_error";
57+
5858
public final static String KEEPER_MIGRATION_BACKUP_SUCCESS = "keeper_migration_backup_success";
5959

6060
public final static String KEEPER_MIGRATION_BACKUP_FAIL = "keeper_migration_backup_fail";
@@ -87,7 +87,7 @@ void migrateAllKeepers(List<MigrationKeeperContainerDetailModel> readyToMigratio
8787
ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(),
8888
migrateShard.getClusterId(), migrateShard.getShardId(), false, null);
8989

90-
if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
90+
if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
9191
migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) {
9292
logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard);
9393
alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp,
Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
package com.ctrip.xpipe.redis.console.keeper.Command;
22

33
import com.ctrip.framework.xpipe.redis.ProxyRegistry;
4-
import com.ctrip.xpipe.api.command.Command;
5-
import com.ctrip.xpipe.api.command.CommandFuture;
6-
import com.ctrip.xpipe.api.command.CommandFutureListener;
74
import com.ctrip.xpipe.api.endpoint.Endpoint;
85
import com.ctrip.xpipe.api.pool.SimpleObjectPool;
96
import com.ctrip.xpipe.command.AbstractCommand;
107
import com.ctrip.xpipe.netty.commands.NettyClient;
118
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
12-
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
13-
import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand;
149
import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand;
1510
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
11+
import com.ctrip.xpipe.utils.VisibleForTesting;
1612
import org.slf4j.Logger;
1713
import org.slf4j.LoggerFactory;
1814

@@ -33,37 +29,9 @@ protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool,
3329
this.scheduled = scheduled;
3430
}
3531

36-
protected InfoCommand generteInfoCommand(Endpoint key) {
37-
if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) {
38-
commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI;
39-
}
32+
protected InfoCommand generateInfoReplicationCommand(Endpoint key) {
4033
SimpleObjectPool<NettyClient> keyPool = keyedObjectPool.getKeyPool(key);
4134
return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut);
4235
}
4336

44-
protected <V> void addHookAndExecute(Command<V> command, Callbackable<V> callback) {
45-
logger.info("[zyfTest][addHookAndExecute] start execute");
46-
CommandFuture<V> future = command.execute();
47-
logger.info("[zyfTest][addHookAndExecute] start addListener");
48-
future.addListener(new CommandFutureListener<V>() {
49-
@Override
50-
public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
51-
if(!commandFuture.isSuccess()) {
52-
logger.info("[zyfTest][addHookAndExecute] listener fail");
53-
callback.fail(commandFuture.cause());
54-
} else {
55-
logger.info("[zyfTest][addHookAndExecute] listener success");
56-
callback.success(commandFuture.get());
57-
}
58-
}
59-
});
60-
try {
61-
logger.info("[zyfTest][addHookAndExecute] before get");
62-
future.get();
63-
logger.info("[zyfTest][addHookAndExecute] get over");
64-
} catch (Exception e){
65-
throw new RuntimeException(e);
66-
}
67-
}
68-
6937
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.ctrip.xpipe.redis.console.keeper.Command;
2+
3+
import com.ctrip.xpipe.api.endpoint.Endpoint;
4+
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
5+
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
6+
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
7+
import com.ctrip.xpipe.utils.VisibleForTesting;
8+
9+
import java.util.concurrent.ScheduledExecutorService;
10+
11+
public class CheckKeeperActiveCommand<T> extends AbstractKeeperCommand<T>{
12+
13+
private Endpoint keeper;
14+
15+
private boolean expectedActive;
16+
17+
public CheckKeeperActiveCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedActive) {
18+
super(keyedObjectPool, scheduled);
19+
this.keeper = keeper;
20+
this.expectedActive = expectedActive;
21+
}
22+
23+
@Override
24+
public String getName() {
25+
return "CheckKeeperActiveCommand";
26+
}
27+
28+
@Override
29+
protected void doExecute() throws Throwable {
30+
InfoCommand infoCommand = generateInfoReplicationCommand(keeper);
31+
if (new InfoResultExtractor(infoCommand.execute().get()).isKeeperActive() == expectedActive) {
32+
this.future().setSuccess();
33+
return;
34+
}
35+
this.future().setFailure(new Exception(String.format("keeper: %s is not %s", keeper, expectedActive)));
36+
}
37+
38+
@Override
39+
protected void doReset() {
40+
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.ctrip.xpipe.redis.console.keeper.Command;
2+
3+
import com.ctrip.xpipe.api.endpoint.Endpoint;
4+
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
5+
import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
6+
import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;
7+
8+
import java.util.concurrent.ScheduledExecutorService;
9+
10+
import static com.ctrip.xpipe.redis.core.protocal.MASTER_STATE.REDIS_REPL_CONNECTED;
11+
12+
public class CheckKeeperConnectedCommand<T> extends AbstractKeeperCommand<T> {
13+
14+
private Endpoint keeper;
15+
16+
public CheckKeeperConnectedCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
17+
super(keyedObjectPool, scheduled);
18+
this.keeper = keeper;
19+
}
20+
21+
@Override
22+
public String getName() {
23+
return "CheckKeeperConnectedCommand";
24+
}
25+
26+
@Override
27+
protected void doExecute() throws Throwable {
28+
SlaveRole role = (SlaveRole)new RoleCommand(keyedObjectPool.getKeyPool(keeper), scheduled).execute().get();
29+
if (REDIS_REPL_CONNECTED == role.getMasterState()) {
30+
this.future().setSuccess();
31+
return;
32+
}
33+
this.future().setFailure(new Exception(String.format("ping %s has no pong response", keeper)));
34+
}
35+
36+
@Override
37+
protected void doReset() {
38+
39+
}
40+
}

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java

Lines changed: 12 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,24 @@
11
package com.ctrip.xpipe.redis.console.keeper.Command;
22

3-
import com.ctrip.xpipe.api.command.Command;
43
import com.ctrip.xpipe.api.endpoint.Endpoint;
5-
import com.ctrip.xpipe.api.pool.ObjectPoolException;
6-
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
7-
import com.ctrip.xpipe.command.RetryCommandFactory;
84
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
9-
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
105
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
116

127
import java.util.concurrent.ScheduledExecutorService;
138

149
public class FullSyncJudgeCommand<T> extends AbstractKeeperCommand<T> {
1510

16-
private Endpoint active;
11+
private Endpoint activeInstance;
1712

18-
private Endpoint backUp;
19-
20-
private long intervalTime;
13+
private Endpoint backUpInstance;
2114

2215
private long activeMasterReplOffset;
2316

24-
private long backupMasterReplOffset;
25-
26-
public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) {
17+
public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance, long activeMasterReplOffset) {
2718
super(keyedObjectPool, scheduled);
28-
this.active = active;
29-
this.backUp = backUp;
30-
this.intervalTime = intervalTime;
19+
this.activeInstance = activeInstance;
20+
this.backUpInstance = backUpInstance;
21+
this.activeMasterReplOffset = activeMasterReplOffset;
3122
}
3223

3324
@Override
@@ -37,51 +28,13 @@ public String getName() {
3728

3829
@Override
3930
protected void doExecute() throws Throwable {
40-
try {
41-
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000);
42-
Command<String> activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active));
43-
Command<String> backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp));
44-
addHookAndExecute(activeRetryInfoCommand, new Callbackable<String>() {
45-
@Override
46-
public void success(String message) {
47-
activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
48-
}
49-
50-
@Override
51-
public void fail(Throwable throwable) {
52-
logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable);
53-
}
54-
});
55-
56-
try {
57-
Thread.sleep(intervalTime);
58-
} catch (InterruptedException e) {
59-
throw new RuntimeException(e);
60-
}
61-
62-
addHookAndExecute(backUpRetryInfoCommand, new Callbackable<String>() {
63-
@Override
64-
public void success(String message) {
65-
backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
66-
}
67-
68-
@Override
69-
public void fail(Throwable throwable) {
70-
logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable);
71-
}
72-
});
73-
74-
if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
75-
this.future().setSuccess();
76-
}
77-
} finally {
78-
try {
79-
keyedObjectPool.clear(active);
80-
keyedObjectPool.clear(backUp);
81-
} catch (ObjectPoolException e) {
82-
logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e);
83-
}
31+
long backupMasterReplOffset;
32+
backupMasterReplOffset = new InfoResultExtractor(generateInfoReplicationCommand(backUpInstance).execute().get()).getMasterReplOffset();
33+
if (backupMasterReplOffset > 0 && activeMasterReplOffset > 0 && backupMasterReplOffset > activeMasterReplOffset) {
34+
this.future().setSuccess();
35+
return;
8436
}
37+
this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance)));
8538
}
8639

8740
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.ctrip.xpipe.redis.console.keeper.Command;
2+
3+
import com.ctrip.xpipe.api.endpoint.Endpoint;
4+
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
5+
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
6+
7+
import java.util.concurrent.ScheduledExecutorService;
8+
9+
public class KeeperContainerReplOffsetGetCommand<V> extends AbstractKeeperCommand<Object>{
10+
11+
private Endpoint keeper;
12+
13+
public KeeperContainerReplOffsetGetCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
14+
super(keyedObjectPool, scheduled);
15+
this.keeper = keeper;
16+
}
17+
18+
@Override
19+
public String getName() {
20+
return "KeeperContainerReplOffsetGetCommand";
21+
}
22+
23+
@Override
24+
protected void doExecute() throws Throwable {
25+
this.future().setSuccess(new InfoResultExtractor(generateInfoReplicationCommand(keeper).execute().get()).getMasterReplOffset());
26+
}
27+
28+
@Override
29+
protected void doReset() {
30+
31+
}
32+
}

0 commit comments

Comments
 (0)