From 09ead235deff640e6b841bef70b5507f668559f2 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Thu, 14 Mar 2024 21:22:26 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=8C=80keeper=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=B7=A1=E6=A3=80=E9=83=A8=E5=88=86bug=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/CheckerHealthController.java | 5 +- .../keeper/infoStats/KeeperFlowCollector.java | 30 ++- .../healthcheck/session/RedisSession.java | 6 + .../impl/KeeperContainerInfoReporter.java | 20 +- ...dActive.java => DcClusterShardKeeper.java} | 22 +- .../model/KeeperContainerUsedInfoModel.java | 18 +- .../actions/delay/DelayActionTest.java | 9 + .../infoStats/KeeperFlowCollectorTest.java | 203 +++++++++++++++++- .../impl/KeeperUsedInfoReporterTest.java | 32 +-- .../api/KeeperContainerController.java | 6 +- .../console/keeper/entity/IPPairData.java | 11 +- .../keeper/handler/AbstractHandler.java | 10 +- .../handler/KeeperContainerFilterChain.java | 27 ++- .../handler/KeeperDataOverloadHandler.java | 7 +- .../handler/KeeperPairOverloadHandler.java | 13 +- ...efaultKeeperContainerUsedInfoAnalyzer.java | 62 +++--- ...eperContainerUsedInfoAnalyzerContext.java} | 22 +- ...eperContainerUsedInfoAnalyzerContext.java} | 8 +- .../model/impl/ShardModelServiceImpl.java | 9 +- .../ctrip/xpipe/redis/console/AllTests.java | 4 +- .../DefaultKeeperUsedInfoAnalyzerTest.java | 10 +- .../console/service/ConfigServiceTest.java | 7 + .../service/ShardModelServiceTest.java | 38 +++- ...ltKeeperContainerMigrationServiceTest.java | 32 ++- .../redis/core/server/FakeRedisServer.java | 6 + ...disKeeperServerConnectToFakeRedisTest.java | 53 +++-- 26 files changed, 465 insertions(+), 205 deletions(-) rename redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/{DcClusterShardActive.java => DcClusterShardKeeper.java} (74%) rename redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/{DefaultKeeperContainerUsedInfoAnalyzerUtil.java => DefaultKeeperContainerUsedInfoAnalyzerContext.java} (84%) rename redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/{KeeperContainerUsedInfoAnalyzerUtil.java => KeeperContainerUsedInfoAnalyzerContext.java} (77%) diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java index f3b15d07e3..2e45938621 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java @@ -12,9 +12,8 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.AbstractRedisConfigRuleAction; import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder; -import com.ctrip.xpipe.redis.checker.impl.KeeperContainerInfoReporter; import com.ctrip.xpipe.redis.checker.model.DcClusterShard; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -115,7 +114,7 @@ public Map getAllHealthStatusDesc() { } @GetMapping("/health/keeper/status/all") - public ConcurrentMap> getAllKeeperFlows() { + public ConcurrentMap> getAllKeeperFlows() { return keeperFlowCollector.getHostPort2InputFlow(); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java index 8ea2b9618a..5123f952ad 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java @@ -3,7 +3,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; import com.ctrip.xpipe.redis.checker.healthcheck.KeeperInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.KeeperSupport; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import com.ctrip.xpipe.utils.MapUtils; import org.slf4j.Logger; @@ -19,7 +19,7 @@ public class KeeperFlowCollector implements KeeperInfoStatsActionListener, Keepe private final Logger logger = LoggerFactory.getLogger(getClass()); - protected ConcurrentMap> hostPort2InputFlow = new ConcurrentHashMap<>(); + protected ConcurrentMap> hostPort2InputFlow = new ConcurrentHashMap<>(); @Override public void onAction(KeeperInfoStatsActionContext context) { @@ -27,8 +27,9 @@ public void onAction(KeeperInfoStatsActionContext context) { InfoResultExtractor extractor = context.getResult(); KeeperInstanceInfo info = context.instance().getCheckInfo(); long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue(); - Map keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new); - keeperContainerResult.put(new DcClusterShardActive(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow); + deleteKeeper(info); + Map keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new); + keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow); } catch (Throwable throwable) { logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable); } @@ -38,22 +39,19 @@ public void onAction(KeeperInfoStatsActionContext context) { @Override public void stopWatch(HealthCheckAction action) { KeeperInstanceInfo instanceInfo = (KeeperInstanceInfo) action.getActionInstance().getCheckInfo(); - logger.info("stopWatch: {}", new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), instanceInfo.isActive(), instanceInfo.getHostPort().getPort())); - logger.info("stop before: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) - .get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort()))); - logger.info("stop before: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) - .get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort()))); + logger.info("stopWatch: {}", new DcClusterShardKeeper(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), instanceInfo.isActive(), instanceInfo.getHostPort().getPort())); + deleteKeeper(instanceInfo); + } + + private void deleteKeeper(KeeperInstanceInfo instanceInfo) { + if (hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) == null) return; hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) - .remove(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort())); + .remove(new DcClusterShardKeeper(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort())); hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) - .remove(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort())); - logger.info("stop after: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) - .get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort()))); - logger.info("stop after: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) - .get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort()))); + .remove(new DcClusterShardKeeper(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort())); } - public ConcurrentMap> getHostPort2InputFlow() { + public ConcurrentMap> getHostPort2InputFlow() { return hostPort2InputFlow; } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java index 676151f9fe..9efc2b36c9 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java @@ -13,6 +13,7 @@ import com.ctrip.xpipe.redis.core.protocal.cmd.pubsub.*; import com.ctrip.xpipe.redis.core.protocal.pojo.RedisInfo; import com.ctrip.xpipe.redis.core.protocal.pojo.Role; +import com.ctrip.xpipe.utils.VisibleForTesting; import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -359,6 +360,11 @@ public String toString() { return ""; } + @VisibleForTesting + public void setEndpoint(Endpoint endpoint) { + this.endpoint = endpoint; + } + public interface RollCallback { void role(String role, Role detail); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/KeeperContainerInfoReporter.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/KeeperContainerInfoReporter.java index 3d6662edc3..db3ec8b53e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/KeeperContainerInfoReporter.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/KeeperContainerInfoReporter.java @@ -8,11 +8,10 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector; import com.ctrip.xpipe.redis.checker.model.DcClusterShard; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*; import com.ctrip.xpipe.redis.core.entity.DcMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperContainerMeta; import com.ctrip.xpipe.redis.core.entity.KeeperDiskInfo; import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.utils.VisibleForTesting; @@ -30,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.Collectors; public class KeeperContainerInfoReporter implements GroupCheckerLeaderAware { @@ -107,8 +105,8 @@ public void notLeader() { public void reportKeeperContainerInfo() { try { logger.debug("[reportKeeperContainerInfo] start"); - Map> collectedInfos = keeperFlowCollector.getHostPort2InputFlow(); - Map> hostPort2InputFlow = new HashMap<>(); + Map> collectedInfos = keeperFlowCollector.getHostPort2InputFlow(); + Map> hostPort2InputFlow = new HashMap<>(); for (DcMeta dcMeta : metaCache.getXpipeMeta().getDcs().values()) { if (CURRENT_IDC.equalsIgnoreCase(dcMeta.getId())) { dcMeta.getKeeperContainers().forEach(keeperContainerMeta -> { @@ -131,24 +129,24 @@ public void reportKeeperContainerInfo() { long totalRedisUsedMemory = 0; int activeKeeperCount = 0; int totalKeeperCount = 0; - Map detailInfo = new HashMap<>(); - for (Map.Entry entry : inputFlowMap.entrySet()) { + Map detailInfo = new HashMap<>(); + for (Map.Entry entry : inputFlowMap.entrySet()) { totalKeeperCount++; totalInputFlow += entry.getValue(); - DcClusterShardActive dcClusterShardActive = entry.getKey(); + DcClusterShardKeeper dcClusterShardKeeper = entry.getKey(); long inputFlow = entry.getValue(); - Long redisUsedMemory = dcClusterShardUsedMemory.get(new DcClusterShard().setDcId(dcClusterShardActive.getDcId()).setClusterId(dcClusterShardActive.getClusterId()).setShardId(dcClusterShardActive.getShardId())); + Long redisUsedMemory = dcClusterShardUsedMemory.get(new DcClusterShard().setDcId(dcClusterShardKeeper.getDcId()).setClusterId(dcClusterShardKeeper.getClusterId()).setShardId(dcClusterShardKeeper.getShardId())); if (redisUsedMemory == null) { logger.warn("[reportKeeperContainerInfo] redisUsedMemory is null, dcClusterShard: {}", entry.getKey()); redisUsedMemory = 0L; } totalRedisUsedMemory += redisUsedMemory; - if (dcClusterShardActive.isActive()) { + if (dcClusterShardKeeper.isActive()) { activeRedisUsedMemory += redisUsedMemory; activeInputFlow += inputFlow; activeKeeperCount++; } - detailInfo.put(dcClusterShardActive, new KeeperUsedInfo(redisUsedMemory, inputFlow, keeperIp)); + detailInfo.put(dcClusterShardKeeper, new KeeperUsedInfo(redisUsedMemory, inputFlow, keeperIp)); } try { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/DcClusterShardActive.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/DcClusterShardKeeper.java similarity index 74% rename from redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/DcClusterShardActive.java rename to redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/DcClusterShardKeeper.java index 9eb6433fd8..d4804bebc7 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/DcClusterShardActive.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/DcClusterShardKeeper.java @@ -1,35 +1,35 @@ package com.ctrip.xpipe.redis.checker.model; -import com.ctrip.xpipe.endpoint.HostPort; - import java.io.Serializable; import java.util.Objects; -public class DcClusterShardActive extends DcClusterShard implements Serializable { +public class DcClusterShardKeeper extends DcClusterShard implements Serializable { private boolean active; + private String ip; + private int port; - public DcClusterShardActive(){} + public DcClusterShardKeeper(){} - public DcClusterShardActive(String dcId, String clusterId, String shardId, boolean active, int port) { + public DcClusterShardKeeper(String dcId, String clusterId, String shardId, boolean active, int port) { super(dcId, clusterId, shardId); this.active = active; this.port = port; } - public DcClusterShardActive(String dcId, String clusterId, String shardId, boolean active) { + public DcClusterShardKeeper(String dcId, String clusterId, String shardId, boolean active) { super(dcId, clusterId, shardId); this.active = active; } - public DcClusterShardActive(DcClusterShard dcClusterShard, boolean active) { + public DcClusterShardKeeper(DcClusterShard dcClusterShard, boolean active) { super(dcClusterShard.getDcId(), dcClusterShard.getClusterId(), dcClusterShard.getShardId()); this.active = active; } - public DcClusterShardActive(String info) { + public DcClusterShardKeeper(String info) { String[] split = info.split(SPLITTER); if (split.length >= 5) { this.dcId = split[0]; @@ -59,15 +59,15 @@ public void setPort(int port) { @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof DcClusterShardActive)) return false; + if (!(o instanceof DcClusterShardKeeper)) return false; if (!super.equals(o)) return false; - DcClusterShardActive that = (DcClusterShardActive) o; + DcClusterShardKeeper that = (DcClusterShardKeeper) o; return isActive() == that.isActive(); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), isActive()); + return Objects.hash(super.hashCode(), port); } @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java index 6112b42638..efffe46e06 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java @@ -28,7 +28,7 @@ public class KeeperContainerUsedInfoModel { private int totalKeeperCount; - private Map detailInfo; + private Map detailInfo; private boolean keeperContainerActive; @@ -50,7 +50,7 @@ public KeeperContainerUsedInfoModel(String keeperIp, String dcName, long activeI this.activeRedisUsedMemory = activeRedisUsedMemory; } - public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entry dcClusterShard) { + public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entry dcClusterShard) { this.keeperIp = model.getKeeperIp(); this.dcName = model.getDcName(); this.org = model.getOrg(); @@ -95,10 +95,10 @@ public static KeeperContainerUsedInfoModel cloneKeeperContainerUsedInfoModel(Kee return newModel; } - private static Map getKeeperUsedInfoMap(KeeperContainerUsedInfoModel model) { - Map clonedDetailInfo = new HashMap<>(); - for (Map.Entry entry : model.getDetailInfo().entrySet()) { - DcClusterShardActive key = new DcClusterShardActive(entry.getKey().getDcId(), entry.getKey().getClusterId(), entry.getKey().getShardId(), entry.getKey().isActive(), entry.getKey().getPort()); + private static Map getKeeperUsedInfoMap(KeeperContainerUsedInfoModel model) { + Map clonedDetailInfo = new HashMap<>(); + for (Map.Entry entry : model.getDetailInfo().entrySet()) { + DcClusterShardKeeper key = new DcClusterShardKeeper(entry.getKey().getDcId(), entry.getKey().getClusterId(), entry.getKey().getShardId(), entry.getKey().isActive(), entry.getKey().getPort()); KeeperUsedInfo value = new KeeperUsedInfo(entry.getValue().getPeerData(), entry.getValue().getInputFlow(), entry.getValue().keeperIP); clonedDetailInfo.put(key, value); } @@ -169,11 +169,11 @@ public KeeperContainerUsedInfoModel setTotalRedisUsedMemory(long totalRedisUsedM return this; } - public Map getDetailInfo() { + public Map getDetailInfo() { return detailInfo; } - public KeeperContainerUsedInfoModel setDetailInfo(Map detailInfo) { + public KeeperContainerUsedInfoModel setDetailInfo(Map detailInfo) { this.detailInfo = detailInfo; return this; } @@ -359,7 +359,7 @@ public String toString() { @VisibleForTesting public KeeperContainerUsedInfoModel createKeeper(String clusterId, String shardId, boolean active, long inputFlow, long redisUsedMemory){ if (this.detailInfo == null) this.detailInfo = new HashMap<>(); - detailInfo.put(new DcClusterShardActive(this.dcName, clusterId, shardId, active), new KeeperUsedInfo(redisUsedMemory, inputFlow, this.keeperIp)); + detailInfo.put(new DcClusterShardKeeper(this.dcName, clusterId, shardId, active), new KeeperUsedInfo(redisUsedMemory, inputFlow, this.keeperIp)); this.setDetailInfo(detailInfo); return this; } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/DelayActionTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/DelayActionTest.java index fdd4c1f4e7..72a0d0866f 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/DelayActionTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/delay/DelayActionTest.java @@ -2,6 +2,7 @@ import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.redis.checker.healthcheck.*; import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingService; import com.ctrip.xpipe.redis.checker.healthcheck.config.HealthCheckConfig; @@ -81,6 +82,14 @@ public void afterDelayActionTest() { Assert.assertFalse(instanceNull.get()); } + @Test + public void testToString() { + RedisSession session1 = new RedisSession(); + Assert.assertEquals(session1.toString(), ""); + session1.setEndpoint(new DefaultEndPoint("10.10.10.10", 6380)); + Assert.assertEquals(session1.toString(), "redis://10.10.10.10:6380"); + } + @Test public void testRedisUp() throws Exception { delayHealth.set(false); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollectorTest.java index ed610c296e..5087e2911a 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollectorTest.java @@ -1,12 +1,21 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats; +import com.ctrip.xpipe.api.lifecycle.LifecycleState; +import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.endpoint.ClusterShardHostPort; +import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.AbstractCheckerTest; -import com.ctrip.xpipe.redis.checker.healthcheck.KeeperHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.*; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.RedisCheckRule; +import com.ctrip.xpipe.redis.checker.healthcheck.config.HealthCheckConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.impl.AbstractHealthCheckInstance; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.MockitoAnnotations; +import java.util.List; + /** * Created by yu * 2023/8/29 @@ -44,9 +53,201 @@ public void before() throws Exception { @Test public void testParseResult() { + listener.onAction(context); listener.onAction(context); Assert.assertTrue(listener.worksfor(context)); Assert.assertEquals(1, listener.getHostPort2InputFlow().size()); + listener.stopWatch(new HealthCheckAction() { + @Override + public void addListener(HealthCheckActionListener listener) { + + } + + @Override + public void removeListener(HealthCheckActionListener listener) { + + } + + @Override + public void addListeners(List list) { + + } + + @Override + public void addController(HealthCheckActionController controller) { + + } + + @Override + public void addControllers(List list) { + + } + + @Override + public void removeController(HealthCheckActionController controller) { + + } + + @Override + public HealthCheckInstance getActionInstance() { + return new HealthCheckInstance() { + @Override + public CheckInfo getCheckInfo() { + return new KeeperInstanceInfo() { + @Override + public ClusterShardHostPort getClusterShardHostport() { + return null; + } + + @Override + public String getShardId() { + return "shard"; + } + + @Override + public String getDcId() { + return "dc"; + } + + @Override + public boolean isActive() { + return false; + } + + @Override + public HostPort getHostPort() { + return new HostPort("10.10.10.10", 11); + } + + @Override + public String getClusterId() { + return "cluster"; + } + + @Override + public ClusterType getClusterType() { + return null; + } + + @Override + public String getActiveDc() { + return null; + } + + @Override + public void setActiveDc(String activeDc) { + + } + + @Override + public List getRedisCheckRules() { + return null; + } + + @Override + public void setAzGroupType(String type) { + + } + + @Override + public String getAzGroupType() { + return null; + } + + @Override + public void setAsymmetricCluster(boolean isHeteroCluster) { + + } + + @Override + public boolean isAsymmetricCluster() { + return false; + } + }; + } + + @Override + public HealthCheckConfig getHealthCheckConfig() { + return null; + } + + @Override + public void register(HealthCheckAction action) { + + } + + @Override + public void unregister(HealthCheckAction action) { + + } + + @Override + public List getHealthCheckActions() { + return null; + } + + @Override + public void dispose() throws Exception { + + } + + @Override + public void initialize() throws Exception { + + } + + @Override + public LifecycleState getLifecycleState() { + return null; + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public int getOrder() { + return 0; + } + }; + } + + @Override + public void dispose() throws Exception { + + } + + @Override + public void initialize() throws Exception { + + } + + @Override + public LifecycleState getLifecycleState() { + return null; + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public int getOrder() { + return 0; + } + }); } @Test diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/KeeperUsedInfoReporterTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/KeeperUsedInfoReporterTest.java index 46ef4b2e08..e2cb880552 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/KeeperUsedInfoReporterTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/KeeperUsedInfoReporterTest.java @@ -7,7 +7,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector; import com.ctrip.xpipe.redis.checker.model.DcClusterShard; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.core.entity.*; import com.ctrip.xpipe.redis.core.meta.MetaCache; @@ -70,27 +70,27 @@ public void befor() { keeperContainerMetas.add(new KeeperContainerMeta().setIp("127.0.0.3")); dcMeta.getKeeperContainers().addAll(keeperContainerMetas); Mockito.when(metaCache.getXpipeMeta()).thenReturn(xpipeMeta); - DcClusterShardActive dcClusterShard1 = new DcClusterShardActive("jq", "cluster1", "shard1", false); - DcClusterShardActive dcClusterShard2 = new DcClusterShardActive("jq", "cluster1", "shard2", true); - DcClusterShardActive dcClusterShard3 = new DcClusterShardActive("jq", "cluster2", "shard1", true); - DcClusterShardActive dcClusterShard4 = new DcClusterShardActive("jq", "cluster2", "shard2", true); - DcClusterShardActive dcClusterShard5 = new DcClusterShardActive("jq", "cluster3", "shard1", true); - DcClusterShardActive dcClusterShard6 = new DcClusterShardActive("jq", "cluster3", "shard2", true); - DcClusterShardActive dcClusterShard7 = new DcClusterShardActive("jq", "cluster3", "shard3", true); - - ConcurrentMap> keeperFlowMap = Maps.newConcurrentMap(); - Map map1 = new HashMap<>(); + DcClusterShardKeeper dcClusterShard1 = new DcClusterShardKeeper("jq", "cluster1", "shard1", false); + DcClusterShardKeeper dcClusterShard2 = new DcClusterShardKeeper("jq", "cluster1", "shard2", true); + DcClusterShardKeeper dcClusterShard3 = new DcClusterShardKeeper("jq", "cluster2", "shard1", true); + DcClusterShardKeeper dcClusterShard4 = new DcClusterShardKeeper("jq", "cluster2", "shard2", true); + DcClusterShardKeeper dcClusterShard5 = new DcClusterShardKeeper("jq", "cluster3", "shard1", true); + DcClusterShardKeeper dcClusterShard6 = new DcClusterShardKeeper("jq", "cluster3", "shard2", true); + DcClusterShardKeeper dcClusterShard7 = new DcClusterShardKeeper("jq", "cluster3", "shard3", true); + + ConcurrentMap> keeperFlowMap = Maps.newConcurrentMap(); + Map map1 = new HashMap<>(); map1.put(dcClusterShard1, 2L); map1.put(dcClusterShard4, 2L); map1.put(dcClusterShard5, 2L); keeperFlowMap.put("127.0.0.1", map1); - Map map2 = new HashMap<>(); + Map map2 = new HashMap<>(); map2.put(dcClusterShard2, 2L); map2.put(dcClusterShard6, 2L); keeperFlowMap.put("127.0.0.2", map2); - Map map3 = new HashMap<>(); + Map map3 = new HashMap<>(); map3.put(dcClusterShard3, 2L); map3.put(dcClusterShard7, 2L); keeperFlowMap.put("127.0.0.3", map3); @@ -144,17 +144,17 @@ public void testReportKeeperContainerInfo() { @Test public void DcClusterShardActive(){ - DcClusterShardActive active = new DcClusterShardActive(); + DcClusterShardKeeper active = new DcClusterShardKeeper(); active.setActive(true); active.setDcId("dc"); active.setClusterId("cluster"); active.setShardId("shard"); active.setPort(123); Assert.assertEquals(active.toString(), "dc:cluster:shard:true:123"); - DcClusterShardActive active1 = new DcClusterShardActive(active.toString()); + DcClusterShardKeeper active1 = new DcClusterShardKeeper(active.toString()); Assert.assertEquals(active.toString(), active1.toString()); Assert.assertEquals(active, active1); - DcClusterShardActive active2 = new DcClusterShardActive("dc","cluster","shard", true, 123); + DcClusterShardKeeper active2 = new DcClusterShardKeeper("dc","cluster","shard", true, 123); Assert.assertEquals(active2.hashCode(), active1.hashCode()); } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/KeeperContainerController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/KeeperContainerController.java index 0c371ed513..88c4c6084e 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/KeeperContainerController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/KeeperContainerController.java @@ -57,16 +57,16 @@ public RetMessage setDiskType(@RequestBody ConfigModel configModel) { } @RequestMapping(value = "/keepercontainer/diskType", method = RequestMethod.GET) - public RetMessage getDiskType() { + public String getDiskType() { try { Map map = new HashMap<>(); List configs = configService.getConfigs(KEY_KEEPER_CONTAINER_STANDARD); for (ConfigModel configModel : configs) { map.put(configModel.getSubKey(), configModel.getVal()); } - return RetMessage.createSuccessMessage(map.toString()); + return map.toString(); } catch (Exception e) { - return RetMessage.createFailMessage(e.getMessage()); + return e.getMessage(); } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/entity/IPPairData.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/entity/IPPairData.java index 8d15dcfc5d..7bbdee4f3e 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/entity/IPPairData.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/entity/IPPairData.java @@ -1,7 +1,6 @@ package com.ctrip.xpipe.redis.console.keeper.entity; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; -import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*; import java.util.HashMap; @@ -10,18 +9,18 @@ public class IPPairData { private long inputFlow; private long peerData; - private final Map keeperUsedInfoMap = new HashMap<>(); + private final Map keeperUsedInfoMap = new HashMap<>(); public IPPairData() { } - public void removeDcClusterShard(Map.Entry migrateDcClusterShard) { + public void removeDcClusterShard(Map.Entry migrateDcClusterShard) { this.inputFlow -= migrateDcClusterShard.getValue().getInputFlow(); this.peerData -= migrateDcClusterShard.getValue().getPeerData(); keeperUsedInfoMap.remove(migrateDcClusterShard.getKey()); } - public void addDcClusterShard(Map.Entry migrateDcClusterShard) { + public void addDcClusterShard(Map.Entry migrateDcClusterShard) { this.inputFlow += migrateDcClusterShard.getValue().getInputFlow(); this.peerData += migrateDcClusterShard.getValue().getPeerData(); keeperUsedInfoMap.put(migrateDcClusterShard.getKey(), migrateDcClusterShard.getValue()); @@ -35,7 +34,7 @@ public long getPeerData() { return peerData; } - public Map getKeeperUsedInfoMap() { + public Map getKeeperUsedInfoMap() { return keeperUsedInfoMap; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/AbstractHandler.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/AbstractHandler.java index b3980b8022..4bf5f51e11 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/AbstractHandler.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/AbstractHandler.java @@ -2,11 +2,11 @@ public abstract class AbstractHandler implements Handler{ - private Handler nextHandler; + private Handler nextHandle; public Handler setNextHandler(Handler nextHandler) { - this.nextHandler = nextHandler; - return this; + this.nextHandle = nextHandler; + return this.nextHandle; } protected abstract boolean doNextHandler(T t); @@ -14,8 +14,8 @@ public Handler setNextHandler(Handler nextHandler) { @Override public boolean handle(T t) { if (doNextHandler(t)) { - if (nextHandler != null) { - return nextHandler.handle(t); + if (this.nextHandle != null) { + return this.nextHandle.handle(t); } else { return true; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java index 771abc7c10..9d8fb003f6 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java @@ -1,10 +1,9 @@ package com.ctrip.xpipe.redis.console.keeper.handler; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.console.config.ConsoleConfig; -import com.ctrip.xpipe.redis.console.keeper.util.DefaultKeeperContainerUsedInfoAnalyzerUtil; -import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerUtil; +import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerContext; import com.ctrip.xpipe.utils.VisibleForTesting; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -18,25 +17,25 @@ public class KeeperContainerFilterChain { private ConsoleConfig config; public boolean doKeeperContainerFilter(KeeperContainerUsedInfoModel targetContainer){ - return new HostActiveHandler() - .setNextHandler(new HostDiskOverloadHandler(config)) - .setNextHandler(new KeeperContainerOverloadHandler()) - .handle(targetContainer); + Handler handler = new HostActiveHandler(); + handler.setNextHandler(new HostDiskOverloadHandler(config)) + .setNextHandler(new KeeperContainerOverloadHandler()); + return handler.handle(targetContainer); } - public boolean doKeeperFilter(Map.Entry keeperUsedInfoEntry, + public boolean doKeeperFilter(Map.Entry keeperUsedInfoEntry, KeeperContainerUsedInfoModel srcKeeperContainer, KeeperContainerUsedInfoModel targetKeeperContainer, - KeeperContainerUsedInfoAnalyzerUtil analyzerUtil){ - return new KeeperDataOverloadHandler(targetKeeperContainer) - .setNextHandler(new KeeperPairOverloadHandler(analyzerUtil, srcKeeperContainer, targetKeeperContainer, config)) - .handle(keeperUsedInfoEntry); + KeeperContainerUsedInfoAnalyzerContext analyzerUtil){ + Handler> handler = new KeeperDataOverloadHandler(targetKeeperContainer); + handler.setNextHandler(new KeeperPairOverloadHandler(analyzerUtil, srcKeeperContainer, targetKeeperContainer, config)); + return handler.handle(keeperUsedInfoEntry); } - public boolean doKeeperPairFilter(Map.Entry keeperUsedInfoEntry, + public boolean doKeeperPairFilter(Map.Entry keeperUsedInfoEntry, KeeperContainerUsedInfoModel keeperContainer1, KeeperContainerUsedInfoModel keeperContainer2, - KeeperContainerUsedInfoAnalyzerUtil analyzerUtil) { + KeeperContainerUsedInfoAnalyzerContext analyzerUtil) { return new KeeperPairOverloadHandler(analyzerUtil, keeperContainer1, keeperContainer2, config) .handle(keeperUsedInfoEntry); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperDataOverloadHandler.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperDataOverloadHandler.java index b6b010f2d6..e147b58fc6 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperDataOverloadHandler.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperDataOverloadHandler.java @@ -1,13 +1,12 @@ package com.ctrip.xpipe.redis.console.keeper.handler; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*; -import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel; import java.util.Map; -public class KeeperDataOverloadHandler extends AbstractHandler>{ +public class KeeperDataOverloadHandler extends AbstractHandler>{ private KeeperContainerUsedInfoModel targetKeeperContainer; @@ -17,7 +16,7 @@ public KeeperDataOverloadHandler(KeeperContainerUsedInfoModel targetKeeperContai } @Override - protected boolean doNextHandler(Map.Entry keeperUsedInfoEntry) { + protected boolean doNextHandler(Map.Entry keeperUsedInfoEntry) { return keeperUsedInfoEntry.getValue().getInputFlow() + targetKeeperContainer.getActiveInputFlow() < targetKeeperContainer.getInputFlowStandard() && keeperUsedInfoEntry.getValue().getPeerData() + targetKeeperContainer.getActiveRedisUsedMemory() < targetKeeperContainer.getRedisUsedMemoryStandard(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperPairOverloadHandler.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperPairOverloadHandler.java index a132074d0e..0c5004f5f0 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperPairOverloadHandler.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperPairOverloadHandler.java @@ -1,18 +1,17 @@ package com.ctrip.xpipe.redis.console.keeper.handler; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*; import com.ctrip.xpipe.redis.console.config.ConsoleConfig; import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData; -import com.ctrip.xpipe.redis.console.keeper.util.DefaultKeeperContainerUsedInfoAnalyzerUtil; -import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerUtil; +import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerContext; import java.util.Map; -public class KeeperPairOverloadHandler extends AbstractHandler>{ +public class KeeperPairOverloadHandler extends AbstractHandler>{ - private KeeperContainerUsedInfoAnalyzerUtil analyzerUtil; + private KeeperContainerUsedInfoAnalyzerContext analyzerUtil; private KeeperContainerUsedInfoModel keeperContainer1; @@ -20,7 +19,7 @@ public class KeeperPairOverloadHandler extends AbstractHandler keeperUsedInfoEntry) { + protected boolean doNextHandler(Map.Entry keeperUsedInfoEntry) { IPPairData longLongPair = analyzerUtil.getIPPairData(keeperContainer1.getKeeperIp(), keeperContainer2.getKeeperIp()); if (longLongPair == null) return true; double keeperPairOverLoadFactor = config.getKeeperPairOverLoadFactor(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java index b5f5bbba10..15423689f3 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzer.java @@ -6,7 +6,7 @@ import com.ctrip.xpipe.command.ParallelCommandChain; import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; import com.ctrip.xpipe.monitor.CatEventMonitor; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*; import com.ctrip.xpipe.redis.console.config.ConsoleConfig; @@ -18,8 +18,8 @@ import com.ctrip.xpipe.redis.console.keeper.KeeperContainerUsedInfoAnalyzer; import com.ctrip.xpipe.redis.console.keeper.handler.KeeperContainerFilterChain; import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData; -import com.ctrip.xpipe.redis.console.keeper.util.DefaultKeeperContainerUsedInfoAnalyzerUtil; -import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerUtil; +import com.ctrip.xpipe.redis.console.keeper.util.DefaultKeeperContainerUsedInfoAnalyzerContext; +import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerContext; import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel; import com.ctrip.xpipe.redis.console.model.MigrationKeeperContainerDetailModel; import com.ctrip.xpipe.redis.console.service.KeeperContainerAnalyzerService; @@ -64,16 +64,12 @@ public class DefaultKeeperContainerUsedInfoAnalyzer extends AbstractService impl private PriorityQueue minPeerDataKeeperContainers; - private KeeperContainerUsedInfoAnalyzerUtil analyzerUtil = new DefaultKeeperContainerUsedInfoAnalyzerUtil(); - - private boolean breakFlag; + private KeeperContainerUsedInfoAnalyzerContext analyzerContext = new DefaultKeeperContainerUsedInfoAnalyzerContext(); private static final String currentDc = FoundationService.DEFAULT.getDataCenter().toUpperCase(); private static final String KEEPER_RESOURCE_LACK = "keeper_resource_lack"; - private static final String KEEPER_ANALYZE_BREAK = "keeper_analyze_break"; - public DefaultKeeperContainerUsedInfoAnalyzer() {} public DefaultKeeperContainerUsedInfoAnalyzer(ConsoleConfig config, @@ -120,7 +116,7 @@ public List getCurrentDcMaxKeeperContainerFullSynchronizationTime() { List result = new ArrayList<>(); double keeperContainerIoRate = config.getKeeperContainerIoRate(); if (!currentDcAllKeeperContainerUsedInfoModelMap.isEmpty()) { - currentDcMaxKeeperContainerActiveRedisUsedMemory = analyzerUtil.getMaxActiveRedisUsedMemory(currentDcAllKeeperContainerUsedInfoModelMap); + currentDcMaxKeeperContainerActiveRedisUsedMemory = analyzerContext.getMaxActiveRedisUsedMemory(currentDcAllKeeperContainerUsedInfoModelMap); } result.add((int) (currentDcMaxKeeperContainerActiveRedisUsedMemory /1024/1024/keeperContainerIoRate/60)); return result; @@ -176,7 +172,7 @@ protected void doRun() throws Exception { transaction.logTransactionSwallowException("keeperContainer.analyze", currentDc, new Task() { @Override public void go() throws Exception { - analyzeKeeperContainerUsedInfo(); +// analyzeKeeperContainerUsedInfo(); } @Override @@ -206,20 +202,15 @@ private void removeExpireData() { @VisibleForTesting void analyzeKeeperContainerUsedInfo() { logger.info("[analyzeKeeperContainerUsedInfo] start, keeperContainer number {}", currentDcAllKeeperContainerUsedInfoModelMap.size()); - breakFlag = !analyzerUtil.initKeeperPairData(currentDcAllKeeperContainerUsedInfoModelMap); + if(!analyzerContext.initKeeperPairData(currentDcAllKeeperContainerUsedInfoModelMap)) return; keeperContainerAnalyzerService.initStandard(currentDcAllKeeperContainerUsedInfoModelMap); generateAllSortedDescKeeperContainerUsedInfoModelQueue(); List result = new ArrayList<>(); for (KeeperContainerUsedInfoModel infoModel : currentDcAllKeeperContainerUsedInfoModelMap.values()) { - if (breakFlag) { - CatEventMonitor.DEFAULT.logEvent(KEEPER_ANALYZE_BREAK, currentDc); - currentDcKeeperContainerMigrationResult = new ArrayList<>(); - return; - } List migrationKeeperDetails = getOverloadKeeperMigrationDetails(infoModel); if (migrationKeeperDetails != null) result.addAll(migrationKeeperDetails); - for (String ip : analyzerUtil.getAllPairsIP(infoModel.getKeeperIp())) { + for (String ip : analyzerContext.getAllPairsIP(infoModel.getKeeperIp())) { List keeperPairMigrationKeeperDetails = getKeeperPairMigrationKeeperDetails(infoModel, currentDcAllKeeperContainerUsedInfoModelMap.get(ip)); if (keeperPairMigrationKeeperDetails != null) result.addAll(keeperPairMigrationKeeperDetails); @@ -267,26 +258,22 @@ private List getOverloadKeeperMigrationDeta KeeperContainerOverloadCause overloadCause) { logger.info("[analyzeKeeperContainerUsedInfo] srcIp: {}, overloadCause:{}, overloadData:{}", src.getKeeperIp(), overloadCause.name(), overloadData); PriorityQueue anotherQueue = isPeerDataOverload ? minInputFlowKeeperContainers : minPeerDataKeeperContainers; - List> allDescDcClusterShards = getDescDcClusterShardDetails(src.getDetailInfo(), isPeerDataOverload); + List> allDescDcClusterShards = getDescDcClusterShardDetails(src.getDetailInfo(), isPeerDataOverload); List result = new ArrayList<>(); KeeperContainerUsedInfoModel target = null; MigrationKeeperContainerDetailModel keeperContainerMigrationDetail = null; MigrationKeeperContainerDetailModel switchActiveMigrationDetail = new MigrationKeeperContainerDetailModel(src, null, 0, true, false, overloadCause.name(), new ArrayList<>()); - for (Map.Entry dcClusterShard : allDescDcClusterShards) { + for (Map.Entry dcClusterShard : allDescDcClusterShards) { if (overloadData <= 0) break; if (!dcClusterShard.getKey().isActive()) continue; - String backUpKeeperIP = analyzerUtil.getBackUpKeeperIp(dcClusterShard.getKey()); + String backUpKeeperIP = analyzerContext.getBackUpKeeperIp(dcClusterShard.getKey()); if (backUpKeeperIP == null) { - breakFlag = true; - return null; + throw new RuntimeException("backUpKeeperIP is null, dcClusterShard: " + dcClusterShard.getKey()); } KeeperContainerUsedInfoModel backUpKeeper = currentDcAllKeeperContainerUsedInfoModelMap.get(backUpKeeperIP); - if (keeperContainerFilterChain.doKeeperContainerFilter(backUpKeeper) && keeperContainerFilterChain.doKeeperFilter(dcClusterShard, src, backUpKeeper, analyzerUtil)) { + if (keeperContainerFilterChain.doKeeperContainerFilter(backUpKeeper) && keeperContainerFilterChain.doKeeperFilter(dcClusterShard, src, backUpKeeper, analyzerContext)) { switchActiveMigrationDetail.addReadyToMigrateShard(dcClusterShard.getKey()); updateAllSortedDescKeeperContainerUsedInfoModelQueue(backUpKeeper.getKeeperIp(), new KeeperContainerUsedInfoModel(backUpKeeper, dcClusterShard)); - if (target != null && backUpKeeper.getKeeperIp().equals(target.getKeeperIp())) { - target = new KeeperContainerUsedInfoModel(backUpKeeper, dcClusterShard); - } overloadData = updateOverLoadData(isPeerDataOverload, overloadData, dcClusterShard.getValue()); continue; } @@ -296,20 +283,21 @@ private List getOverloadKeeperMigrationDeta logger.warn("[analyzeKeeperContainerUsedInfo] no available keeper containers {} for overload keeper container {}", availableKeeperContainers, src); CatEventMonitor.DEFAULT.logEvent(KEEPER_RESOURCE_LACK, currentDc); generateResult(result, keeperContainerMigrationDetail, switchActiveMigrationDetail); +// 資源不足case return result; } keeperContainerMigrationDetail = new MigrationKeeperContainerDetailModel(src, target, 0, false, false, overloadCause.name(), new ArrayList<>()); } - if (!keeperContainerFilterChain.doKeeperFilter(dcClusterShard, src, target, analyzerUtil) || - !keeperContainerFilterChain.doKeeperPairFilter(dcClusterShard, backUpKeeper, target, analyzerUtil)) { + if (!keeperContainerFilterChain.doKeeperFilter(dcClusterShard, src, target, analyzerContext) || + !keeperContainerFilterChain.doKeeperPairFilter(dcClusterShard, backUpKeeper, target, analyzerContext)) { updateSortedDescKeeperContainerUsedInfoModelQueue(anotherQueue, target.getKeeperIp(), target); target = null; if (keeperContainerMigrationDetail.getMigrateKeeperCount() != 0) result.add(keeperContainerMigrationDetail); continue; } keeperContainerMigrationDetail.addReadyToMigrateShard(dcClusterShard.getKey()); - target.setActiveInputFlow(target.getActiveInputFlow() + dcClusterShard.getValue().getInputFlow()).setTotalRedisUsedMemory(target.getTotalRedisUsedMemory() + dcClusterShard.getValue().getPeerData()); - analyzerUtil.updateMigrateIpPair(src.getKeeperIp(), backUpKeeper.getKeeperIp(), target.getKeeperIp(), dcClusterShard); + target.setActiveInputFlow(target.getActiveInputFlow() + dcClusterShard.getValue().getInputFlow()).setActiveRedisUsedMemory(target.getActiveRedisUsedMemory() + dcClusterShard.getValue().getPeerData()); + analyzerContext.updateMigrateIpPair(src.getKeeperIp(), backUpKeeper.getKeeperIp(), target.getKeeperIp(), dcClusterShard); overloadData = updateOverLoadData(isPeerDataOverload, overloadData, dcClusterShard.getValue()); } @@ -328,7 +316,7 @@ private List getKeeperPairMigrationKeeperDe .setFlowOverload((long) (Math.min(pairB.getInputFlowStandard(), pairA.getInputFlowStandard()) * keeperPairOverLoadFactor)) .setPeerDataOverload((long) (Math.min(pairB.getRedisUsedMemoryStandard(), pairA.getRedisUsedMemoryStandard()) * keeperPairOverLoadFactor)); - IPPairData longLongPair = analyzerUtil.getIPPairData(pairA.getKeeperIp(), pairB.getKeeperIp()); + IPPairData longLongPair = analyzerContext.getIPPairData(pairA.getKeeperIp(), pairB.getKeeperIp()); long overloadInputFlow = longLongPair.getInputFlow() - minStandardModel.getFlowOverload(); long overloadPeerData = longLongPair.getPeerData() - minStandardModel.getPeerDataOverload(); KeeperContainerOverloadCause overloadCause = getKeeperPairOverloadCause(overloadInputFlow, overloadPeerData); @@ -365,13 +353,13 @@ private List getKeeperPairMigrationDetails( long overloadData, PriorityQueue availableKeeperContainers, KeeperContainerOverloadCause overloadCause){ - List> allDcClusterShards = getDescDcClusterShardDetails(analyzerUtil.getAllDetailInfo(pairA.getKeeperIp(), pairB.getKeeperIp()), isPeerDataOverload); + List> allDcClusterShards = getDescDcClusterShardDetails(analyzerContext.getAllDetailInfo(pairA.getKeeperIp(), pairB.getKeeperIp()), isPeerDataOverload); List result = new ArrayList<>(); KeeperContainerUsedInfoModel target = null; List usedTarget = new ArrayList<>(); MigrationKeeperContainerDetailModel keeperContainerDetailModel = null; logger.debug("[analyzeKeeperPairOverLoad] pairA: {}, pairB: {}, overloadCause:{}, overloadData:{}, availableKeeperContainers:{} ", pairA, pairB, isPeerDataOverload, overloadData, availableKeeperContainers); - for (Map.Entry dcClusterShard : allDcClusterShards) { + for (Map.Entry dcClusterShard : allDcClusterShards) { if (overloadData <= 0) break; if (!dcClusterShard.getKey().isActive()) continue; String srcKeeperIp = dcClusterShard.getValue().getKeeperIP(); @@ -391,14 +379,14 @@ private List getKeeperPairMigrationDetails( } keeperContainerDetailModel = new MigrationKeeperContainerDetailModel(srcKeeperContainer, target, 0, false, true, overloadCause.name(), new ArrayList<>()); } - if (!keeperContainerFilterChain.doKeeperPairFilter(dcClusterShard, srcKeeperContainer, target, analyzerUtil)) { + if (!keeperContainerFilterChain.doKeeperPairFilter(dcClusterShard, srcKeeperContainer, target, analyzerContext)) { usedTarget.add(target); target = null; generateResult(result, keeperContainerDetailModel); continue; } keeperContainerDetailModel.addReadyToMigrateShard(dcClusterShard.getKey()); - analyzerUtil.updateMigrateIpPair(srcKeeperContainer.getKeeperIp(), backUpKeeperContainer.getKeeperIp(), target.getKeeperIp(), dcClusterShard); + analyzerContext.updateMigrateIpPair(srcKeeperContainer.getKeeperIp(), backUpKeeperContainer.getKeeperIp(), target.getKeeperIp(), dcClusterShard); overloadData = updateOverLoadData(isPeerDataOverload, overloadData, dcClusterShard.getValue()); } @@ -426,8 +414,8 @@ private KeeperContainerOverloadCause getKeeperContainerOverloadCause(long overlo } } - private List> getDescDcClusterShardDetails(Map allDetailInfo, boolean isPeerDataOverload) { - Comparator> comparator = isPeerDataOverload ? + private List> getDescDcClusterShardDetails(Map allDetailInfo, boolean isPeerDataOverload) { + Comparator> comparator = isPeerDataOverload ? Comparator.comparingLong(e -> e.getValue().getPeerData()) : Comparator.comparingLong(e -> e.getValue().getInputFlow()); return allDetailInfo.entrySet().stream() .sorted(comparator.reversed()) diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerUtil.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java similarity index 84% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerUtil.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java index 5d3cf22215..f82623c596 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerUtil.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java @@ -2,7 +2,7 @@ import com.ctrip.xpipe.monitor.CatEventMonitor; import com.ctrip.xpipe.redis.checker.model.DcClusterShard; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*; import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData; @@ -10,14 +10,14 @@ import org.slf4j.LoggerFactory; import java.util.*; -public class DefaultKeeperContainerUsedInfoAnalyzerUtil implements KeeperContainerUsedInfoAnalyzerUtil{ +public class DefaultKeeperContainerUsedInfoAnalyzerContext implements KeeperContainerUsedInfoAnalyzerContext { - private static final Logger logger = LoggerFactory.getLogger(DefaultKeeperContainerUsedInfoAnalyzerUtil.class); + private static final Logger logger = LoggerFactory.getLogger(DefaultKeeperContainerUsedInfoAnalyzerContext.class); private Map> ipPairMap = new HashMap<>(); - private Map allDetailInfo = new HashMap<>(); + private Map allDetailInfo = new HashMap<>(); private static final String KEEPER_NO_BACKUP = "keeper_no_backup"; - public DefaultKeeperContainerUsedInfoAnalyzerUtil() {} + public DefaultKeeperContainerUsedInfoAnalyzerContext() {} @Override public long getMaxActiveRedisUsedMemory(Map usedInfo) { @@ -37,7 +37,7 @@ public boolean initKeeperPairData(Map used allDetailInfo.putAll(infoModel.getDetailInfo()); } } - for (Map.Entry entry : allDetailInfo.entrySet()) { + for (Map.Entry entry : allDetailInfo.entrySet()) { if (!entry.getKey().isActive()) continue; KeeperUsedInfo activeKeeperUsedInfo = entry.getValue(); String backUpKeeperIp = getBackUpKeeperIp(entry.getKey()); @@ -49,7 +49,7 @@ public boolean initKeeperPairData(Map used @Override public String getBackUpKeeperIp(DcClusterShard activeKeeper) { - KeeperUsedInfo backUpKeeperUsedInfo = allDetailInfo.get(new DcClusterShardActive(activeKeeper, false)); + KeeperUsedInfo backUpKeeperUsedInfo = allDetailInfo.get(new DcClusterShardKeeper(activeKeeper, false)); if (backUpKeeperUsedInfo == null) { CatEventMonitor.DEFAULT.logEvent(KEEPER_NO_BACKUP, activeKeeper.toString()); logger.warn("[analyzeKeeperPair] active keeper {} has no backup keeper", activeKeeper); @@ -79,17 +79,17 @@ public IPPairData getIPPairData(String ip1, String ip2) { } @Override - public Map getAllDetailInfo(String ip1, String ip2) { + public Map getAllDetailInfo(String ip1, String ip2) { return ipPairMap.get(ip1).get(ip2).getKeeperUsedInfoMap(); } @Override - public void updateMigrateIpPair(String srcKeeperIp, String srcKeeperIpPair, String targetKeeperIp, Map.Entry migrateDcClusterShard) { + public void updateMigrateIpPair(String srcKeeperIp, String srcKeeperIpPair, String targetKeeperIp, Map.Entry migrateDcClusterShard) { removeIpPair(srcKeeperIp, srcKeeperIpPair, migrateDcClusterShard); addIpPair(srcKeeperIpPair, targetKeeperIp, migrateDcClusterShard); } - private void addIpPair(String ip1, String ip2, Map.Entry migrateDcClusterShard) { + private void addIpPair(String ip1, String ip2, Map.Entry migrateDcClusterShard) { ipPairMap.computeIfAbsent(ip1, k -> new HashMap<>()); ipPairMap.get(ip1).computeIfAbsent(ip2, k -> new IPPairData()); ipPairMap.get(ip1).get(ip2).addDcClusterShard(migrateDcClusterShard); @@ -98,7 +98,7 @@ private void addIpPair(String ip1, String ip2, Map.Entry migrateDcClusterShard) { + private void removeIpPair(String ip1, String ip2, Map.Entry migrateDcClusterShard) { if (ipPairMap.containsKey(ip1) && ipPairMap.get(ip1).containsKey(ip2)) { ipPairMap.get(ip1).get(ip2).removeDcClusterShard(migrateDcClusterShard); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerUtil.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java similarity index 77% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerUtil.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java index 43e8d6da9b..58d528ef5a 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerUtil.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java @@ -1,7 +1,7 @@ package com.ctrip.xpipe.redis.console.keeper.util; import com.ctrip.xpipe.redis.checker.model.DcClusterShard; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*; import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData; @@ -9,7 +9,7 @@ import java.util.List; import java.util.Map; -public interface KeeperContainerUsedInfoAnalyzerUtil { +public interface KeeperContainerUsedInfoAnalyzerContext { boolean initKeeperPairData(Map usedInfoMap); @@ -19,9 +19,9 @@ public interface KeeperContainerUsedInfoAnalyzerUtil { IPPairData getIPPairData(String ip1, String ip2); - Map getAllDetailInfo(String ip1, String ip2); + Map getAllDetailInfo(String ip1, String ip2); - void updateMigrateIpPair(String srcKeeperIp, String srcKeeperIpPair, String targetKeeperIp, Map.Entry migrateDcClusterShard); + void updateMigrateIpPair(String srcKeeperIp, String srcKeeperIpPair, String targetKeeperIp, Map.Entry migrateDcClusterShard); long getMaxActiveRedisUsedMemory(Map usedInfoMap); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index 85fb1cff89..81b99ef9e5 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -24,6 +24,7 @@ import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import com.ctrip.xpipe.utils.ObjectUtils; +import com.ctrip.xpipe.utils.VisibleForTesting; import com.ctrip.xpipe.utils.XpipeThreadFactory; import com.dianping.cat.utils.StringUtils; import org.apache.commons.collections.CollectionUtils; @@ -263,7 +264,13 @@ public boolean switchMaster(String dcName, String clusterName, ShardModel shardM private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI))); - private InfoCommand generteInfoCommand(Endpoint key) { + @VisibleForTesting + public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) { + this.keyedObjectPool = pool; + } + + @VisibleForTesting + public InfoCommand generteInfoCommand(Endpoint key) { if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) { commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java index 53cd62d354..232a6b0669 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java @@ -70,6 +70,7 @@ import com.ctrip.xpipe.redis.console.sentinel.impl.DefaultSentinelBalanceServiceTest; import com.ctrip.xpipe.redis.console.service.BasicServiceTest; import com.ctrip.xpipe.redis.console.service.MetaServiceTest; +import com.ctrip.xpipe.redis.console.service.ShardModelServiceTest; import com.ctrip.xpipe.redis.console.service.ShardServiceTest2; import com.ctrip.xpipe.redis.console.service.impl.*; import com.ctrip.xpipe.redis.console.service.meta.impl.*; @@ -236,7 +237,8 @@ RouteInfoControllerTest.class, RedisControllerTest.class, DcRelationsServiceTest.class, - DefaultMigrationProcessReporterTest.class + DefaultMigrationProcessReporterTest.class, + ShardModelServiceTest.class }) public class AllTests { diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java index cecefacba4..9f525748d8 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.ctrip.xpipe.redis.console.service.ConfigService.KEY_KEEPER_CONTAINER_STANDARD; @@ -82,6 +83,7 @@ public void before() { Mockito.when(config.getClusterDividedParts()).thenReturn(2); Mockito.when(config.getKeeperCheckerIntervalMilli()).thenReturn(expireTime); Mockito.when(config.getKeeperPairOverLoadFactor()).thenReturn(5.0); + Mockito.when(config.getKeeperContainerDiskOverLoadFactor()).thenReturn(0.8); KeepercontainerTbl keepercontainerTbl = new KeepercontainerTbl(); keepercontainerTbl.setKeepercontainerActive(true); Mockito.doNothing().when(executor).execute(Mockito.any()); @@ -445,7 +447,7 @@ public void testGetAllDcReadyToMigrationKeeperContainersWithMixed() { analyzer.getCurrentDcKeeperContainerUsedInfoModelsMap().putAll(models); analyzer.analyzeKeeperContainerUsedInfo(); List allDcReadyToMigrationKeeperContainers = analyzer.getCurrentDcReadyToMigrationKeeperContainers(); - Assert.assertEquals(3, allDcReadyToMigrationKeeperContainers.size()); + Assert.assertEquals(3, allDcReadyToMigrationKeeperContainers.stream().filter(MigrationKeeperContainerDetailModel::isKeeperPairOverload).count()); } @Test @@ -457,13 +459,13 @@ public void testMultiSrcKeeperSingleTargetWithMixed() { .createKeeper(Cluster1, Shard2, true, 1, 7) .createKeeper(Cluster2, Shard1, true, 1, 2) .createKeeper(Cluster2, Shard2, true, 1, 2) - .createKeeper(Cluster3, Shard1, false, 1, 8) + .createKeeper(Cluster3, Shard1, false, 1, 7) .createKeeper(Cluster3, Shard2, false, 1, 7) .createKeeper(Cluster4, Shard1, false, 1, 2) .createKeeper(Cluster4, Shard2, false, 1, 2); - createKeeperContainer(models, IP2, 4, 19) - .createKeeper(Cluster3, Shard1, true, 1, 8) + createKeeperContainer(models, IP2, 4, 18) + .createKeeper(Cluster3, Shard1, true, 1, 7) .createKeeper(Cluster3, Shard2, true, 1, 7) .createKeeper(Cluster4, Shard1, true, 1, 2) .createKeeper(Cluster4, Shard2, true, 1, 2) diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ConfigServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ConfigServiceTest.java index 6f0b1c133b..b3b353f339 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ConfigServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ConfigServiceTest.java @@ -10,6 +10,7 @@ import java.util.List; +import static com.ctrip.xpipe.redis.console.service.ConfigService.KEY_KEEPER_CONTAINER_STANDARD; import static com.ctrip.xpipe.redis.console.service.ConfigService.KEY_SENTINEL_CHECK_EXCLUDE; /** @@ -83,6 +84,12 @@ public void testStartSentinelCheck() throws Exception { Assert.assertEquals(configModel.getVal(), String.valueOf(false)); } + @Test + public void testGetConfigs() throws Exception { + List configs = service.getConfigs(KEY_KEEPER_CONTAINER_STANDARD); + Assert.assertEquals(configs.size(), 0); + } + @Test public void testStopSentinelCheck() throws Exception { configModel.setSubKey(mockClusterName); diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java index 8305797489..97c80f974c 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java @@ -1,15 +1,20 @@ package com.ctrip.xpipe.redis.console.service; import com.ctrip.xpipe.command.DefaultCommandFuture; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSessionManager; import com.ctrip.xpipe.redis.console.model.RedisTbl; import com.ctrip.xpipe.redis.console.model.ShardModel; import com.ctrip.xpipe.redis.console.model.ShardTbl; +import com.ctrip.xpipe.redis.console.service.impl.DefaultKeeperAdvancedService; import com.ctrip.xpipe.redis.console.service.model.impl.ShardModelServiceImpl.*; import com.ctrip.xpipe.redis.console.service.model.impl.ShardModelServiceImpl; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; import org.apache.logging.log4j.core.config.CronScheduledFuture; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -20,6 +25,9 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; @@ -37,9 +45,6 @@ public class ShardModelServiceTest extends ShardModelServiceImpl{ @Mock private RedisService redisService; - @Mock - private RedisSessionManager redisSessionManager; - @Mock private RedisSession redisSession; @@ -63,10 +68,7 @@ public void initMockData() { newKeepers.add(new RedisTbl().setRedisIp("ip1").setRedisPort(6380)); newKeepers.add(new RedisTbl().setRedisIp("ip2").setRedisPort(6381)); when(keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcIp, targetIp, true)).thenReturn(newKeepers); - RedisSession active = new RedisSession(); - RedisSession backUp = new RedisSession(); - when(redisSessionManager.findOrCreateSession(new HostPort("ip1", 6380))).thenReturn(active); - when(redisSessionManager.findOrCreateSession(new HostPort("ip2", 6381))).thenReturn(backUp); + shardModelService.setKeyedObjectPool(new XpipeNettyClientKeyedObjectPool()); } @Test @@ -76,10 +78,24 @@ public void testMigrateAutoBalanceKeepers() { @Test public void testFullSyncJudgeTask() { -// when(redisSession.infoReplication(any())).thenReturn(new DefaultCommandFuture<>()); -// FullSyncJudgeTask task = new FullSyncJudgeTask(redisSession, redisSession, 1000, 1000, dcName, clusterName, shardModel); -// task.setScheduledFuture(new CronScheduledFuture(null, null)); -// task.run(); + InfoCommand infoCommand1 = shardModelService.generteInfoCommand(new DefaultEndPoint("1", 6380)); + InfoCommand infoCommand2 = shardModelService.generteInfoCommand(new DefaultEndPoint("2", 6380)); + FullSyncJudgeTask task = new FullSyncJudgeTask(infoCommand1, infoCommand2, 1000, 1000, dcName, clusterName, shardModel); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(task, 1000, 1000, TimeUnit.MILLISECONDS); + task.setScheduledFuture(scheduledFuture); + task.run(); + } + + @Test + public void testGetSwitchMaterNewKeepers() { + ShardModel model = new ShardModel(); + List keepers = new ArrayList<>(); + keepers.add(new RedisTbl().setMaster(true)); + model.setKeepers(keepers); + List switchMaterNewKeepers = new DefaultKeeperAdvancedService().getSwitchMaterNewKeepers(model); + Assert.assertEquals(switchMaterNewKeepers.size(), 1); + Assert.assertFalse(switchMaterNewKeepers.get(0).isMaster()); } } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java index 4dc92db19b..a9c60ba7ab 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java @@ -1,7 +1,7 @@ package com.ctrip.xpipe.redis.console.service.impl; import com.ctrip.xpipe.redis.checker.model.DcClusterShard; -import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.console.model.MigrationKeeperContainerDetailModel; import com.ctrip.xpipe.redis.console.model.ShardModel; @@ -53,18 +53,18 @@ public void testMigrationKeeperContainer() { KeeperContainerUsedInfoModel src = new KeeperContainerUsedInfoModel() .setKeeperIp("1.1.1.1").setDcName("jq").setTotalInputFlow(300 * 1024 * 1024L) .setTotalRedisUsedMemory(500 * 1024 * 1024 * 1024L); - Map detailInfo = Maps.newHashMap(); - detailInfo.put(new DcClusterShardActive("jq", "cluster1", "shard1", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(200 * 1024 * 1024L, 400 * 1024 * 1024L, "")); - detailInfo.put(new DcClusterShardActive("jq", "cluster1", "shard2", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(20 * 1024 * 1024L, 20 * 1024 * 1024L, "")); - detailInfo.put(new DcClusterShardActive("jq", "cluster2", "shard1", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(30 * 1024 * 1024L, 30 * 1024 * 1024L, "")); - detailInfo.put(new DcClusterShardActive("jq", "cluster2", "shard2", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(40 * 1024 * 1024L, 40 * 1024 * 1024L, "")); + Map detailInfo = Maps.newHashMap(); + detailInfo.put(new DcClusterShardKeeper("jq", "cluster1", "shard1", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(200 * 1024 * 1024L, 400 * 1024 * 1024L, "")); + detailInfo.put(new DcClusterShardKeeper("jq", "cluster1", "shard2", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(20 * 1024 * 1024L, 20 * 1024 * 1024L, "")); + detailInfo.put(new DcClusterShardKeeper("jq", "cluster2", "shard1", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(30 * 1024 * 1024L, 30 * 1024 * 1024L, "")); + detailInfo.put(new DcClusterShardKeeper("jq", "cluster2", "shard2", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(40 * 1024 * 1024L, 40 * 1024 * 1024L, "")); src.setDetailInfo(detailInfo); KeeperContainerUsedInfoModel target = new KeeperContainerUsedInfoModel() .setKeeperIp("2.2.2.2").setDcName("jq").setTotalInputFlow(300 * 1024 * 1024L) .setTotalRedisUsedMemory(500 * 1024 * 1024 * 1024L); - Map detailInfo2 = Maps.newHashMap(); - detailInfo2.put(new DcClusterShardActive("jq", "cluster1", "shard1", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(200 * 1024 * 1024L, 400 * 1024 * 1024L, "")); + Map detailInfo2 = Maps.newHashMap(); + detailInfo2.put(new DcClusterShardKeeper("jq", "cluster1", "shard1", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(200 * 1024 * 1024L, 400 * 1024 * 1024L, "")); target.setDetailInfo(detailInfo2); List migrationShards = new ArrayList<>(); @@ -90,4 +90,20 @@ public void testMigrationKeeperContainer() { service.beginMigrateKeeperContainers(models); Assert.assertEquals(9, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount()); } + + @Test + public void testClone() { + KeeperContainerUsedInfoModel target = new KeeperContainerUsedInfoModel() + .setKeeperIp("2.2.2.2").setDcName("jq").setTotalInputFlow(300 * 1024 * 1024L) + .setTotalRedisUsedMemory(500 * 1024 * 1024 * 1024L); + Map detailInfo2 = Maps.newHashMap(); + detailInfo2.put(new DcClusterShardKeeper("jq", "cluster1", "shard1", true), new KeeperContainerUsedInfoModel.KeeperUsedInfo(200 * 1024 * 1024L, 400 * 1024 * 1024L, "")); + target.setDetailInfo(detailInfo2); + KeeperContainerUsedInfoModel usedInfoModel = KeeperContainerUsedInfoModel.cloneKeeperContainerUsedInfoModel(target); + for (Map.Entry entry : detailInfo2.entrySet()) { + KeeperContainerUsedInfoModel model = new KeeperContainerUsedInfoModel(target, entry); + Assert.assertEquals(model.getKeeperIp(), usedInfoModel.getKeeperIp()); + } + Assert.assertEquals(target.getDcName(), usedInfoModel.getDcName()); + } } \ No newline at end of file diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/server/FakeRedisServer.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/server/FakeRedisServer.java index e96ee4ef15..2d2e111aaf 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/server/FakeRedisServer.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/server/FakeRedisServer.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.simpleserver.IoAction; import com.ctrip.xpipe.simpleserver.IoActionFactory; import com.ctrip.xpipe.simpleserver.Server; +import com.ctrip.xpipe.utils.VisibleForTesting; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -273,5 +274,10 @@ public boolean isPartialSyncFail() { public void setPartialSyncFail(boolean partialSyncFail) { this.partialSyncFail = partialSyncFail; } + + @VisibleForTesting + public void setRdbSize(int rdbSize) { + this.rdbSize = rdbSize; + } } diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java index 13cd69984b..9bcec52da5 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.keeper.impl.fakeredis; +import com.ctrip.xpipe.redis.core.protocal.MASTER_STATE; import com.ctrip.xpipe.redis.core.protocal.cmd.InMemoryPsync; import com.ctrip.xpipe.redis.core.store.ReplicationStore; import com.ctrip.xpipe.redis.keeper.AbstractFakeRedisTest; @@ -7,6 +8,7 @@ import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer; import com.ctrip.xpipe.redis.keeper.store.DefaultReplicationStore; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** @@ -15,15 +17,20 @@ * 2016年4月21日 下午5:42:29 */ public class DefaultRedisKeeperServerConnectToFakeRedisTest extends AbstractFakeRedisTest { - + + @Before + public void setupDefaultRedisKeeperServerConnectToFakeRedisTest() { + this.fakeRedisServer.setRdbSize(10000); + } + @Test public void testReplicationData() throws Exception{ - + RedisKeeperServer redisKeeperServer = startRedisKeeperServerAndConnectToFakeRedis(); sleep(1500); - - logger.info(remarkableMessage("[testReplicationData][read replication store]")); - + + logger.info(remarkableMessage("[testReplicationData][read replication store]")); + ReplicationStore replicationStore = redisKeeperServer.getReplicationStore(); byte[] rdbContent = readRdbFileTilEnd(replicationStore); Assert.assertArrayEquals(fakeRedisServer.getRdbContent(), rdbContent); @@ -31,62 +38,64 @@ public void testReplicationData() throws Exception{ String commands = readCommandFileTilEnd(replicationStore, fakeRedisServer.currentCommands().length()); Assert.assertEquals(fakeRedisServer.currentCommands(), commands); } - + @Test public void testNewDumpCommandsLost() throws Exception{ startKeeperServerAndTestReFullSync(2, allCommandsSize); } - + @Test public void testNewDumpCommandsTooMush() throws Exception{ - + startKeeperServerAndTestReFullSync(100, (int) (allCommandsSize * 0.8)); } private void startKeeperServerAndTestReFullSync(int fileToKeep, int maxTransferCommnadsSize) throws Exception { - + RedisKeeperServer redisKeeperServer = startRedisKeeperServerAndConnectToFakeRedis(fileToKeep, maxTransferCommnadsSize, 1000); int keeperPort = redisKeeperServer.getListeningPort(); sleep(3000); logger.info(remarkableMessage("send psync to redump rdb")); - + int rdbDumpCount1 = ((DefaultReplicationStore)redisKeeperServer.getReplicationStore()).getRdbUpdateCount(); - + InMemoryPsync psync = sendInmemoryPsync("localhost", keeperPort); - sleep(3000); + waitConditionUntilTimeOut(() -> psync.getCommands().length >= fakeRedisServer.getCommandsLength()); int rdbDumpCount2 = ((DefaultReplicationStore)redisKeeperServer.getReplicationStore()).getRdbUpdateCount(); Assert.assertEquals(rdbDumpCount1 + 1, rdbDumpCount2); - + assertPsyncResultEquals(psync); } - + @Test public void testDumpWhileWaitForRdb() throws Exception{ - + int sleepBeforeSendRdb = 2000; - + fakeRedisServer.setSleepBeforeSendRdb(sleepBeforeSendRdb); RedisKeeperServer redisKeeperServer = startRedisKeeperServerAndConnectToFakeRedis(100, allCommandsSize); - + waitConditionUntilTimeOut(() -> redisKeeperServer.getRedisMaster().getMasterState().equals(MASTER_STATE.REDIS_REPL_CONNECTED)); + sleep(sleepBeforeSendRdb/4); int rdbDumpCount1 = ((DefaultRedisKeeperServer)redisKeeperServer).getRdbDumpTryCount(); Assert.assertEquals(1, rdbDumpCount1); - + int keeperPort = redisKeeperServer.getListeningPort(); logger.info(remarkableMessage("send psync to keeper port:{}"), keeperPort); - + InMemoryPsync psync = sendInmemoryPsync("localhost", keeperPort, "?", -1L); - + sleep(1000); int rdbDumpCount2 = ((DefaultRedisKeeperServer)redisKeeperServer).getRdbDumpTryCount(); Assert.assertEquals(1, rdbDumpCount2); - + sleep(sleepBeforeSendRdb); waitForPsyncResultEquals(psync); } - + } +