Skip to content

Commit 4ba3726

Browse files
author
yifuzhou
committed
执行迁移计划command结构优化
1 parent 08ca54e commit 4ba3726

File tree

8 files changed

+312
-283
lines changed

8 files changed

+312
-283
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.ctrip.xpipe.redis.console.keeper.Command;
2+
3+
import com.ctrip.framework.xpipe.redis.ProxyRegistry;
4+
import com.ctrip.xpipe.api.command.Command;
5+
import com.ctrip.xpipe.api.command.CommandFuture;
6+
import com.ctrip.xpipe.api.command.CommandFutureListener;
7+
import com.ctrip.xpipe.api.endpoint.Endpoint;
8+
import com.ctrip.xpipe.api.pool.SimpleObjectPool;
9+
import com.ctrip.xpipe.command.AbstractCommand;
10+
import com.ctrip.xpipe.netty.commands.NettyClient;
11+
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
12+
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
13+
import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand;
14+
import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand;
15+
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import java.util.concurrent.ScheduledExecutorService;
20+
21+
public abstract class AbstractKeeperCommand<V> extends AbstractCommand<V> {
22+
23+
protected XpipeNettyClientKeyedObjectPool keyedObjectPool;
24+
25+
protected ScheduledExecutorService scheduled;
26+
27+
protected static final Logger logger = LoggerFactory.getLogger(AbstractKeeperCommand.class);
28+
29+
private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI)));
30+
31+
protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled) {
32+
this.keyedObjectPool = keyedObjectPool;
33+
this.scheduled = scheduled;
34+
}
35+
36+
protected InfoCommand generteInfoCommand(Endpoint key) {
37+
if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) {
38+
commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI;
39+
}
40+
SimpleObjectPool<NettyClient> keyPool = keyedObjectPool.getKeyPool(key);
41+
return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut);
42+
}
43+
44+
protected <V> void addHookAndExecute(Command<V> command, Callbackable<V> callback) {
45+
logger.info("[zyfTest][addHookAndExecute] start execute");
46+
CommandFuture<V> future = command.execute();
47+
logger.info("[zyfTest][addHookAndExecute] start addListener");
48+
future.addListener(new CommandFutureListener<V>() {
49+
@Override
50+
public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
51+
if(!commandFuture.isSuccess()) {
52+
logger.info("[zyfTest][addHookAndExecute] listener fail");
53+
callback.fail(commandFuture.cause());
54+
} else {
55+
logger.info("[zyfTest][addHookAndExecute] listener success");
56+
callback.success(commandFuture.get());
57+
}
58+
}
59+
});
60+
try {
61+
logger.info("[zyfTest][addHookAndExecute] before get");
62+
future.get();
63+
logger.info("[zyfTest][addHookAndExecute] get over");
64+
} catch (Exception e){
65+
throw new RuntimeException(e);
66+
}
67+
}
68+
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.ctrip.xpipe.redis.console.keeper.Command;
2+
3+
import com.ctrip.xpipe.api.command.Command;
4+
import com.ctrip.xpipe.api.endpoint.Endpoint;
5+
import com.ctrip.xpipe.api.pool.ObjectPoolException;
6+
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
7+
import com.ctrip.xpipe.command.RetryCommandFactory;
8+
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
9+
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
10+
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
11+
12+
import java.util.concurrent.ScheduledExecutorService;
13+
14+
public class FullSyncJudgeCommand<T> extends AbstractKeeperCommand<T> {
15+
16+
private Endpoint active;
17+
18+
private Endpoint backUp;
19+
20+
private long intervalTime;
21+
22+
private long activeMasterReplOffset;
23+
24+
private long backupMasterReplOffset;
25+
26+
public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) {
27+
super(keyedObjectPool, scheduled);
28+
this.active = active;
29+
this.backUp = backUp;
30+
this.intervalTime = intervalTime;
31+
}
32+
33+
@Override
34+
public String getName() {
35+
return "FullSyncJudgeCommand";
36+
}
37+
38+
@Override
39+
protected void doExecute() throws Throwable {
40+
try {
41+
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000);
42+
Command<String> activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active));
43+
Command<String> backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp));
44+
addHookAndExecute(activeRetryInfoCommand, new Callbackable<String>() {
45+
@Override
46+
public void success(String message) {
47+
activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
48+
}
49+
50+
@Override
51+
public void fail(Throwable throwable) {
52+
logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable);
53+
}
54+
});
55+
56+
try {
57+
Thread.sleep(intervalTime);
58+
} catch (InterruptedException e) {
59+
throw new RuntimeException(e);
60+
}
61+
62+
addHookAndExecute(backUpRetryInfoCommand, new Callbackable<String>() {
63+
@Override
64+
public void success(String message) {
65+
backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
66+
}
67+
68+
@Override
69+
public void fail(Throwable throwable) {
70+
logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable);
71+
}
72+
});
73+
74+
if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
75+
this.future().setSuccess();
76+
}
77+
} finally {
78+
try {
79+
keyedObjectPool.clear(active);
80+
keyedObjectPool.clear(backUp);
81+
} catch (ObjectPoolException e) {
82+
logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e);
83+
}
84+
}
85+
}
86+
87+
@Override
88+
protected void doReset() {
89+
90+
}
91+
92+
93+
94+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package com.ctrip.xpipe.redis.console.keeper.Command;
2+
3+
import com.ctrip.xpipe.api.command.Command;
4+
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
5+
import com.ctrip.xpipe.command.RetryCommandFactory;
6+
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
7+
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
8+
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
9+
import com.ctrip.xpipe.redis.console.model.RedisTbl;
10+
import com.ctrip.xpipe.redis.console.service.KeeperContainerService;
11+
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
12+
import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta;
13+
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
14+
15+
import java.util.List;
16+
import java.util.concurrent.ScheduledExecutorService;
17+
18+
public class SwitchMasterCommand<T> extends AbstractKeeperCommand<T>{
19+
20+
private String activeIp;
21+
22+
private String backupIp;
23+
24+
private List<RedisTbl> keepers;
25+
26+
private KeeperContainerService keeperContainerService;
27+
28+
public SwitchMasterCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, String activeIp, String backupIp, List<RedisTbl> keepers, KeeperContainerService keeperContainerService) {
29+
super(keyedObjectPool, scheduled);
30+
this.activeIp = activeIp;
31+
this.backupIp = backupIp;
32+
this.keepers = keepers;
33+
this.keeperContainerService = keeperContainerService;
34+
}
35+
36+
@Override
37+
public String getName() {
38+
return "SwitchMasterCommand";
39+
}
40+
41+
@Override
42+
protected void doExecute() throws Throwable {
43+
try {
44+
logger.info("[zyfTest][SwitchMasterCommand] start");
45+
if (keepers.size() != 2) {
46+
logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModelKeepers: {}", activeIp, backupIp, keepers);
47+
return;
48+
}
49+
int activeKeeperPort = -1;
50+
String backUpKeeperIp = null;
51+
for (RedisTbl keeper : keepers) {
52+
if (keeper.getRedisIp().equals(activeIp)) {
53+
activeKeeperPort = keeper.getRedisPort();
54+
} else {
55+
backUpKeeperIp = keeper.getRedisIp();
56+
}
57+
}
58+
59+
if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) {
60+
logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModelKeepers: {}, activeKeeperPort: {}, backUpKeeperIp: {}"
61+
, activeIp, backupIp, keepers, activeKeeperPort, backUpKeeperIp);
62+
return;
63+
}
64+
65+
KeeperTransMeta keeperInstanceMeta = null;
66+
logger.info("[zyfTest][SwitchMasterCommand] start getAllKeepers");
67+
List<KeeperInstanceMeta> allKeepers = keeperContainerService.getAllKeepers(activeIp);
68+
logger.info("[zyfTest][SwitchMasterCommand] over getAllKeepers");
69+
for (KeeperInstanceMeta keeper : allKeepers) {
70+
if (keeper.getKeeperMeta().getPort() == activeKeeperPort) {
71+
keeperInstanceMeta = keeper;
72+
break;
73+
}
74+
}
75+
76+
if (keeperInstanceMeta == null) {
77+
logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort);
78+
return;
79+
}
80+
logger.info("[zyfTest][SwitchMasterCommand] start resetKeepers");
81+
keeperContainerService.resetKeepers(keeperInstanceMeta);
82+
logger.info("[zyfTest][SwitchMasterCommand] over resetKeepers");
83+
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000);
84+
Command<String> retryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(new DefaultEndPoint(activeIp, activeKeeperPort)));
85+
logger.info("[zyfTest][SwitchMasterCommand] get retryInfoCommand");
86+
int finalActiveKeeperPort = activeKeeperPort;
87+
addHookAndExecute(retryInfoCommand, new Callbackable<String>() {
88+
@Override
89+
public void success(String message) {
90+
logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand success");
91+
if (!new InfoResultExtractor(message).getKeeperActive()) {
92+
future().setSuccess();
93+
}
94+
}
95+
96+
@Override
97+
public void fail(Throwable throwable) {
98+
logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand fail");
99+
logger.error("[SwitchMasterCommand] info keeper: {}:{}", activeIp, finalActiveKeeperPort, throwable);
100+
}
101+
});
102+
if (retryInfoCommand.future().isSuccess()) {
103+
future().setSuccess();
104+
logger.info("[zyfTest][SwitchMasterCommand] over success");
105+
}
106+
} catch (Exception e) {
107+
logger.error("[SwitchMasterCommand] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e);
108+
}
109+
}
110+
111+
@Override
112+
protected void doReset() {
113+
114+
}
115+
}

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperAdvancedService.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ List<KeeperBasicInfo> findBestKeepers(String dcName, int beginPort, BiPredicate<
2020

2121
List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp);
2222

23-
List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp, boolean isAutoRebalance);
24-
2523
List<RedisTbl> getSwitchMaterNewKeepers(ShardModel shardModel);
2624

2725
List<KeeperBasicInfo> findBestKeepersByKeeperContainer(String targetKeeperContainerIp, int beginPort,

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperAdvancedService.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,10 @@ public List<KeeperBasicInfo> findBestKeepers(String dcName, int beginPort, BiPre
7272

7373
@Override
7474
public List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) {
75-
return getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp, false);
76-
}
77-
78-
@Override
79-
public List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp, boolean isAutoRebalance) {
8075
List<RedisTbl> newKeepers = new ArrayList<>();
8176
logger.debug("[migrateKeepers] origin keepers {} from cluster:{}, dc:{}, shard:{}",shardModel.getKeepers(), clusterName, dcName, shardModel.getShardTbl().getShardName());
8277
for (RedisTbl keeper : shardModel.getKeepers()) {
8378
if (!ObjectUtils.equals(keeper.getRedisIp(), srcKeeperContainerIp)) {
84-
if (isAutoRebalance) {
85-
keeper.setMaster(true);
86-
}
8779
newKeepers.add(keeper);
8880
}
8981
}
@@ -110,8 +102,7 @@ && isDifferentAz(keeperSelected, alreadyUsedAzId, dcName)) {
110102
newKeepers.add(new RedisTbl().setKeepercontainerId(keeperSelected.getKeeperContainerId())
111103
.setRedisIp(keeperSelected.getHost())
112104
.setRedisPort(keeperSelected.getPort())
113-
.setRedisRole(XPipeConsoleConstant.ROLE_KEEPER)
114-
.setMaster(!newKeepers.get(0).isMaster()));
105+
.setRedisRole(XPipeConsoleConstant.ROLE_KEEPER));
115106
break;
116107
}
117108
}

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,13 @@ public void beginMigrateKeeperContainers(List<MigrationKeeperContainerDetailMode
5757
migrateShard, srcKeeperContainerIp, targetKeeperContainerIp);
5858
String event;
5959
if (keeperContainer.isSwitchActive()) {
60+
logger.info("[zyfTest] start switchMaster");
6061
if (shardModelService.switchMaster(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) {
62+
logger.info("[zyfTest] switchMaster success");
6163
keeperContainer.migrateKeeperCompleteCountIncrease();
6264
event = KEEPER_SWITCH_MASTER_SUCCESS;
6365
} else {
66+
logger.info("[zyfTest] switchMaster fail");
6467
event = KEEPER_SWITCH_MASTER_FAIL;
6568
}
6669
}else if (keeperContainer.isKeeperPairOverload()) {

0 commit comments

Comments
 (0)