diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java index 916852b1a0..8384fd4dea 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java @@ -10,7 +10,7 @@ public interface KeeperContainerUsedInfoAnalyzer { - void updateKeeperContainerUsedInfo(Map modelMap); + void updateKeeperContainerUsedInfo(String dcName, Map modelMap); List getAllDcReadyToMigrationKeeperContainers(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java index dd280983a4..b77f6e93e6 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java @@ -1,13 +1,12 @@ package com.ctrip.xpipe.redis.console.keeper.impl; +import com.ctrip.xpipe.api.cluster.CrossDcLeaderAware; import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.api.monitor.Task; import com.ctrip.xpipe.api.monitor.TransactionMonitor; import com.ctrip.xpipe.command.ParallelCommandChain; import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; -import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; -import com.ctrip.xpipe.redis.checker.model.RedisMsg; import com.ctrip.xpipe.redis.console.config.ConsoleConfig; import com.ctrip.xpipe.redis.console.keeper.command.AbstractGetAllDcCommand; import com.ctrip.xpipe.redis.console.keeper.command.KeeperContainerInfoGetCommand; @@ -16,7 +15,6 @@ import com.ctrip.xpipe.redis.console.model.*; import com.ctrip.xpipe.redis.core.service.AbstractService; import com.ctrip.xpipe.spring.AbstractSpringConfigContext; -import com.ctrip.xpipe.tuple.Pair; import com.ctrip.xpipe.utils.DateTimeUtils; import com.ctrip.xpipe.utils.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; @@ -25,12 +23,13 @@ import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService implements KeeperContainerUsedInfoAnalyzer { +public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService implements KeeperContainerUsedInfoAnalyzer, CrossDcLeaderAware { private static final Logger logger = LoggerFactory.getLogger(DefaultKeeperContainerUsedInfoAnalyzer.class); @@ -41,9 +40,9 @@ public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService impl @Resource(name = AbstractSpringConfigContext.GLOBAL_EXECUTOR) private Executor executors; - private Map currentDcAllKeeperContainerUsedInfoModelMap = new HashMap<>(); + private Map> dcKeeperContainerUsedInfoMap = new ConcurrentHashMap<>(); - private List keeperContainerMigrationResult = new ArrayList<>(); + private Map> dcKeeperContainerMigrationResultMap = new ConcurrentHashMap<>(); private static final String currentDc = FoundationService.DEFAULT.getDataCenter().toUpperCase(); @@ -56,10 +55,10 @@ public DefaultKeeperContainerUsedInfoAnalyzer(ConsoleConfig config, } @Override - public void updateKeeperContainerUsedInfo(Map modelMap) { + public void updateKeeperContainerUsedInfo(String dcName, Map modelMap) { modelMap.values().forEach(model -> model.setUpdateTime(DateTimeUtils.currentTimeAsString())); - currentDcAllKeeperContainerUsedInfoModelMap = modelMap; - if (currentDcAllKeeperContainerUsedInfoModelMap.isEmpty()) return; + dcKeeperContainerUsedInfoMap.put(dcName, modelMap); + if (modelMap.isEmpty()) return; logger.info("[analyzeKeeperContainerUsedInfo] start analyze allKeeperContainerUsedInfoModelsList"); executors.execute(new AbstractExceptionLogTask() { @Override @@ -68,13 +67,14 @@ protected void doRun() throws Exception { transaction.logTransactionSwallowException("keeperContainer.analyze", currentDc, new Task() { @Override public void go() throws Exception { - keeperContainerMigrationResult = migrationAnalyzer.getMigrationPlans(currentDcAllKeeperContainerUsedInfoModelMap); + List migrationPlans = migrationAnalyzer.getMigrationPlans(dcKeeperContainerUsedInfoMap.get(dcName)); + dcKeeperContainerMigrationResultMap.put(dcName, migrationPlans); } @Override public Map getData() { Map transactionData = new HashMap<>(); - transactionData.put("keeperContainerSize", currentDcAllKeeperContainerUsedInfoModelMap.size()); + transactionData.put("keeperContainerSize", dcKeeperContainerUsedInfoMap.size()); return transactionData; } }); @@ -98,12 +98,16 @@ public List getAllDcKeeperContainerUsedInfoModelsL @Override public List getCurrentDcReadyToMigrationKeeperContainers() { - return keeperContainerMigrationResult; + List result = new ArrayList<>(); + dcKeeperContainerMigrationResultMap.values().forEach(result::addAll); + return result; } @Override public List getCurrentDcKeeperContainerUsedInfoModelsList() { - return new ArrayList<>(currentDcAllKeeperContainerUsedInfoModelMap.values()); + List result = new ArrayList<>(); + dcKeeperContainerUsedInfoMap.values().forEach(map -> result.addAll(map.values())); + return result; } public List getAllDcResult(Supplier> localDcResultSupplier, AbstractGetAllDcCommand> command, List result) { @@ -139,4 +143,14 @@ void setExecutors(Executor executors){ this.executors = executors; } + @Override + public void isCrossDcLeader() { + //do nothing + } + + @Override + public void notCrossDcLeader() { + dcKeeperContainerUsedInfoMap.keySet().removeIf(dc -> !currentDc.equalsIgnoreCase(dc)); + dcKeeperContainerMigrationResultMap.keySet().removeIf(dc -> !currentDc.equalsIgnoreCase(dc)); + } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java index f0ca755df2..c85ffe3453 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java @@ -1,7 +1,7 @@ package com.ctrip.xpipe.redis.console.keeper.impl; +import com.ctrip.xpipe.api.cluster.CrossDcLeaderAware; import com.ctrip.xpipe.api.foundation.FoundationService; -import com.ctrip.xpipe.cache.TimeBoundCache; import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.metric.MetricData; @@ -45,7 +45,7 @@ import static com.ctrip.xpipe.cluster.ClusterType.ONE_WAY; @Component -public class KeeperContainerUsedInfoMsgCollector extends AbstractService implements ConsoleLeaderAware { +public class KeeperContainerUsedInfoMsgCollector extends AbstractService implements ConsoleLeaderAware, CrossDcLeaderAware { @Autowired private MetaCache metaCache; @@ -75,6 +75,10 @@ public class KeeperContainerUsedInfoMsgCollector extends AbstractService impleme private DynamicDelayPeriodTask keeperContainerUsedInfoMsgCollectorTask; + private ScheduledExecutorService extraSyncDCScheduled; + + private DynamicDelayPeriodTask extraSyncDCKeeperContainerUsedInfoMsgCollectorTask; + protected Map, Date>> redisMasterMsgCache = new ConcurrentHashMap<>(); private Map dcCheckerReportSituationMap = new HashMap<>(); @@ -98,8 +102,9 @@ public class KeeperContainerUsedInfoMsgCollector extends AbstractService impleme @PostConstruct public void init() { this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("KeeperContainerUsedInfoMsgCollector")); - this.keeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getAllCurrentDcRedisMsg", - this::getAllCurrentDcRedisMsgAndCalculate, () -> config.getKeeperCheckerIntervalMilli(), scheduled); + this.keeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getAllCurrentDcRedisMsg", () -> getDcRedisMsgAndCalculate(currentDc), () -> config.getKeeperCheckerIntervalMilli(), scheduled); + this.extraSyncDCScheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("ExtraSyncDCKeeperContainerUsedInfoMsgCollector")); + this.extraSyncDCKeeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getExtraSyncDCRedisMsg", () -> consoleConfig.getExtraSyncDC().forEach(this::getDcRedisMsgAndCalculate), () -> config.getKeeperCheckerIntervalMilli(), extraSyncDCScheduled); } public void saveMsg(int index, Map redisMsgMap) { @@ -107,7 +112,7 @@ public void saveMsg(int index, Map redisMsgMap) { redisMasterMsgCache.put(index, new Pair<>(redisMsgMap, new Date())); } - protected void getAllCurrentDcRedisMsgAndCalculate() { + protected void getDcRedisMsgAndCalculate(String dcName) { Map allRedisMasterMsg = new HashMap<>(); Map tmpDcCheckerReportSituationMap = new HashMap<>(); for (Map.Entry entry : consoleConfig.getConsoleDomains().entrySet()) { @@ -128,9 +133,9 @@ protected void getAllCurrentDcRedisMsgAndCalculate() { dcCheckerReportSituationMap = tmpDcCheckerReportSituationMap; if (allRedisMasterMsg.isEmpty()) return; - Map modelMap = generateKeeperContainerUsedInfoModels(allRedisMasterMsg); + Map modelMap = generateKeeperContainerUsedInfoModels(allRedisMasterMsg, dcName); reportKeeperData(modelMap); - keeperContainerUsedInfoAnalyzer.updateKeeperContainerUsedInfo(modelMap); + keeperContainerUsedInfoAnalyzer.updateKeeperContainerUsedInfo(dcName, modelMap); } public DcCheckerReportMsg getCurrentDcRedisMasterMsg() { @@ -159,7 +164,7 @@ private void removeExpireData() { } } - protected Map generateKeeperContainerUsedInfoModels(Map redisMsgMap) { + protected Map generateKeeperContainerUsedInfoModels(Map redisMsgMap, String selectedDcName) { Map result = new HashMap<>(); Map checkedCluster = new HashMap<>(); for (DcMeta dcMeta : metaCache.getXpipeMeta().getDcs().values()) { @@ -170,8 +175,8 @@ protected Map generateKeeperContainerUsedI continue; } checkedCluster.put(clusterMeta.getId(), null); - ClusterMeta currentDcMeta = getCurrentDcMeta(clusterMeta); - if (currentDcMeta == null) { + ClusterMeta selectedDcMeta = getDcMeta(clusterMeta, selectedDcName); + if (selectedDcMeta == null) { continue; } ClusterMeta masterDcMeta = dcMeta.getId().equals(clusterMeta.getActiveDc()) ? clusterMeta : metaCache.getXpipeMeta().findDc(clusterMeta.getActiveDc()).findCluster(clusterMeta.getId()); @@ -181,15 +186,15 @@ protected Map generateKeeperContainerUsedI RedisMsg redisMsg = redisMsgMap.get(new HostPort(redisMeta.getIp(), redisMeta.getPort())); if (redisMsg != null) { clusterAllRedisMsg.addRedisMsg(redisMsg); - ShardMeta currentDcShardMeta = currentDcMeta.findShard(shardMeta.getId()); - generateKeeperMsg(currentDc, clusterMeta.getId(), currentDcShardMeta.getId(), currentDcShardMeta.getKeepers(), result, redisMsg); + ShardMeta currentDcShardMeta = selectedDcMeta.findShard(shardMeta.getId()); + generateKeeperMsg(selectedDcName, clusterMeta.getId(), currentDcShardMeta.getId(), currentDcShardMeta.getKeepers(), result, redisMsg); } } } } -// reportClusterData(clusterMeta.getId(), clusterAllRedisMsg); + reportClusterData(clusterMeta.getId(), clusterAllRedisMsg, selectedDcName); } - if (currentDc.equals(dcMeta.getId())) { + if (selectedDcName.equals(dcMeta.getId())) { dcMeta.getKeeperContainers().forEach(keeperContainerMeta -> { if (!result.containsKey(keeperContainerMeta.getIp())) { result.put(keeperContainerMeta.getIp(), getKeeperBasicModel(keeperContainerMeta.getIp(), dcMeta.getId())); @@ -222,17 +227,17 @@ private void generateKeeperMsg(String dc, String cluster, String shard, List result = collector.generateKeeperContainerUsedInfoModels(redisMsgMap); + Map result = collector.generateKeeperContainerUsedInfoModels(redisMsgMap, "jq"); // Verify assertNotNull(result);