optimized sentinel reset
llj李龙姣 committed May 22, 2024
1 parent e0eea9e commit 9ad4cc5
Showing 4 changed files with 264 additions and 157 deletions.
@@ -237,7 +237,7 @@ public void go() throws Exception {
chain.add(new AnalyseHellos(context, checkerConfig));
chain.add(new AcquireLeakyBucket(context, leakyBucket));
chain.add(new DeleteSentinels(context, sentinelManager));
chain.add(new ResetSentinels(context, metaCache, keyedObjectPool, scheduled, resetExecutor, sentinelManager));
chain.add(new ResetSentinels(context, metaCache, keyedObjectPool, scheduled, resetExecutor, sentinelManager, checkerConfig));
chain.add(new AddSentinels(context, sentinelManager, checkerConfig));
chain.add(new SetSentinels(context, sentinelManager));
chain.execute().addListener(commandFuture -> {
@@ -8,17 +8,24 @@
import com.ctrip.xpipe.monitor.CatEventMonitor;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.SentinelManager;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHello;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
import com.ctrip.xpipe.redis.core.protocal.pojo.MasterRole;
import com.ctrip.xpipe.redis.core.protocal.pojo.Role;
import com.ctrip.xpipe.redis.core.protocal.pojo.Sentinel;
import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.ObjectUtils;

import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;

@@ -29,16 +36,24 @@ public class ResetSentinels extends AbstractSentinelHelloCollectCommand {
private XpipeNettyClientKeyedObjectPool keyedObjectPool;
private ScheduledExecutorService scheduled;
private ExecutorService resetExecutor;
private CheckerConfig checkerConfig;
private RateLimiter rateLimiter;

public ResetSentinels(SentinelHelloCollectContext context, MetaCache metaCache,
XpipeNettyClientKeyedObjectPool keyedObjectPool,
ScheduledExecutorService scheduled, ExecutorService resetExecutor,SentinelManager sentinelManager) {
ScheduledExecutorService scheduled, ExecutorService resetExecutor, SentinelManager sentinelManager,
CheckerConfig checkerConfig) {
super(context);
this.metaCache = metaCache;
this.keyedObjectPool = keyedObjectPool;
this.scheduled = scheduled;
this.resetExecutor = resetExecutor;
this.sentinelManager = sentinelManager;
this.checkerConfig = checkerConfig;
int sentinelQuorum = checkerConfig.getDefaultSentinelQuorumConfig().getQuorum();
double checkIntervalInSec = (double) checkerConfig.getSentinelCheckIntervalMilli() / 1000;
double rate = (double) (sentinelQuorum - 1) / checkIntervalInSec;
this.rateLimiter = RateLimiter.create(rate);
}
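
The rate derived in the constructor caps resets at quorum − 1 per sentinel check interval, so at least a quorum of sentinels is always left monitoring the shard. A minimal, self-contained sketch of the arithmetic and of Guava's non-blocking tryAcquire; the quorum of 3 and the 60-second interval are assumed example values, not values taken from this commit:

import com.google.common.util.concurrent.RateLimiter;

public class ResetRateSketch {
    public static void main(String[] args) {
        int quorum = 3;                            // assumed example quorum
        double checkIntervalSec = 60_000 / 1000.0; // assumed 60s check interval
        // At most (quorum - 1) resets per interval, expressed as permits per second.
        RateLimiter limiter = RateLimiter.create((quorum - 1) / checkIntervalSec); // ~0.033/s
        System.out.println(limiter.tryAcquire()); // true: first permit is available
        System.out.println(limiter.tryAcquire()); // false: next permit ~30s away, call does not block
    }
}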

@Override
Expand All @@ -50,6 +65,12 @@ protected void doExecute() throws Throwable {
protected void checkReset(String clusterId, String shardId, String sentinelMonitorName, Set<SentinelHello> hellos) {
if (hellos.isEmpty())
return;

if (hellos.size() < checkerConfig.getDefaultSentinelQuorumConfig().getTotal()) {
logger.warn("[{}-{}+{}]sentinel missing, ignore reset, hellos:{}", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), hellos);
return;
}

ParallelCommandChain resetChain = new ParallelCommandChain(resetExecutor, false);
for (SentinelHello hello : hellos) {
resetChain.add(new ResetSentinel(clusterId, shardId, hello, sentinelMonitorName));
@@ -84,70 +105,100 @@ private Set<HostPort> tooManyKeepers(List<HostPort> slaves) {
}


private Set<HostPort> unknownInstances(List<HostPort> slaves, String clusterId, String shardId){
Set<HostPort> unknownInstances = new HashSet<>();
for (HostPort currentSlave : slaves) {
Pair<String, String> clusterShard = metaCache.findClusterShard(currentSlave);
if (clusterShard == null || !ObjectUtils.equals(clusterId, clusterShard.getKey()) || !ObjectUtils.equals(shardId, clusterShard.getValue()))
unknownInstances.add(currentSlave);
}
private Set<HostPort> unknownInstances(List<HostPort> slaves, String clusterId, String shardId) {
Set<HostPort> unknownInstances = Sets.newHashSet(slaves);
unknownInstances.removeAll(context.getShardInstances());
slaves.removeAll(unknownInstances);
return unknownInstances;
}

private Set<HostPort> connectedToTrueMaster(Set<HostPort> invalidSlaves) {
Map<HostPort, SlaveRole> connectedSlaves = new ConcurrentHashMap<>();

ParallelCommandChain slaveRoleChain = new ParallelCommandChain();
for (HostPort hostPort : invalidSlaves) {
RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(hostPort.getHost(), hostPort.getPort())), scheduled);
roleCommand.future().addListener(future -> {
if (future.isSuccess()) {
Role role = future.get();
if (role instanceof SlaveRole) {
SlaveRole slaveRole = (SlaveRole) role;
HostPort trueMaster = context.getTrueMasterInfo().getKey();
if (slaveRole.getMasterHost().equals(trueMaster.getHost()) && slaveRole.getMasterPort() == trueMaster.getPort()) {
connectedSlaves.put(hostPort, slaveRole);
}
}
}
});
slaveRoleChain.add(roleCommand);
}
private static final int EXPECTED_KEEPER_COUNT = 1;
private static final int EXPECTED_MASTER_COUNT = 1;
boolean shouldReset(List<HostPort> slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
Set<HostPort> toManyKeepers = tooManyKeepers(slaves);
Set<HostPort> unknownSlaves = unknownInstances(slaves, clusterId, shardId);

try {
slaveRoleChain.execute().get(1500, TimeUnit.MILLISECONDS);
} catch (Throwable th) {
logger.warn("[{}-{}+{}]parallel role command to slaves error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
}
if (toManyKeepers.isEmpty() && unknownSlaves.isEmpty())
return false;

if (!sentinelHasAllSlaves(slaves)) {
logger.info("[{}-{}+{}][reset]{}, {}, some slaves not found in sentinel, stop reset, sentinel slaves: {}, meta instances: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, slaves, context.getShardInstances());
return false;
}

return connectedSlaves.keySet();
}
List<HostPort> masterSlaves = masterSlaves();
if (!masterHasAllSlaves(masterSlaves, clusterId, shardId, sentinelMonitorName, sentinelAddr))
return false;

private static final int EXPECTED_KEEPER_COUNT = 1;
boolean shouldReset(List<HostPort> slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
Set<HostPort> toManyKeepers = tooManyKeepers(slaves);
if (shouldResetTooManyKeepers(toManyKeepers)) {
if (shouldResetTooManyKeepers(masterSlaves, toManyKeepers)) {
logger.info("[{}-{}+{}][reset]{}, {}, too many keepers: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, toManyKeepers);
return true;
}

Set<HostPort> unknownSlaves = unknownInstances(slaves, clusterId, shardId);
if (shouldResetUnknownInstances(unknownSlaves)) {
if (shouldResetUnknownInstances(masterSlaves, unknownSlaves)) {
logger.info("[{}-{}+{}][reset]{}, {}, unknown slaves: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, unknownSlaves);
return true;
}

return false;
}

private boolean shouldResetTooManyKeepers(Set<HostPort> toManyKeepers) {
return !toManyKeepers.isEmpty() && connectedToTrueMaster(toManyKeepers).size() <= EXPECTED_KEEPER_COUNT;
private boolean sentinelHasAllSlaves(List<HostPort> sentinelSlaves) {
if (sentinelSlaves.isEmpty())
return false;

Set<HostPort> shardInstances = Sets.newHashSet(context.getShardInstances());
shardInstances.removeAll(sentinelSlaves);
return shardInstances.size() == EXPECTED_MASTER_COUNT;
}

private boolean shouldResetTooManyKeepers(List<HostPort> masterSlaves, Set<HostPort> toManyKeepers) {
if(toManyKeepers.isEmpty())
return false;

Set<HostPort> slaves = Sets.newHashSet(masterSlaves);
slaves.retainAll(toManyKeepers);

return slaves.size() <= EXPECTED_KEEPER_COUNT;
}

private boolean shouldResetUnknownInstances(List<HostPort> masterSlaves, Set<HostPort> unknownSlaves) {
if(unknownSlaves.isEmpty())
return false;

Set<HostPort> slaves = Sets.newHashSet(masterSlaves);
slaves.retainAll(unknownSlaves);

return slaves.isEmpty();
}
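
Both helpers come down to one set intersection: keep only the suspect instances that the true master still reports as replicas, then reset only if the suspects have effectively vanished from the master's view. A self-contained illustration of the retainAll step, with plain strings standing in for HostPort values (the addresses are invented for the example):

import com.google.common.collect.Sets;

import java.util.List;
import java.util.Set;

public class RetainAllSketch {
    public static void main(String[] args) {
        // Replicas the true master currently reports (stand-ins for HostPort).
        List<String> masterSlaves = List.of("redis-1:6379", "keeper-1:8080");
        // Sentinel hellos still advertise two keepers; only one should remain.
        Set<String> tooManyKeepers = Set.of("keeper-1:8080", "keeper-2:8080");

        Set<String> stillAttached = Sets.newHashSet(masterSlaves);
        stillAttached.retainAll(tooManyKeepers); // intersection: [keeper-1:8080]

        // At most EXPECTED_KEEPER_COUNT (1) keeper still attached => reset is allowed.
        System.out.println(stillAttached.size() <= 1); // true
    }
}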

private List<HostPort> masterSlaves() {
HostPort master = context.getTrueMasterInfo().getKey();
RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(master.getHost(), master.getPort())), scheduled);

try {
Role role = roleCommand.execute().get(660, TimeUnit.MILLISECONDS);
if (role instanceof MasterRole) {
MasterRole masterRole = (MasterRole) role;
return masterRole.getSlaveHostPorts();
}
} catch (Throwable th) {
logger.warn("[{}-{}+{}]get slaves from master failed", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
}

return new ArrayList<>();
}

private boolean shouldResetUnknownInstances(Set<HostPort> unknownSlaves) {
return !unknownSlaves.isEmpty() && connectedToTrueMaster(unknownSlaves).isEmpty();
private boolean masterHasAllSlaves(List<HostPort> masterSlaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
HostPort master = context.getTrueMasterInfo().getKey();
Set<HostPort> masterAndSlaves = Sets.newHashSet(masterSlaves);
masterAndSlaves.add(master);

boolean masterHasAllSlaves = masterAndSlaves.containsAll(context.getShardInstances());
if (!masterHasAllSlaves) {
logger.info("[{}-{}+{}][reset]{}, {}, master:{} lost connection with some slaves, stop reset, current slaves:{}, expected slaves:{}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, master, masterSlaves, context.getShardInstances());
}
return masterHasAllSlaves;
}
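
masterHasAllSlaves guards the opposite direction: if the master's replica list (plus the master itself) does not cover every expected shard instance, the reset is skipped so that a transiently disconnected replica is not dropped from sentinel monitoring. A small sketch of the containment check, again with invented string stand-ins for HostPort:

import com.google.common.collect.Sets;

import java.util.List;
import java.util.Set;

public class ContainsAllSketch {
    public static void main(String[] args) {
        String master = "redis-m:6379";
        List<String> masterSlaves = List.of("redis-1:6379"); // master sees only one replica
        List<String> shardInstances = List.of("redis-m:6379", "redis-1:6379", "redis-2:6379");

        Set<String> masterAndSlaves = Sets.newHashSet(masterSlaves);
        masterAndSlaves.add(master);
        // redis-2:6379 is missing from the master's view => stop the reset.
        System.out.println(masterAndSlaves.containsAll(shardInstances)); // false
    }
}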

class ResetSentinel extends AbstractCommand<Void> {
Expand Down Expand Up @@ -182,12 +233,17 @@ protected void doExecute() throws Throwable {
if (slaves.isEmpty())
return;


if (shouldReset(slaves, clusterId, shardId, sentinelMonitorName, sentinelAddr)) {
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName);
sentinelManager.reset(sentinel, sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> {
logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, throwable);
return null;
});
if (rateLimiter.tryAcquire()) {
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName);
sentinelManager.reset(sentinel, sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> {
logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, throwable);
return null;
});
} else {
logger.warn("[{}-{}][reset]try to reset sentinel {} failed, rate limit", LOG_TITLE, sentinelMonitorName, sentinel);
}
}
}

@@ -117,7 +117,8 @@ public List<HostPort> getShardInstances() {
}

public SentinelHelloCollectContext setShardInstances(List<HostPort> shardInstances) {
this.shardInstances = shardInstances;return this;
this.shardInstances = shardInstances;
return this;
}

public Map<ClusterType, String[]> getClusterTypeSentinelConfig() {
(Diff for the fourth changed file did not load.)
