Skip to content

Commit

Permalink
自动匀keeper自动巡检部分bug修复
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Mar 14, 2024
1 parent d011884 commit 09ead23
Show file tree
Hide file tree
Showing 26 changed files with 465 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.AbstractRedisConfigRuleAction;
import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder;
import com.ctrip.xpipe.redis.checker.impl.KeeperContainerInfoReporter;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -115,7 +114,7 @@ public Map<HostPort, HealthStatusDesc> getAllHealthStatusDesc() {
}

/**
 * HTTP GET endpoint at /health/keeper/status/all.
 * Returns the map held by keeperFlowCollector (getHostPort2InputFlow()) as-is.
 * The two signatures below are the removed and added sides of this diff hunk; they differ
 * only in the inner map's key type (DcClusterShardActive vs. DcClusterShardKeeper).
 */
@GetMapping("/health/keeper/status/all")
public ConcurrentMap<String, Map<DcClusterShardActive, Long>> getAllKeeperFlows() {
public ConcurrentMap<String, Map<DcClusterShardKeeper, Long>> getAllKeeperFlows() {
return keeperFlowCollector.getHostPort2InputFlow();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction;
import com.ctrip.xpipe.redis.checker.healthcheck.KeeperInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.KeeperSupport;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
import com.ctrip.xpipe.utils.MapUtils;
import org.slf4j.Logger;
Expand All @@ -19,16 +19,17 @@ public class KeeperFlowCollector implements KeeperInfoStatsActionListener, Keepe

private final Logger logger = LoggerFactory.getLogger(getClass());

protected ConcurrentMap<String, Map<DcClusterShardActive, Long>> hostPort2InputFlow = new ConcurrentHashMap<>();
protected ConcurrentMap<String, Map<DcClusterShardKeeper, Long>> hostPort2InputFlow = new ConcurrentHashMap<>();

@Override
public void onAction(KeeperInfoStatsActionContext context) {
try {
InfoResultExtractor extractor = context.getResult();
KeeperInstanceInfo info = context.instance().getCheckInfo();
long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue();
Map<DcClusterShardActive, Long> keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new);
keeperContainerResult.put(new DcClusterShardActive(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow);
deleteKeeper(info);
Map<DcClusterShardKeeper, Long> keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new);
keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow);
} catch (Throwable throwable) {
logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable);
}
Expand All @@ -38,22 +39,19 @@ public void onAction(KeeperInfoStatsActionContext context) {
/**
 * Handles the end of watching a keeper health-check action.
 * Reads the keeper's check info from the action, logs its identity, and (on the added side
 * of this diff hunk) delegates removal of its flow entries to deleteKeeper. The
 * "stop before" log statements and the DcClusterShardActive-based log line belong to the
 * removed side of the hunk.
 *
 * @param action the action being stopped; its check info is cast to KeeperInstanceInfo
 */
@Override
public void stopWatch(HealthCheckAction action) {
KeeperInstanceInfo instanceInfo = (KeeperInstanceInfo) action.getActionInstance().getCheckInfo();
logger.info("stopWatch: {}", new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), instanceInfo.isActive(), instanceInfo.getHostPort().getPort()));
logger.info("stop before: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost())
.get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort())));
logger.info("stop before: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost())
.get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort())));
logger.info("stopWatch: {}", new DcClusterShardKeeper(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), instanceInfo.isActive(), instanceInfo.getHostPort().getPort()));
deleteKeeper(instanceInfo);
}

/**
 * Removes a keeper's entries from hostPort2InputFlow.
 * Looks up the inner map by the keeper's host and returns early when no map exists for that
 * host. Otherwise removes the key built with active=true and the key built with active=false,
 * so the entry is dropped regardless of which flag it was stored under.
 * Lines using DcClusterShardActive and the "stop after" log statements are the removed side
 * of this diff hunk; lines using DcClusterShardKeeper are the added side.
 *
 * @param instanceInfo supplies the host, port and dc/cluster/shard ids used to build the keys
 */
private void deleteKeeper(KeeperInstanceInfo instanceInfo) {
// Nothing recorded for this host yet: avoid calling remove on a null inner map.
if (hostPort2InputFlow.get(instanceInfo.getHostPort().getHost()) == null) return;
hostPort2InputFlow.get(instanceInfo.getHostPort().getHost())
.remove(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort()));
.remove(new DcClusterShardKeeper(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort()));
hostPort2InputFlow.get(instanceInfo.getHostPort().getHost())
.remove(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort()));
logger.info("stop after: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost())
.get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), true, instanceInfo.getHostPort().getPort())));
logger.info("stop after: {}", hostPort2InputFlow.get(instanceInfo.getHostPort().getHost())
.get(new DcClusterShardActive(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort())));
.remove(new DcClusterShardKeeper(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId(), false, instanceInfo.getHostPort().getPort()));
}

/**
 * Returns the collector's internal hostPort2InputFlow map itself (no copy is made), so
 * callers observe later updates and can modify the collector's state through it.
 * The two signatures are the removed and added sides of this diff hunk.
 */
public ConcurrentMap<String, Map<DcClusterShardActive, Long>> getHostPort2InputFlow() {
public ConcurrentMap<String, Map<DcClusterShardKeeper, Long>> getHostPort2InputFlow() {
return hostPort2InputFlow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.ctrip.xpipe.redis.core.protocal.cmd.pubsub.*;
import com.ctrip.xpipe.redis.core.protocal.pojo.RedisInfo;
import com.ctrip.xpipe.redis.core.protocal.pojo.Role;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -359,6 +360,11 @@ public String toString() {
return "";
}

/**
 * Overwrites this object's endpoint field. Marked @VisibleForTesting: intended to let tests
 * inject an endpoint directly.
 *
 * @param endpoint the endpoint to store
 */
@VisibleForTesting
public void setEndpoint(Endpoint endpoint) {
this.endpoint = endpoint;
}

public interface RollCallback {

void role(String role, Role detail);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperContainerMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperDiskInfo;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.VisibleForTesting;
Expand All @@ -30,7 +29,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

public class KeeperContainerInfoReporter implements GroupCheckerLeaderAware {

Expand Down Expand Up @@ -107,8 +105,8 @@ public void notLeader() {
public void reportKeeperContainerInfo() {
try {
logger.debug("[reportKeeperContainerInfo] start");
Map<String, Map<DcClusterShardActive, Long>> collectedInfos = keeperFlowCollector.getHostPort2InputFlow();
Map<String, Map<DcClusterShardActive, Long>> hostPort2InputFlow = new HashMap<>();
Map<String, Map<DcClusterShardKeeper, Long>> collectedInfos = keeperFlowCollector.getHostPort2InputFlow();
Map<String, Map<DcClusterShardKeeper, Long>> hostPort2InputFlow = new HashMap<>();
for (DcMeta dcMeta : metaCache.getXpipeMeta().getDcs().values()) {
if (CURRENT_IDC.equalsIgnoreCase(dcMeta.getId())) {
dcMeta.getKeeperContainers().forEach(keeperContainerMeta -> {
Expand All @@ -131,24 +129,24 @@ public void reportKeeperContainerInfo() {
long totalRedisUsedMemory = 0;
int activeKeeperCount = 0;
int totalKeeperCount = 0;
Map<DcClusterShardActive, KeeperUsedInfo> detailInfo = new HashMap<>();
for (Map.Entry<DcClusterShardActive, Long> entry : inputFlowMap.entrySet()) {
Map<DcClusterShardKeeper, KeeperUsedInfo> detailInfo = new HashMap<>();
for (Map.Entry<DcClusterShardKeeper, Long> entry : inputFlowMap.entrySet()) {
totalKeeperCount++;
totalInputFlow += entry.getValue();
DcClusterShardActive dcClusterShardActive = entry.getKey();
DcClusterShardKeeper dcClusterShardKeeper = entry.getKey();
long inputFlow = entry.getValue();
Long redisUsedMemory = dcClusterShardUsedMemory.get(new DcClusterShard().setDcId(dcClusterShardActive.getDcId()).setClusterId(dcClusterShardActive.getClusterId()).setShardId(dcClusterShardActive.getShardId()));
Long redisUsedMemory = dcClusterShardUsedMemory.get(new DcClusterShard().setDcId(dcClusterShardKeeper.getDcId()).setClusterId(dcClusterShardKeeper.getClusterId()).setShardId(dcClusterShardKeeper.getShardId()));
if (redisUsedMemory == null) {
logger.warn("[reportKeeperContainerInfo] redisUsedMemory is null, dcClusterShard: {}", entry.getKey());
redisUsedMemory = 0L;
}
totalRedisUsedMemory += redisUsedMemory;
if (dcClusterShardActive.isActive()) {
if (dcClusterShardKeeper.isActive()) {
activeRedisUsedMemory += redisUsedMemory;
activeInputFlow += inputFlow;
activeKeeperCount++;
}
detailInfo.put(dcClusterShardActive, new KeeperUsedInfo(redisUsedMemory, inputFlow, keeperIp));
detailInfo.put(dcClusterShardKeeper, new KeeperUsedInfo(redisUsedMemory, inputFlow, keeperIp));

}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
package com.ctrip.xpipe.redis.checker.model;

import com.ctrip.xpipe.endpoint.HostPort;

import java.io.Serializable;
import java.util.Objects;

public class DcClusterShardActive extends DcClusterShard implements Serializable {
public class DcClusterShardKeeper extends DcClusterShard implements Serializable {

private boolean active;

private String ip;

private int port;

// No-argument constructor; assigns none of this class's fields (active, ip, port).
// Removed (DcClusterShardActive) and added (DcClusterShardKeeper) sides of the diff hunk.
public DcClusterShardActive(){}
public DcClusterShardKeeper(){}

/**
 * Builds an instance from dc/cluster/shard ids, the active flag and the keeper port.
 * The ip field is not assigned by this constructor.
 * The two signatures are the removed and added sides of this diff hunk.
 */
public DcClusterShardActive(String dcId, String clusterId, String shardId, boolean active, int port) {
public DcClusterShardKeeper(String dcId, String clusterId, String shardId, boolean active, int port) {
super(dcId, clusterId, shardId);
this.active = active;
this.port = port;
}

/**
 * Builds an instance from dc/cluster/shard ids and the active flag only.
 * port is not assigned here, so it keeps the int default of 0; ip is not assigned either.
 * The two signatures are the removed and added sides of this diff hunk.
 */
public DcClusterShardActive(String dcId, String clusterId, String shardId, boolean active) {
public DcClusterShardKeeper(String dcId, String clusterId, String shardId, boolean active) {
super(dcId, clusterId, shardId);
this.active = active;
}

/**
 * Builds an instance by copying dcId, clusterId and shardId from an existing DcClusterShard
 * and adding the active flag. port and ip are not assigned.
 * The two signatures are the removed and added sides of this diff hunk.
 */
public DcClusterShardActive(DcClusterShard dcClusterShard, boolean active) {
public DcClusterShardKeeper(DcClusterShard dcClusterShard, boolean active) {
super(dcClusterShard.getDcId(), dcClusterShard.getClusterId(), dcClusterShard.getShardId());
this.active = active;
}

public DcClusterShardActive(String info) {
public DcClusterShardKeeper(String info) {
String[] split = info.split(SPLITTER);
if (split.length >= 5) {
this.dcId = split[0];
Expand Down Expand Up @@ -59,15 +59,15 @@ public void setPort(int port) {
/**
 * Equality check: same reference, or an instance of this class for which super.equals(o)
 * holds and the active flags match. The ip and port fields are not compared.
 * Lines naming DcClusterShardActive are the removed side of this diff hunk; lines naming
 * DcClusterShardKeeper are the added side.
 */
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DcClusterShardActive)) return false;
if (!(o instanceof DcClusterShardKeeper)) return false;
if (!super.equals(o)) return false;
DcClusterShardActive that = (DcClusterShardActive) o;
DcClusterShardKeeper that = (DcClusterShardKeeper) o;
return isActive() == that.isActive();
}

/**
 * Hash code combining super.hashCode() with one extra field. The two return statements are
 * the removed side (hashes the active flag) and the added side (hashes the port) of this
 * diff hunk.
 *
 * NOTE(review): equals() in this class compares only super.equals() and the active flag,
 * never the port. With the port-based hash, two instances that are equal but carry
 * different ports can return different hash codes, which violates the equals/hashCode
 * contract relied on by hash-based maps. Confirm whether equals() should also compare port
 * (or whether the hash should stay derived from fields that equals() uses).
 */
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), isActive());
return Objects.hash(super.hashCode(), port);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class KeeperContainerUsedInfoModel {

private int totalKeeperCount;

private Map<DcClusterShardActive, KeeperUsedInfo> detailInfo;
private Map<DcClusterShardKeeper, KeeperUsedInfo> detailInfo;

private boolean keeperContainerActive;

Expand All @@ -50,7 +50,7 @@ public KeeperContainerUsedInfoModel(String keeperIp, String dcName, long activeI
this.activeRedisUsedMemory = activeRedisUsedMemory;
}

public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entry<DcClusterShardActive, KeeperUsedInfo> dcClusterShard) {
public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard) {

Check warning on line 53 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L53

Added line #L53 was not covered by tests
this.keeperIp = model.getKeeperIp();
this.dcName = model.getDcName();
this.org = model.getOrg();
Expand Down Expand Up @@ -95,10 +95,10 @@ public static KeeperContainerUsedInfoModel cloneKeeperContainerUsedInfoModel(Kee
return newModel;

Check warning on line 95 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L90-L95

Added lines #L90 - L95 were not covered by tests
}

private static Map<DcClusterShardActive, KeeperUsedInfo> getKeeperUsedInfoMap(KeeperContainerUsedInfoModel model) {
Map<DcClusterShardActive, KeeperUsedInfo> clonedDetailInfo = new HashMap<>();
for (Map.Entry<DcClusterShardActive, KeeperUsedInfo> entry : model.getDetailInfo().entrySet()) {
DcClusterShardActive key = new DcClusterShardActive(entry.getKey().getDcId(), entry.getKey().getClusterId(), entry.getKey().getShardId(), entry.getKey().isActive(), entry.getKey().getPort());
private static Map<DcClusterShardKeeper, KeeperUsedInfo> getKeeperUsedInfoMap(KeeperContainerUsedInfoModel model) {
Map<DcClusterShardKeeper, KeeperUsedInfo> clonedDetailInfo = new HashMap<>();

Check warning on line 99 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L99

Added line #L99 was not covered by tests
for (Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> entry : model.getDetailInfo().entrySet()) {
DcClusterShardKeeper key = new DcClusterShardKeeper(entry.getKey().getDcId(), entry.getKey().getClusterId(), entry.getKey().getShardId(), entry.getKey().isActive(), entry.getKey().getPort());
KeeperUsedInfo value = new KeeperUsedInfo(entry.getValue().getPeerData(), entry.getValue().getInputFlow(), entry.getValue().keeperIP);
clonedDetailInfo.put(key, value);
}
Expand Down Expand Up @@ -169,11 +169,11 @@ public KeeperContainerUsedInfoModel setTotalRedisUsedMemory(long totalRedisUsedM
return this;
}

public Map<DcClusterShardActive, KeeperUsedInfo> getDetailInfo() {
public Map<DcClusterShardKeeper, KeeperUsedInfo> getDetailInfo() {
return detailInfo;
}

public KeeperContainerUsedInfoModel setDetailInfo(Map<DcClusterShardActive, KeeperUsedInfo> detailInfo) {
public KeeperContainerUsedInfoModel setDetailInfo(Map<DcClusterShardKeeper, KeeperUsedInfo> detailInfo) {
this.detailInfo = detailInfo;
return this;
}
Expand Down Expand Up @@ -359,7 +359,7 @@ public String toString() {
@VisibleForTesting
public KeeperContainerUsedInfoModel createKeeper(String clusterId, String shardId, boolean active, long inputFlow, long redisUsedMemory){
if (this.detailInfo == null) this.detailInfo = new HashMap<>();
detailInfo.put(new DcClusterShardActive(this.dcName, clusterId, shardId, active), new KeeperUsedInfo(redisUsedMemory, inputFlow, this.keeperIp));
detailInfo.put(new DcClusterShardKeeper(this.dcName, clusterId, shardId, active), new KeeperUsedInfo(redisUsedMemory, inputFlow, this.keeperIp));

Check warning on line 362 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L362

Added line #L362 was not covered by tests
this.setDetailInfo(detailInfo);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.redis.checker.healthcheck.*;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingService;
import com.ctrip.xpipe.redis.checker.healthcheck.config.HealthCheckConfig;
Expand Down Expand Up @@ -81,6 +82,14 @@ public void afterDelayActionTest() {
Assert.assertFalse(instanceNull.get());
}

/**
 * Checks RedisSession#toString in two states: a session created with the no-arg
 * constructor is expected to render as an empty string, and after an endpoint is injected
 * through setEndpoint it is expected to render as "redis://host:port".
 */
@Test
public void testToString() {
    RedisSession session = new RedisSession();
    String renderedWithoutEndpoint = session.toString();
    Assert.assertEquals(renderedWithoutEndpoint, "");

    DefaultEndPoint injectedEndpoint = new DefaultEndPoint("10.10.10.10", 6380);
    session.setEndpoint(injectedEndpoint);
    String renderedWithEndpoint = session.toString();
    Assert.assertEquals(renderedWithEndpoint, "redis://10.10.10.10:6380");
}

@Test
public void testRedisUp() throws Exception {
delayHealth.set(false);
Expand Down
Loading

0 comments on commit 09ead23

Please sign in to comment.