Skip to content

Commit

Permalink
keeper msg collect for extra dc
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Mar 5, 2025
1 parent 0ae60e3 commit 8d979a8
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

public interface KeeperContainerUsedInfoAnalyzer {

void updateKeeperContainerUsedInfo(Map<String, KeeperContainerUsedInfoModel> modelMap);
void updateKeeperContainerUsedInfo(String dcName, Map<String, KeeperContainerUsedInfoModel> modelMap);

List<MigrationKeeperContainerDetailModel> getAllDcReadyToMigrationKeeperContainers();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.ctrip.xpipe.redis.console.keeper.impl;

import com.ctrip.xpipe.api.cluster.CrossDcLeaderAware;
import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.api.monitor.Task;
import com.ctrip.xpipe.api.monitor.TransactionMonitor;
import com.ctrip.xpipe.command.ParallelCommandChain;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.checker.model.RedisMsg;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.keeper.command.AbstractGetAllDcCommand;
import com.ctrip.xpipe.redis.console.keeper.command.KeeperContainerInfoGetCommand;
Expand All @@ -16,7 +15,6 @@
import com.ctrip.xpipe.redis.console.model.*;
import com.ctrip.xpipe.redis.core.service.AbstractService;
import com.ctrip.xpipe.spring.AbstractSpringConfigContext;
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.DateTimeUtils;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -25,12 +23,13 @@

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;


public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService implements KeeperContainerUsedInfoAnalyzer {
public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService implements KeeperContainerUsedInfoAnalyzer, CrossDcLeaderAware {

private static final Logger logger = LoggerFactory.getLogger(DefaultKeeperContainerUsedInfoAnalyzer.class);

Expand All @@ -41,9 +40,9 @@ public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService impl
@Resource(name = AbstractSpringConfigContext.GLOBAL_EXECUTOR)
private Executor executors;

private Map<String, KeeperContainerUsedInfoModel> currentDcAllKeeperContainerUsedInfoModelMap = new HashMap<>();
private Map<String, Map<String, KeeperContainerUsedInfoModel>> dcKeeperContainerUsedInfoMap = new ConcurrentHashMap<>();

private List<MigrationKeeperContainerDetailModel> keeperContainerMigrationResult = new ArrayList<>();
private Map<String, List<MigrationKeeperContainerDetailModel>> dcKeeperContainerMigrationResultMap = new ConcurrentHashMap<>();

private static final String currentDc = FoundationService.DEFAULT.getDataCenter().toUpperCase();

Expand All @@ -56,10 +55,10 @@ public DefaultKeeperContainerUsedInfoAnalyzer(ConsoleConfig config,
}

@Override
public void updateKeeperContainerUsedInfo(Map<String, KeeperContainerUsedInfoModel> modelMap) {
public void updateKeeperContainerUsedInfo(String dcName, Map<String, KeeperContainerUsedInfoModel> modelMap) {
modelMap.values().forEach(model -> model.setUpdateTime(DateTimeUtils.currentTimeAsString()));
currentDcAllKeeperContainerUsedInfoModelMap = modelMap;
if (currentDcAllKeeperContainerUsedInfoModelMap.isEmpty()) return;
dcKeeperContainerUsedInfoMap.put(dcName, modelMap);
if (modelMap.isEmpty()) return;
logger.info("[analyzeKeeperContainerUsedInfo] start analyze allKeeperContainerUsedInfoModelsList");
executors.execute(new AbstractExceptionLogTask() {
@Override
Expand All @@ -68,13 +67,14 @@ protected void doRun() throws Exception {
transaction.logTransactionSwallowException("keeperContainer.analyze", currentDc, new Task() {
@Override
public void go() throws Exception {
keeperContainerMigrationResult = migrationAnalyzer.getMigrationPlans(currentDcAllKeeperContainerUsedInfoModelMap);
List<MigrationKeeperContainerDetailModel> migrationPlans = migrationAnalyzer.getMigrationPlans(dcKeeperContainerUsedInfoMap.get(dcName));
dcKeeperContainerMigrationResultMap.put(dcName, migrationPlans);
}

@Override
public Map<String, Object> getData() {
Map<String, Object> transactionData = new HashMap<>();
transactionData.put("keeperContainerSize", currentDcAllKeeperContainerUsedInfoModelMap.size());
transactionData.put("keeperContainerSize", dcKeeperContainerUsedInfoMap.size());
return transactionData;
}
});
Expand All @@ -98,12 +98,16 @@ public List<KeeperContainerUsedInfoModel> getAllDcKeeperContainerUsedInfoModelsL

@Override
public List<MigrationKeeperContainerDetailModel> getCurrentDcReadyToMigrationKeeperContainers() {
return keeperContainerMigrationResult;
List<MigrationKeeperContainerDetailModel> result = new ArrayList<>();
dcKeeperContainerMigrationResultMap.values().forEach(result::addAll);
return result;
}

@Override
public List<KeeperContainerUsedInfoModel> getCurrentDcKeeperContainerUsedInfoModelsList() {
return new ArrayList<>(currentDcAllKeeperContainerUsedInfoModelMap.values());
List<KeeperContainerUsedInfoModel> result = new ArrayList<>();
dcKeeperContainerUsedInfoMap.values().forEach(map -> result.addAll(map.values()));
return result;
}

public <T> List<T> getAllDcResult(Supplier<List<T>> localDcResultSupplier, AbstractGetAllDcCommand<List<T>> command, List<T> result) {
Expand Down Expand Up @@ -139,4 +143,14 @@ void setExecutors(Executor executors){
this.executors = executors;
}

@Override
public void isCrossDcLeader() {
// Intentionally a no-op: becoming cross-DC leader requires no state change
// in this analyzer. Cached per-DC data is only pruned when leadership is
// lost (see notCrossDcLeader).
//do nothing
}

@Override
public void notCrossDcLeader() {
    // When cross-DC leadership is lost, this node is no longer responsible for
    // other DCs: drop every cached usage-info and migration-plan entry whose
    // DC id is not the local one (case-insensitive match against currentDc).
    dcKeeperContainerUsedInfoMap.keySet().removeIf(dcId -> !dcId.equalsIgnoreCase(currentDc));
    dcKeeperContainerMigrationResultMap.keySet().removeIf(dcId -> !dcId.equalsIgnoreCase(currentDc));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.ctrip.xpipe.redis.console.keeper.impl;

import com.ctrip.xpipe.api.cluster.CrossDcLeaderAware;
import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.metric.MetricData;
Expand Down Expand Up @@ -45,7 +45,7 @@
import static com.ctrip.xpipe.cluster.ClusterType.ONE_WAY;

@Component
public class KeeperContainerUsedInfoMsgCollector extends AbstractService implements ConsoleLeaderAware {
public class KeeperContainerUsedInfoMsgCollector extends AbstractService implements ConsoleLeaderAware, CrossDcLeaderAware {

@Autowired
private MetaCache metaCache;
Expand Down Expand Up @@ -75,6 +75,10 @@ public class KeeperContainerUsedInfoMsgCollector extends AbstractService impleme

private DynamicDelayPeriodTask keeperContainerUsedInfoMsgCollectorTask;

private ScheduledExecutorService extraSyncDCScheduled;

private DynamicDelayPeriodTask extraSyncDCKeeperContainerUsedInfoMsgCollectorTask;

protected Map<Integer, Pair<Map<HostPort, RedisMsg>, Date>> redisMasterMsgCache = new ConcurrentHashMap<>();

private Map<String, CheckerReportSituation> dcCheckerReportSituationMap = new HashMap<>();
Expand All @@ -98,16 +102,17 @@ public class KeeperContainerUsedInfoMsgCollector extends AbstractService impleme
@PostConstruct
public void init() {
this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("KeeperContainerUsedInfoMsgCollector"));
this.keeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getAllCurrentDcRedisMsg",
this::getAllCurrentDcRedisMsgAndCalculate, () -> config.getKeeperCheckerIntervalMilli(), scheduled);
this.keeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getAllCurrentDcRedisMsg", () -> getDcRedisMsgAndCalculate(currentDc), () -> config.getKeeperCheckerIntervalMilli(), scheduled);
this.extraSyncDCScheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("ExtraSyncDCKeeperContainerUsedInfoMsgCollector"));
this.extraSyncDCKeeperContainerUsedInfoMsgCollectorTask = new DynamicDelayPeriodTask("getExtraSyncDCRedisMsg", () -> consoleConfig.getExtraSyncDC().forEach(this::getDcRedisMsgAndCalculate), () -> config.getKeeperCheckerIntervalMilli(), extraSyncDCScheduled);
}

public void saveMsg(int index, Map<HostPort, RedisMsg> redisMsgMap) {
// Records that the checker at this index has reported, then caches its redis
// master messages paired with the receipt time — presumably so stale entries
// can be aged out later (see removeExpireData); confirm against that method.
reportCheckerReport(index);
redisMasterMsgCache.put(index, new Pair<>(redisMsgMap, new Date()));
}

protected void getAllCurrentDcRedisMsgAndCalculate() {
protected void getDcRedisMsgAndCalculate(String dcName) {
Map<HostPort, RedisMsg> allRedisMasterMsg = new HashMap<>();
Map<String, CheckerReportSituation> tmpDcCheckerReportSituationMap = new HashMap<>();
for (Map.Entry<String, String> entry : consoleConfig.getConsoleDomains().entrySet()) {
Expand All @@ -128,9 +133,9 @@ protected void getAllCurrentDcRedisMsgAndCalculate() {
dcCheckerReportSituationMap = tmpDcCheckerReportSituationMap;
if (allRedisMasterMsg.isEmpty()) return;

Map<String, KeeperContainerUsedInfoModel> modelMap = generateKeeperContainerUsedInfoModels(allRedisMasterMsg);
Map<String, KeeperContainerUsedInfoModel> modelMap = generateKeeperContainerUsedInfoModels(allRedisMasterMsg, dcName);
reportKeeperData(modelMap);
keeperContainerUsedInfoAnalyzer.updateKeeperContainerUsedInfo(modelMap);
keeperContainerUsedInfoAnalyzer.updateKeeperContainerUsedInfo(dcName, modelMap);
}

public DcCheckerReportMsg getCurrentDcRedisMasterMsg() {
Expand Down Expand Up @@ -159,7 +164,7 @@ private void removeExpireData() {
}
}

protected Map<String, KeeperContainerUsedInfoModel> generateKeeperContainerUsedInfoModels(Map<HostPort, RedisMsg> redisMsgMap) {
protected Map<String, KeeperContainerUsedInfoModel> generateKeeperContainerUsedInfoModels(Map<HostPort, RedisMsg> redisMsgMap, String selectedDcName) {
Map<String, KeeperContainerUsedInfoModel> result = new HashMap<>();
Map<String, Object> checkedCluster = new HashMap<>();
for (DcMeta dcMeta : metaCache.getXpipeMeta().getDcs().values()) {
Expand All @@ -170,8 +175,8 @@ protected Map<String, KeeperContainerUsedInfoModel> generateKeeperContainerUsedI
continue;
}
checkedCluster.put(clusterMeta.getId(), null);
ClusterMeta currentDcMeta = getCurrentDcMeta(clusterMeta);
if (currentDcMeta == null) {
ClusterMeta selectedDcMeta = getDcMeta(clusterMeta, selectedDcName);
if (selectedDcMeta == null) {
continue;
}
ClusterMeta masterDcMeta = dcMeta.getId().equals(clusterMeta.getActiveDc()) ? clusterMeta : metaCache.getXpipeMeta().findDc(clusterMeta.getActiveDc()).findCluster(clusterMeta.getId());
Expand All @@ -181,15 +186,15 @@ protected Map<String, KeeperContainerUsedInfoModel> generateKeeperContainerUsedI
RedisMsg redisMsg = redisMsgMap.get(new HostPort(redisMeta.getIp(), redisMeta.getPort()));
if (redisMsg != null) {
clusterAllRedisMsg.addRedisMsg(redisMsg);
ShardMeta currentDcShardMeta = currentDcMeta.findShard(shardMeta.getId());
generateKeeperMsg(currentDc, clusterMeta.getId(), currentDcShardMeta.getId(), currentDcShardMeta.getKeepers(), result, redisMsg);
ShardMeta currentDcShardMeta = selectedDcMeta.findShard(shardMeta.getId());
generateKeeperMsg(selectedDcName, clusterMeta.getId(), currentDcShardMeta.getId(), currentDcShardMeta.getKeepers(), result, redisMsg);
}
}
}
}
// reportClusterData(clusterMeta.getId(), clusterAllRedisMsg);
reportClusterData(clusterMeta.getId(), clusterAllRedisMsg, selectedDcName);
}
if (currentDc.equals(dcMeta.getId())) {
if (selectedDcName.equals(dcMeta.getId())) {
dcMeta.getKeeperContainers().forEach(keeperContainerMeta -> {
if (!result.containsKey(keeperContainerMeta.getIp())) {
result.put(keeperContainerMeta.getIp(), getKeeperBasicModel(keeperContainerMeta.getIp(), dcMeta.getId()));
Expand Down Expand Up @@ -222,17 +227,17 @@ private void generateKeeperMsg(String dc, String cluster, String shard, List<Kee
}
}

protected ClusterMeta getCurrentDcMeta(ClusterMeta clusterMeta) {
if (Objects.equals(clusterMeta.getActiveDc(), currentDc)) {
return metaCache.getXpipeMeta().findDc(currentDc).findCluster(clusterMeta.getId());
protected ClusterMeta getDcMeta(ClusterMeta clusterMeta, String dcName) {
if (Objects.equals(clusterMeta.getActiveDc(), dcName)) {
return metaCache.getXpipeMeta().findDc(dcName).findCluster(clusterMeta.getId());
}
if (clusterMeta.getBackupDcs() == null || clusterMeta.getBackupDcs().isEmpty()) {
return null;
}
String[] split = clusterMeta.getBackupDcs().split(",");
for (String dc : split) {
if (Objects.equals(dc, currentDc)) {
return metaCache.getXpipeMeta().findDc(currentDc).findCluster(clusterMeta.getId());
if (Objects.equals(dc, dcName)) {
return metaCache.getXpipeMeta().findDc(dcName).findCluster(clusterMeta.getId());
}
}
return null;
Expand Down Expand Up @@ -272,13 +277,13 @@ private KeeperContainerUsedInfoModel getKeeperBasicModel(String ip, String dcNam
return model;
}

public void reportClusterData(String clusterName, RedisMsg clusterAllRedisMsg) {
reportClusterData(clusterName, CLUSTER_DATA_TYPE, clusterAllRedisMsg.getUsedMemory());
reportClusterData(clusterName, CLUSTER_TRAFFIC_TYPE, clusterAllRedisMsg.getInPutFlow());
public void reportClusterData(String clusterName, RedisMsg clusterAllRedisMsg, String dcName) {
reportClusterData(clusterName, CLUSTER_DATA_TYPE, clusterAllRedisMsg.getUsedMemory(), dcName);
reportClusterData(clusterName, CLUSTER_TRAFFIC_TYPE, clusterAllRedisMsg.getInPutFlow(), dcName);
}

public void reportClusterData(String clusterName, String type, long value) {
MetricData data = new MetricData(type, currentDc, clusterName, null);
public void reportClusterData(String clusterName, String type, long value, String dcName) {
MetricData data = new MetricData(type, dcName, clusterName, null);
data.setValue(value);
data.setTimestampMilli(System.currentTimeMillis());
try {
Expand Down Expand Up @@ -346,6 +351,8 @@ public void destroy() {
try {
keeperContainerUsedInfoMsgCollectorTask.stop();
this.scheduled.shutdownNow();
extraSyncDCKeeperContainerUsedInfoMsgCollectorTask.stop();
this.extraSyncDCScheduled.shutdownNow();
} catch (Throwable th) {
logger.info("[preDestroy] keeperContainerUsedInfoMsgCollectorTask destroy fail", th);
}
Expand All @@ -355,7 +362,9 @@ public void destroy() {
public void isleader() {
try {
logger.debug("[isleader] become leader");
keeperContainerUsedInfoMsgCollectorTask.start();
if (!consoleConfig.getExtraSyncDC().contains(currentDc.toUpperCase())) {
keeperContainerUsedInfoMsgCollectorTask.start();
}
} catch (Throwable th) {
logger.info("[isleader] keeperContainerUsedInfoMsgCollectorTask start fail", th);
}
Expand All @@ -370,4 +379,24 @@ public void notLeader() {
logger.info("[notLeader] keeperContainerUsedInfoMsgCollectorTask stop fail", th);
}
}

@Override
public void isCrossDcLeader() {
// On becoming cross-DC leader, start the periodic task that collects redis
// msg for the extra-sync DCs (built in init() over consoleConfig.getExtraSyncDC()).
// Any startup failure is logged and swallowed so the leadership callback
// itself never propagates an exception.
try {
logger.debug("[isCrossDcLeader] become cross dc leader");
extraSyncDCKeeperContainerUsedInfoMsgCollectorTask.start();
} catch (Throwable th) {
logger.info("[isCrossDcLeader] extraSyncDCKeeperContainerUsedInfoMsgCollectorTask start fail", th);
}
}

@Override
public void notCrossDcLeader() {
    // On losing cross-DC leadership, stop the extra-sync-DC collection task;
    // the new leader takes over collecting for those DCs. Failures are logged
    // and swallowed so the leadership callback never propagates an exception.
    // Fix: log tags previously said "[isCrossDcLeader]" here — a copy-paste
    // error that made leadership-loss events look like leadership-gain events.
    try {
        logger.debug("[notCrossDcLeader] loss cross dc leader");
        extraSyncDCKeeperContainerUsedInfoMsgCollectorTask.stop();
    } catch (Throwable th) {
        logger.info("[notCrossDcLeader] extraSyncDCKeeperContainerUsedInfoMsgCollectorTask stop fail", th);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testGenerateKeeperContainerUsedInfoModels() throws ResourceNotFoundE
DcMeta dc2 = Mockito.mock(DcMeta.class);

// Test
Map<String, KeeperContainerUsedInfoModel> result = collector.generateKeeperContainerUsedInfoModels(redisMsgMap);
Map<String, KeeperContainerUsedInfoModel> result = collector.generateKeeperContainerUsedInfoModels(redisMsgMap, "jq");

// Verify
assertNotNull(result);
Expand Down

0 comments on commit 8d979a8

Please sign in to comment.