Skip to content

Commit

Permalink
optimized sentinel reset
Browse files Browse the repository at this point in the history
  • Loading branch information
llj李龙姣 committed May 23, 2024
1 parent 9ad4cc5 commit 55bd332
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.ctrip.xpipe.redis.core.protocal.pojo.Role;
import com.ctrip.xpipe.redis.core.protocal.pojo.Sentinel;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -26,6 +25,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;

Expand All @@ -37,7 +37,7 @@ public class ResetSentinels extends AbstractSentinelHelloCollectCommand {
private ScheduledExecutorService scheduled;
private ExecutorService resetExecutor;
private CheckerConfig checkerConfig;
private RateLimiter rateLimiter;
private AtomicInteger resetSentinelCounts = new AtomicInteger();

public ResetSentinels(SentinelHelloCollectContext context, MetaCache metaCache,
XpipeNettyClientKeyedObjectPool keyedObjectPool,
Expand All @@ -50,10 +50,6 @@ public ResetSentinels(SentinelHelloCollectContext context, MetaCache metaCache,
this.resetExecutor = resetExecutor;
this.sentinelManager = sentinelManager;
this.checkerConfig = checkerConfig;
int sentinelQuorum = checkerConfig.getDefaultSentinelQuorumConfig().getQuorum();
double checkIntervalInSec = (double) checkerConfig.getSentinelCheckIntervalMilli() / 1000;
double rate = (double) (sentinelQuorum - 1) / checkIntervalInSec;
this.rateLimiter = RateLimiter.create(rate);
}

@Override
Expand Down Expand Up @@ -105,7 +101,7 @@ private Set<HostPort> tooManyKeepers(List<HostPort> slaves) {
}


private Set<HostPort> unknownInstances(List<HostPort> slaves, String clusterId, String shardId) {
private Set<HostPort> unknownInstances(List<HostPort> slaves) {
Set<HostPort> unknownInstances = Sets.newHashSet(slaves);
unknownInstances.removeAll(context.getShardInstances());
slaves.removeAll(unknownInstances);
Expand All @@ -116,7 +112,7 @@ private Set<HostPort> unknownInstances(List<HostPort> slaves, String clusterId,
private static final int EXPECTED_MASTER_COUNT = 1;
boolean shouldReset(List<HostPort> slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
Set<HostPort> toManyKeepers = tooManyKeepers(slaves);
Set<HostPort> unknownSlaves = unknownInstances(slaves, clusterId, shardId);
Set<HostPort> unknownSlaves = unknownInstances(slaves);

if (toManyKeepers.isEmpty() && unknownSlaves.isEmpty())
return false;
Expand Down Expand Up @@ -235,7 +231,7 @@ protected void doExecute() throws Throwable {


if (shouldReset(slaves, clusterId, shardId, sentinelMonitorName, sentinelAddr)) {
if (rateLimiter.tryAcquire()) {
if (resetSentinelCounts.incrementAndGet() < checkerConfig.getDefaultSentinelQuorumConfig().getQuorum()) {
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName);
sentinelManager.reset(sentinel, sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> {
logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.meta.QuorumConfig;
import com.ctrip.xpipe.simpleserver.Server;
import com.ctrip.xpipe.tuple.Pair;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -46,8 +45,6 @@ public class ResetSentinelsTest extends AbstractCheckerTest {

@Before
public void init() throws Exception {
when(checkerConfig.getSentinelCheckIntervalMilli()).thenReturn(15000);
when(checkerConfig.getDefaultSentinelQuorumConfig()).thenReturn(new QuorumConfig());
resetSentinels = new ResetSentinels(new SentinelHelloCollectContext(), metaCache,
keyedObjectPool, scheduled, resetExecutor,sentinelManager,checkerConfig);
resetSentinels.setKeyedObjectPool(getXpipeNettyClientKeyedObjectPool()).setScheduled(scheduled);
Expand Down

0 comments on commit 55bd332

Please sign in to comment.