diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java index 916852b1a0..8384fd4dea 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/KeeperContainerUsedInfoAnalyzer.java @@ -10,7 +10,7 @@ public interface KeeperContainerUsedInfoAnalyzer { - void updateKeeperContainerUsedInfo(Map modelMap); + void updateKeeperContainerUsedInfo(String dcName, Map modelMap); List getAllDcReadyToMigrationKeeperContainers(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java index dd280983a4..c5a415e6cf 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java @@ -25,6 +25,7 @@ import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -41,9 +42,9 @@ public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService impl @Resource(name = AbstractSpringConfigContext.GLOBAL_EXECUTOR) private Executor executors; - private Map currentDcAllKeeperContainerUsedInfoModelMap = new HashMap<>(); + private Map> currentDcAllKeeperContainerUsedInfoModelMap = new ConcurrentHashMap<>(); - private List keeperContainerMigrationResult = new ArrayList<>(); + private Map> keeperContainerMigrationResult = new ConcurrentHashMap<>(); private static final String currentDc = FoundationService.DEFAULT.getDataCenter().toUpperCase(); @@ -56,10 +57,10 @@ public DefaultKeeperContainerUsedInfoAnalyzer(ConsoleConfig config, } @Override - public void updateKeeperContainerUsedInfo(Map modelMap) { + public void updateKeeperContainerUsedInfo(String dcName, Map modelMap) { modelMap.values().forEach(model -> model.setUpdateTime(DateTimeUtils.currentTimeAsString())); - currentDcAllKeeperContainerUsedInfoModelMap = modelMap; - if (currentDcAllKeeperContainerUsedInfoModelMap.isEmpty()) return; + currentDcAllKeeperContainerUsedInfoModelMap.put(dcName, modelMap); + if (modelMap.isEmpty()) return; logger.info("[analyzeKeeperContainerUsedInfo] start analyze allKeeperContainerUsedInfoModelsList"); executors.execute(new AbstractExceptionLogTask() { @Override @@ -68,7 +69,8 @@ protected void doRun() throws Exception { transaction.logTransactionSwallowException("keeperContainer.analyze", currentDc, new Task() { @Override public void go() throws Exception { - keeperContainerMigrationResult = migrationAnalyzer.getMigrationPlans(currentDcAllKeeperContainerUsedInfoModelMap); + List migrationPlans = migrationAnalyzer.getMigrationPlans(currentDcAllKeeperContainerUsedInfoModelMap.get(dcName)); + keeperContainerMigrationResult.put(dcName, migrationPlans); } @Override @@ -98,12 +100,16 @@ public List getAllDcKeeperContainerUsedInfoModelsL @Override public List getCurrentDcReadyToMigrationKeeperContainers() { - return keeperContainerMigrationResult; + List result = new ArrayList<>(); + keeperContainerMigrationResult.values().forEach(result::addAll); + return result; } @Override public List getCurrentDcKeeperContainerUsedInfoModelsList() { - return new ArrayList<>(currentDcAllKeeperContainerUsedInfoModelMap.values()); + List result = new ArrayList<>(); + currentDcAllKeeperContainerUsedInfoModelMap.values().forEach(map -> result.addAll(map.values())); + return result; } public List getAllDcResult(Supplier> localDcResultSupplier, AbstractGetAllDcCommand> command, List result) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java index f0ca755df2..1f2a95b937 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/KeeperContainerUsedInfoMsgCollector.java @@ -1,7 +1,7 @@ package com.ctrip.xpipe.redis.console.keeper.impl; +import com.ctrip.xpipe.api.cluster.CrossDcLeaderAware; import com.ctrip.xpipe.api.foundation.FoundationService; -import com.ctrip.xpipe.cache.TimeBoundCache; import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.metric.MetricData; @@ -45,7 +45,7 @@ import static com.ctrip.xpipe.cluster.ClusterType.ONE_WAY; @Component -public class KeeperContainerUsedInfoMsgCollector extends AbstractService implements ConsoleLeaderAware { +public class KeeperContainerUsedInfoMsgCollector extends AbstractService implements ConsoleLeaderAware, CrossDcLeaderAware { @Autowired private MetaCache metaCache; @@ -75,6 +75,10 @@ public class KeeperContainerUsedInfoMsgCollector extends AbstractService impleme private DynamicDelayPeriodTask keeperContainerUsedInfoMsgCollectorTask; + private ScheduledExecutorService extraSyncDCScheduled; + + private DynamicDelayPeriodTask extraSyncDCKeeperContainerUsedInfoMsgCollectorTask; + protected Map, Date>> redisMasterMsgCache = new ConcurrentHashMap<>(); private Map dcCheckerReportSituationMap = new HashMap<>(); @@ -98,8 +102,9 @@ public class KeeperContainerUsedInfoMsgCollector extends AbstractService impleme @PostConstruct public void init() { this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("KeeperContainerUsedInfoMsgCollector")); - this.keeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getAllCurrentDcRedisMsg", - this::getAllCurrentDcRedisMsgAndCalculate, () -> config.getKeeperCheckerIntervalMilli(), scheduled); + this.keeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getAllCurrentDcRedisMsg", () -> getDcRedisMsgAndCalculate(currentDc), () -> config.getKeeperCheckerIntervalMilli(), scheduled); + this.extraSyncDCScheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("ExtraSyncDCKeeperContainerUsedInfoMsgCollector")); + this.extraSyncDCKeeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getExtraSyncDCRedisMsg", () -> consoleConfig.getExtraSyncDC().forEach(this::getDcRedisMsgAndCalculate), () -> config.getKeeperCheckerIntervalMilli(), extraSyncDCScheduled); } public void saveMsg(int index, Map redisMsgMap) { @@ -107,7 +112,7 @@ public void saveMsg(int index, Map redisMsgMap) { redisMasterMsgCache.put(index, new Pair<>(redisMsgMap, new Date())); } - protected void getAllCurrentDcRedisMsgAndCalculate() { + protected void getDcRedisMsgAndCalculate(String dcName) { Map allRedisMasterMsg = new HashMap<>(); Map tmpDcCheckerReportSituationMap = new HashMap<>(); for (Map.Entry entry : consoleConfig.getConsoleDomains().entrySet()) { @@ -128,9 +133,9 @@ protected void getAllCurrentDcRedisMsgAndCalculate() { dcCheckerReportSituationMap = tmpDcCheckerReportSituationMap; if (allRedisMasterMsg.isEmpty()) return; - Map modelMap = generateKeeperContainerUsedInfoModels(allRedisMasterMsg); + Map modelMap = generateKeeperContainerUsedInfoModels(allRedisMasterMsg, dcName); reportKeeperData(modelMap); - keeperContainerUsedInfoAnalyzer.updateKeeperContainerUsedInfo(modelMap); + keeperContainerUsedInfoAnalyzer.updateKeeperContainerUsedInfo(dcName, modelMap); } public DcCheckerReportMsg getCurrentDcRedisMasterMsg() { @@ -159,7 +164,7 @@ private void removeExpireData() { } } - protected Map generateKeeperContainerUsedInfoModels(Map redisMsgMap) { + protected Map generateKeeperContainerUsedInfoModels(Map redisMsgMap, String dcName) { Map result = new HashMap<>(); Map checkedCluster = new HashMap<>(); for (DcMeta dcMeta : metaCache.getXpipeMeta().getDcs().values()) { @@ -170,7 +175,7 @@ protected Map generateKeeperContainerUsedI continue; } checkedCluster.put(clusterMeta.getId(), null); - ClusterMeta currentDcMeta = getCurrentDcMeta(clusterMeta); + ClusterMeta currentDcMeta = getDcMeta(clusterMeta, dcName); if (currentDcMeta == null) { continue; } @@ -182,14 +187,16 @@ protected Map generateKeeperContainerUsedI if (redisMsg != null) { clusterAllRedisMsg.addRedisMsg(redisMsg); ShardMeta currentDcShardMeta = currentDcMeta.findShard(shardMeta.getId()); - generateKeeperMsg(currentDc, clusterMeta.getId(), currentDcShardMeta.getId(), currentDcShardMeta.getKeepers(), result, redisMsg); + generateKeeperMsg(dcName, clusterMeta.getId(), currentDcShardMeta.getId(), currentDcShardMeta.getKeepers(), result, redisMsg); } } } } -// reportClusterData(clusterMeta.getId(), clusterAllRedisMsg); + if (dcName.equals(dcMeta.getId())) { + reportClusterData(clusterMeta.getId(), clusterAllRedisMsg, dcName); + } } - if (currentDc.equals(dcMeta.getId())) { + if (dcName.equals(dcMeta.getId())) { dcMeta.getKeeperContainers().forEach(keeperContainerMeta -> { if (!result.containsKey(keeperContainerMeta.getIp())) { result.put(keeperContainerMeta.getIp(), getKeeperBasicModel(keeperContainerMeta.getIp(), dcMeta.getId())); @@ -222,17 +229,17 @@ private void generateKeeperMsg(String dc, String cluster, String shard, List result = collector.generateKeeperContainerUsedInfoModels(redisMsgMap); + Map result = collector.generateKeeperContainerUsedInfoModels(redisMsgMap, "jq"); // Verify assertNotNull(result);