Skip to content

Commit

Permalink
add keeperContainer tag
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Mar 4, 2025
1 parent 1ed4d95 commit e62ccac
Show file tree
Hide file tree
Showing 30 changed files with 197 additions and 77 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ Redis 也可能会挂,Redis 本身提供哨兵 (Sentinel) 机制保证集群
- 【有任何疑问,请阅读】[XPipe Wiki](https://github.com/ctripcorp/x-pipe/wiki)
- 【目前用户的问题整理】[XPipe Q&A](https://github.com/ctripcorp/x-pipe/wiki/XPipe-Q&A)
- 【文章】[携程Redis多数据中心解决方案-XPipe](https://mp.weixin.qq.com/s/Q3bt0-5nv8uNMdHuls-Exw?)
- 【文章】[携程Redis海外机房数据同步实践](https://mp.weixin.qq.com/s/LeSSdT6bOEFzZyN26PRVzg)
- 【PPT】[携程内XPipe使用介绍](https://docs.c-ctrip.com/files/6/portal/0AS2w12000947w1mw6A59.pptx)

<a name="技术交流"></a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public HealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService
this.instanceLongDelayMilli = ()->instance.getHealthCheckConfig().instanceLongDelayMilli();
this.delayDownAfterMilli = () -> {
DelayConfig delayConfig = instance.getHealthCheckConfig().getDelayConfig(instance.getCheckInfo().getClusterId(), currentDcId, instance.getCheckInfo().getDcId());
return delayConfig.getClusterLevelDelayDownAfterMilli() > 0 ? delayConfig.getClusterLevelDelayDownAfterMilli() : delayConfig.getDcLevelDelayDownAfterMilli();};
return delayConfig.getClusterLevelDelayDownAfterMilli();};
this.healthyDelayMilli = () -> {
DelayConfig delayConfig = instance.getHealthCheckConfig().getDelayConfig(instance.getCheckInfo().getClusterId(), currentDcId, instance.getCheckInfo().getDcId());
return delayConfig.getClusterLevelHealthyDelayMilli() > 0 ? delayConfig.getClusterLevelHealthyDelayMilli() : delayConfig.getDcLevelHealthyDelayMilli();};
return delayConfig.getClusterLevelHealthyDelayMilli();};
checkParam();
}

private void checkParam() {
if(this.delayDownAfterMilli.getAsInt() < this.pingDownAfterMilli.getAsInt()) {
if(this.delayDownAfterMilli.getAsInt() > 0 && this.delayDownAfterMilli.getAsInt() < this.pingDownAfterMilli.getAsInt()) {
logger.error("Ping-Down-After-Milli must smaller than Delay-Down-After-Milli");
}
}
Expand Down Expand Up @@ -151,7 +151,7 @@ void delay(long delayMilli, long...srcShardDbId){
lastHealthDelayTime.compareAndSet(UNSET_TIME, System.currentTimeMillis());

delayLogger.debug("{}, {}", instance.getCheckInfo().getHostPort(), delayMilli);
if(delayMilli >= 0 && delayMilli <= healthyDelayMilli.getAsInt()){
if (delayMilli >= 0 && (delayMilli <= healthyDelayMilli.getAsInt() || healthyDelayMilli.getAsInt() < 0)) {
lastHealthDelayTime.set(System.currentTimeMillis());
setDelayUp();
}
Expand Down Expand Up @@ -181,7 +181,9 @@ protected void healthStatusUpdate() {
final int delayDownAfter = delayDownAfterMilli.getAsInt();
final int instanceLongDelay = instanceLongDelayMilli.getAsInt();

if ( delayDownTime > delayDownAfter) {
if (delayDownAfter < 0) {
// skip for negative distance
} else if ( delayDownTime > delayDownAfter) {
setDelayDown();
}else if(delayDownTime >= instanceLongDelay){
setDelayHalfDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class KeeperContainerUsedInfoModel {

private String org;

private String tag;

private String az;

private String updateTime;
Expand Down Expand Up @@ -62,6 +64,7 @@ public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entr
this.keeperIp = model.getKeeperIp();
this.dcName = model.getDcName();
this.org = model.getOrg();
this.tag = model.getTag();
this.az = model.getAz();
this.activeInputFlow = model.getActiveInputFlow() + dcClusterShard.getValue().getInputFlow();
this.totalInputFlow = model.getTotalInputFlow() + dcClusterShard.getValue().getInputFlow();
Expand All @@ -80,19 +83,6 @@ public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entr
this.diskType = model.getDiskType();
}

public void addModel(KeeperContainerUsedInfoModel model) {
if (!Objects.equals(model.keeperIp, keeperIp) || !Objects.equals(model.dcName, dcName) || !Objects.equals(model.org, org) || !Objects.equals(model.az, az)) {
return;
}
activeInputFlow += model.activeInputFlow;
totalInputFlow += model.totalInputFlow;
activeRedisUsedMemory += model.activeRedisUsedMemory;
totalRedisUsedMemory += model.totalRedisUsedMemory;
activeKeeperCount += model.activeKeeperCount;
totalKeeperCount += model.totalKeeperCount;
detailInfo.putAll(model.detailInfo);
}

public static KeeperContainerUsedInfoModel cloneKeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model) {
KeeperContainerUsedInfoModel newModel = new KeeperContainerUsedInfoModel();
newModel.setKeeperIp(model.getKeeperIp());
Expand Down Expand Up @@ -157,6 +147,15 @@ public KeeperContainerUsedInfoModel setOrg(String org) {
return this;
}

public String getTag() {
return tag;
}

public KeeperContainerUsedInfoModel setTag(String tag) {
this.tag = tag;
return this;
}

public String getAz() {
return az;
}
Expand Down Expand Up @@ -323,8 +322,9 @@ public String toString() {
"keeperIp='" + keeperIp + '\'' +
", dcName='" + dcName + '\'' +
", org='" + org + '\'' +
", tag='" + tag + '\'' +
", az='" + az + '\'' +
", updateTime=" + updateTime +
", updateTime='" + updateTime + '\'' +
", activeInputFlow=" + activeInputFlow +
", totalInputFlow=" + totalInputFlow +
", inputFlowStandard=" + inputFlowStandard +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,47 @@ public void testDelayOkAndInstanceUpAfterLoading() {
assertEquals(HEALTHY, healthStatus.getState());
}

@Test
public void testPingDownWithNegativeDistance() {
when(config.pingDownAfterMilli()).thenReturn(40);
when(config.getDelayConfig(any(),any(),any())).thenReturn(
new DelayConfig("test","test","test")
.setClusterLevelHealthyDelayMilli(-20).setClusterLevelDelayDownAfterMilli(-40));

healthStatus.pongInit();
sleep(40 * 2 + 5);
healthStatus.healthStatusUpdate();
Assert.assertSame(healthStatus.getState(), HEALTH_STATE.DOWN);
}

@Test
public void testDelayDown() {
when(config.pingDownAfterMilli()).thenReturn(1000);
when(config.getDelayConfig(any(),any(),any())).thenReturn(
new DelayConfig("test","test","test")
.setClusterLevelHealthyDelayMilli(20).setClusterLevelDelayDownAfterMilli(40));

healthStatus.pong();
healthStatus.delay(1);
sleep(40 * 2 + 5);
healthStatus.healthStatusUpdate();
Assert.assertSame(HEALTH_STATE.SICK, healthStatus.getState());
}

@Test
public void testDelayDownWithNegativeDistance() {
when(config.pingDownAfterMilli()).thenReturn(1000);
when(config.getDelayConfig(any(),any(),any())).thenReturn(
new DelayConfig("test","test","test")
.setClusterLevelHealthyDelayMilli(-20).setClusterLevelDelayDownAfterMilli(-40));

healthStatus.pong();
healthStatus.delay(1);
sleep(40 * 2 + 5);
healthStatus.healthStatusUpdate();
Assert.assertSame(HEALTHY, healthStatus.getState());
}

private void markup() {
healthStatus.pong();
healthStatus.delay(config.getDelayConfig("test","test","test").getClusterLevelHealthyDelayMilli()/2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public void beforeHealthStatusTest() {
config = mock(HealthCheckConfig.class);
when(config.getDelayConfig(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(
new DelayConfig("test", "test", "test").
setDcLevelHealthyDelayMilli(2000).setClusterLevelHealthyDelayMilli(-2000).
setDcLevelDelayDownAfterMilli(2000 * 8).setClusterLevelDelayDownAfterMilli(-2000 * 8));
setDcLevelHealthyDelayMilli(2000).setClusterLevelHealthyDelayMilli(2000).
setDcLevelDelayDownAfterMilli(2000 * 8).setClusterLevelDelayDownAfterMilli(2000 * 8));
when(instance.getHealthCheckConfig()).thenReturn(config);
healthStatus = new HeteroHealthStatus(instance, scheduled);

Expand All @@ -65,7 +65,8 @@ public void testStateSwitchFromUnknowToUp() {
@Test
public void testInstanceLongDelay() throws InterruptedException, TimeoutException {
when(config.getDelayConfig(Mockito.any(),Mockito.any(),Mockito.any())).thenReturn(
new DelayConfig("test","test","test").setDcLevelHealthyDelayMilli(200));
new DelayConfig("test","test","test")
.setDcLevelHealthyDelayMilli(200).setClusterLevelHealthyDelayMilli(200));
when(config.instanceLongDelayMilli()).thenReturn(300);
when(config.checkIntervalMilli()).thenReturn(100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class KeeperContainerCreateInfo extends AbstractCreateInfo {
@Nullable
private String orgName;

private String tag;

private boolean active;

private String azName;
Expand Down Expand Up @@ -101,6 +103,7 @@ public String toString() {
", keepercontainerPort=" + keepercontainerPort +
", keepercontainerOrgId=" + keepercontainerOrgId +
", orgName='" + orgName + '\'' +
", tag='" + tag + '\'' +
", active=" + active +
", azName='" + azName + '\'' +
", diskType='" + diskType + '\'' +
Expand All @@ -116,6 +119,15 @@ public KeeperContainerCreateInfo setOrgName(String orgName) {
return this;
}

public String getTag() {
return tag;
}

public KeeperContainerCreateInfo setTag(String tag) {
this.tag = tag;
return this;
}

public String getDiskType() {
return diskType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ public RetMessage updateKeeperContainer(@RequestBody KeeperContainerInfoModel ke
}
}

@RequestMapping(value = {"/keepercontainers/dc/{dcName}/az/{azName}/org/{orgName}",
"/keepercontainers/dc/{dcName}/az/{azName}/org",
"/keepercontainers/dc/{dcName}/az/org/{orgName}",
"/keepercontainers/dc/{dcName}/az/org"},
method = RequestMethod.GET)
public List<KeeperContainerInfoModel> getAvailableKeeperContainersByDcAzAndOrg(@PathVariable String dcName,
@PathVariable(required = false) String azName, @PathVariable(required = false) String orgName) {
return keeperContainerService.findAvailableKeeperContainerInfoModelsByDcAzAndOrg(dcName, azName, orgName);
@RequestMapping(value = "/keepercontainers/dc/az/org/tag", method = RequestMethod.GET)
public List<KeeperContainerInfoModel> getAvailableKeeperContainersByDcAzAndOrg(
@RequestParam String dcName,
@RequestParam(required = false) String azName,
@RequestParam(required = false) String orgName,
@RequestParam(required = false) String tag) {
return keeperContainerService.findAvailableKeeperContainerInfoModelsByDcAzOrgAndTag(dcName, azName, orgName, tag);
}

@RequestMapping(value = "/keepercontainers/overload/all", method = RequestMethod.GET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ private KeeperContainerUsedInfoModel getKeeperBasicModel(String ip, String dcNam
if (organizationTbl != null) {
model.setOrg(organizationTbl.getOrgName());
}
model.setTag(keepercontainerTbl.getTag());
try {
KeeperDiskInfo keeperDiskInfo = keeperContainerDiskInfoCollector.getKeeperDiskInfo(ip);
if (keeperDiskInfo == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedIn
KeeperContainerUsedInfoModel target = queue.poll();
if ((Objects.equals(srcKeeper.getOrg(), target.getOrg()))
&& (Objects.equals(srcKeeper.getAz(), target.getAz()))
&& (Objects.equals(srcKeeper.getTag(), target.getTag()))
&& !Objects.equals(target.getKeeperIp(), srcKeeperPair.getKeeperIp())
&& ((!isMigrateShardBackUp && filterChain.canMigrate(dcClusterShard, srcKeeperPair, target, this))
|| (isMigrateShardBackUp && !filterChain.isMigrateKeeperPairOverload(dcClusterShard, srcKeeperPair, target, this)))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class KeeperContainerInfoModel implements Serializable {
private HostPort addr;
private String dcName;
private String orgName;
private String tag;
private String azName;
private long keeperCount;
private long shardCount;
Expand Down Expand Up @@ -79,6 +80,14 @@ public void setOrgName(String orgName) {
this.orgName = orgName;
}

public String getTag() {
return tag;
}

public void setTag(String tag) {
this.tag = tag;
}

public String getAzName() {
return azName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface KeeperContainerService {

List<KeeperContainerInfoModel> findAllInfos();
KeeperContainerInfoModel findKeeperContainerInfoModelById(long id);
List<KeeperContainerInfoModel> findAvailableKeeperContainerInfoModelsByDcAzAndOrg(String dcName, String azName, String orgName);
List<KeeperContainerInfoModel> findAvailableKeeperContainerInfoModelsByDcAzOrgAndTag(String dcName, String azName, String orgName, String tag);

void addKeeperContainer(KeeperContainerCreateInfo createInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ public void updateCluster(String clusterName, ClusterModel cluster) {
// organization info should not be updated by cluster,
// it's automatically updated by scheduled task
proto.setOrganizationInfo(null);
proto.setTag(cluster.getClusterTbl().getTag() == null ? "" : cluster.getClusterTbl().getTag());

clusterDao.updateCluster(proto);
}
Expand Down
Loading

0 comments on commit e62ccac

Please sign in to comment.