diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java
index 9f8276c5f8..177fc0bc17 100644
--- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java
+++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java
@@ -237,7 +237,7 @@ public void go() throws Exception {
                 chain.add(new AnalyseHellos(context, checkerConfig));
                 chain.add(new AcquireLeakyBucket(context, leakyBucket));
                 chain.add(new DeleteSentinels(context, sentinelManager));
-                chain.add(new ResetSentinels(context, metaCache, keyedObjectPool, scheduled, resetExecutor, sentinelManager));
+                chain.add(new ResetSentinels(context, metaCache, keyedObjectPool, scheduled, resetExecutor, sentinelManager, checkerConfig));
                 chain.add(new AddSentinels(context, sentinelManager, checkerConfig));
                 chain.add(new SetSentinels(context, sentinelManager));
                 chain.execute().addListener(commandFuture -> {
diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java
index 6eb9a14c4b..7b33cdd7a3 100644
--- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java
+++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java
@@ -8,17 +8,24 @@
 import com.ctrip.xpipe.monitor.CatEventMonitor;
 import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
 import com.ctrip.xpipe.redis.checker.SentinelManager;
+import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
 import com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHello;
 import com.ctrip.xpipe.redis.core.meta.MetaCache;
 import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
+import com.ctrip.xpipe.redis.core.protocal.pojo.MasterRole;
 import com.ctrip.xpipe.redis.core.protocal.pojo.Role;
 import com.ctrip.xpipe.redis.core.protocal.pojo.Sentinel;
-import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;
-import com.ctrip.xpipe.tuple.Pair;
-import com.ctrip.xpipe.utils.ObjectUtils;
-
-import java.util.*;
-import java.util.concurrent.*;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.RateLimiter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;
 
@@ -29,16 +36,24 @@ public class ResetSentinels extends AbstractSentinelHelloCollectCommand {
     private XpipeNettyClientKeyedObjectPool keyedObjectPool;
     private ScheduledExecutorService scheduled;
     private ExecutorService resetExecutor;
+    private CheckerConfig checkerConfig;
+    private RateLimiter rateLimiter;
 
     public ResetSentinels(SentinelHelloCollectContext context, MetaCache metaCache, XpipeNettyClientKeyedObjectPool keyedObjectPool,
-                          ScheduledExecutorService scheduled, ExecutorService resetExecutor,SentinelManager sentinelManager) {
+                          ScheduledExecutorService scheduled, ExecutorService resetExecutor, SentinelManager sentinelManager,
+                          CheckerConfig checkerConfig) {
         super(context);
         this.metaCache = metaCache;
         this.keyedObjectPool = keyedObjectPool;
         this.scheduled = scheduled;
         this.resetExecutor = resetExecutor;
         this.sentinelManager = sentinelManager;
+        this.checkerConfig = checkerConfig;
+        int sentinelQuorum = checkerConfig.getDefaultSentinelQuorumConfig().getQuorum();
+        double checkIntervalInSec = (double) checkerConfig.getSentinelCheckIntervalMilli() / 1000;
+        double rate = (double) (sentinelQuorum - 1) / checkIntervalInSec;
+        this.rateLimiter = RateLimiter.create(rate);
     }
 
     @Override
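The constructor sizes the new Guava `RateLimiter` from checker config: with sentinel quorum `q` and a check interval of `T` milliseconds, the limiter is created with `(q - 1) / (T / 1000)` permits per second, i.e. at most `q - 1` resets per check round, so enough of a shard's sentinels are always left untouched within one round to keep quorum. A minimal sketch of the arithmetic (quorum and interval values are illustrative, matching the test stubs below):

```java
import com.google.common.util.concurrent.RateLimiter;

public class ResetRateSketch {
    public static void main(String[] args) {
        int quorum = 3;                  // illustrative: QuorumConfig default quorum
        int checkIntervalMilli = 15000;  // illustrative: one sentinel-check round

        double checkIntervalInSec = (double) checkIntervalMilli / 1000;  // 15.0
        double rate = (double) (quorum - 1) / checkIntervalInSec;        // ~0.133 permits/sec

        RateLimiter rateLimiter = RateLimiter.create(rate);
        System.out.println(rateLimiter.tryAcquire()); // true: one permit available up front
        System.out.println(rateLimiter.tryAcquire()); // false: next permit ~7.5s away
    }
}
```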
@@ -50,6 +65,12 @@ protected void doExecute() throws Throwable {
 
     protected void checkReset(String clusterId, String shardId, String sentinelMonitorName, Set<SentinelHello> hellos) {
         if (hellos.isEmpty()) return;
+
+        if (hellos.size() < checkerConfig.getDefaultSentinelQuorumConfig().getTotal()) {
+            logger.warn("[{}-{}+{}]sentinel missing, ignore reset, hellos:{}", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), hellos);
+            return;
+        }
+
         ParallelCommandChain resetChain = new ParallelCommandChain(resetExecutor, false);
         for (SentinelHello hello : hellos) {
             resetChain.add(new ResetSentinel(clusterId, shardId, hello, sentinelMonitorName));
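This guard skips the whole reset chain when fewer hellos arrived than the configured total number of sentinels: each hello is collected from one sentinel's `__sentinel__:hello` channel, so a short count means at least one sentinel did not answer this round, and resetting on that partial view would be unsafe. A tiny sketch of the guard in isolation (class name and the total of 5 are illustrative, assuming the common 5-sentinel/quorum-3 deployment):

```java
import java.util.Set;

final class QuorumGateSketch {
    // a reset decision requires a hello from every configured sentinel
    static boolean completeView(Set<String> hellos, int totalSentinels) {
        return hellos.size() >= totalSentinels;
    }

    public static void main(String[] args) {
        System.out.println(completeView(Set.of("s1", "s2", "s3", "s4"), 5));       // false: skip reset
        System.out.println(completeView(Set.of("s1", "s2", "s3", "s4", "s5"), 5)); // true: proceed
    }
}
```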
@@ -84,57 +105,37 @@ private Set<HostPort> tooManyKeepers(List<HostPort> slaves) {
     }
 
-    private Set<HostPort> unknownInstances(List<HostPort> slaves, String clusterId, String shardId){
-        Set<HostPort> unknownInstances = new HashSet<>();
-        for (HostPort currentSlave : slaves) {
-            Pair<String, String> clusterShard = metaCache.findClusterShard(currentSlave);
-            if (clusterShard == null || !ObjectUtils.equals(clusterId, clusterShard.getKey()) || !ObjectUtils.equals(shardId, clusterShard.getValue()))
-                unknownInstances.add(currentSlave);
-        }
+    private Set<HostPort> unknownInstances(List<HostPort> slaves, String clusterId, String shardId) {
+        Set<HostPort> unknownInstances = Sets.newHashSet(slaves);
+        unknownInstances.removeAll(context.getShardInstances());
+        slaves.removeAll(unknownInstances);
         return unknownInstances;
     }
 
-    private Set<HostPort> connectedToTrueMaster(Set<HostPort> invalidSlaves) {
-        Map<HostPort, SlaveRole> connectedSlaves = new ConcurrentHashMap<>();
-
-        ParallelCommandChain slaveRoleChain = new ParallelCommandChain();
-        for (HostPort hostPort : invalidSlaves) {
-            RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(hostPort.getHost(), hostPort.getPort())), scheduled);
-            roleCommand.future().addListener(future -> {
-                if (future.isSuccess()) {
-                    Role role = future.get();
-                    if (role instanceof SlaveRole) {
-                        SlaveRole slaveRole = (SlaveRole) role;
-                        HostPort trueMaster = context.getTrueMasterInfo().getKey();
-                        if (slaveRole.getMasterHost().equals(trueMaster.getHost()) && slaveRole.getMasterPort() == trueMaster.getPort()) {
-                            connectedSlaves.put(hostPort, slaveRole);
-                        }
-                    }
-                }
-            });
-            slaveRoleChain.add(roleCommand);
-        }
+    private static final int EXPECTED_KEEPER_COUNT = 1;
+    private static final int EXPECTED_MASTER_COUNT = 1;
+    boolean shouldReset(List<HostPort> slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
+        Set<HostPort> toManyKeepers = tooManyKeepers(slaves);
+        Set<HostPort> unknownSlaves = unknownInstances(slaves, clusterId, shardId);
 
-        try {
-            slaveRoleChain.execute().get(1500, TimeUnit.MILLISECONDS);
-        } catch (Throwable th) {
-            logger.warn("[{}-{}+{}]parallel role command to slaves error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
-        }
+        if (toManyKeepers.isEmpty() && unknownSlaves.isEmpty())
+            return false;
 
+        if (!sentinelHasAllSlaves(slaves)) {
+            logger.info("[{}-{}+{}][reset]{}, {}, some slaves not found in sentinel, stop reset, sentinel slaves: {}, meta instances: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, slaves, context.getShardInstances());
+            return false;
+        }
 
-        return connectedSlaves.keySet();
-    }
+        List<HostPort> masterSlaves = masterSlaves();
+        if (!masterHasAllSlaves(masterSlaves,clusterId, shardId, sentinelMonitorName, sentinelAddr))
+            return false;
 
-    private static final int EXPECTED_KEEPER_COUNT = 1;
-    boolean shouldReset(List<HostPort> slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
-        Set<HostPort> toManyKeepers = tooManyKeepers(slaves);
-        if (shouldResetTooManyKeepers(toManyKeepers)) {
+        if (shouldResetTooManyKeepers(masterSlaves, toManyKeepers)) {
             logger.info("[{}-{}+{}][reset]{}, {}, too many keepers: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, toManyKeepers);
             return true;
         }
 
-        Set<HostPort> unknownSlaves = unknownInstances(slaves, clusterId, shardId);
-        if (shouldResetUnknownInstances(unknownSlaves)) {
+        if (shouldResetUnknownInstances(masterSlaves, unknownSlaves)) {
             logger.info("[{}-{}+{}][reset]{}, {}, unknown slaves: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, unknownSlaves);
             return true;
         }
@@ -142,12 +143,62 @@ boolean shouldReset(List<HostPort> slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
 
         return false;
     }
 
-    private boolean shouldResetTooManyKeepers(Set<HostPort> toManyKeepers) {
-        return !toManyKeepers.isEmpty() && connectedToTrueMaster(toManyKeepers).size() <= EXPECTED_KEEPER_COUNT;
+    private boolean sentinelHasAllSlaves(List<HostPort> sentinelSlaves) {
+        if (sentinelSlaves.isEmpty())
+            return false;
+
+        Set<HostPort> shardInstances = Sets.newHashSet(context.getShardInstances());
+        shardInstances.removeAll(sentinelSlaves);
+        return shardInstances.size() == EXPECTED_MASTER_COUNT;
+    }
+
+    private boolean shouldResetTooManyKeepers(List<HostPort> masterSlaves, Set<HostPort> toManyKeepers) {
+        if(toManyKeepers.isEmpty())
+            return false;
+
+        Set<HostPort> slaves = Sets.newHashSet(masterSlaves);
+        slaves.retainAll(toManyKeepers);
+
+        return slaves.size() <= EXPECTED_KEEPER_COUNT;
+    }
+
+    private boolean shouldResetUnknownInstances(List<HostPort> masterSlaves, Set<HostPort> unknownSlaves) {
+        if(unknownSlaves.isEmpty())
+            return false;
+
+        Set<HostPort> slaves = Sets.newHashSet(masterSlaves);
+        slaves.retainAll(unknownSlaves);
+
+        return slaves.isEmpty();
+    }
+
+    private List<HostPort> masterSlaves() {
+        HostPort master = context.getTrueMasterInfo().getKey();
+        RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(master.getHost(), master.getPort())), scheduled);
+
+        try {
+            Role role = roleCommand.execute().get(660, TimeUnit.MILLISECONDS);
+            if (role instanceof MasterRole) {
+                MasterRole masterRole = (MasterRole) role;
+                return masterRole.getSlaveHostPorts();
+            }
+        } catch (Throwable th) {
+            logger.warn("[{}-{}+{}]get slaves from master failed", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
+        }
+
+        return new ArrayList<>();
     }
 
-    private boolean shouldResetUnknownInstances(Set<HostPort> unknownSlaves) {
-        return !unknownSlaves.isEmpty() && connectedToTrueMaster(unknownSlaves).isEmpty();
+    private boolean masterHasAllSlaves(List<HostPort> masterSlaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
+        HostPort master = context.getTrueMasterInfo().getKey();
+        Set<HostPort> masterAndSlaves = Sets.newHashSet(masterSlaves);
+        masterAndSlaves.add(master);
+
+        boolean masterHasAllSlaves = masterAndSlaves.containsAll(context.getShardInstances());
+        if (!masterHasAllSlaves) {
+            logger.info("[{}-{}+{}][reset]{}, {}, master:{} lost connection with some slaves, stop reset, current slaves:{}, expected slaves:{}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, master, masterSlaves, context.getShardInstances());
+        }
+        return masterHasAllSlaves;
     }
 
     class ResetSentinel extends AbstractCommand<Void> {
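`shouldReset` now decides by set algebra over three views of the shard: what the meta expects (`context.getShardInstances()`), what the sentinel reports (`slaves`), and what the true master reports (`masterSlaves`, fetched once via `ROLE` instead of the old per-slave role probes). Reset is vetoed unless both the sentinel and the master still see the full shard, and then fires only if the suspicious instances are missing from the master's replication tree. A self-contained sketch of those checks, using plain strings in place of `HostPort` (all names and ports illustrative):

```java
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;

public class ShouldResetSketch {
    public static void main(String[] args) {
        List<String> shardInstances = List.of("master:6379", "slave:6380", "slave:6381");
        List<String> sentinelSlaves = List.of("slave:6380", "slave:6381", "keeper:8000", "keeper:8001");
        List<String> masterSlaves   = List.of("slave:6380", "slave:6381", "keeper:8000");

        // sentinelHasAllSlaves: removing what the sentinel sees must leave exactly the master
        Set<String> notSeen = Sets.newHashSet(shardInstances);
        notSeen.removeAll(sentinelSlaves);
        System.out.println(notSeen.size() == 1);                          // true: only the master remains

        // masterHasAllSlaves: master plus its replication tree must cover the shard meta
        Set<String> masterAndSlaves = Sets.newHashSet(masterSlaves);
        masterAndSlaves.add("master:6379");
        System.out.println(masterAndSlaves.containsAll(shardInstances));  // true: no slave lost

        // shouldResetTooManyKeepers: reset only if at most one extra keeper
        // still replicates from the true master
        Set<String> keepersOnMaster = Sets.newHashSet(masterSlaves);
        keepersOnMaster.retainAll(Set.of("keeper:8000", "keeper:8001"));
        System.out.println(keepersOnMaster.size() <= 1);                  // true: safe to reset
    }
}
```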
@@ -182,12 +233,17 @@ protected void doExecute() throws Throwable {
 
         if (slaves.isEmpty()) return;
 
+
         if (shouldReset(slaves, clusterId, shardId, sentinelMonitorName, sentinelAddr)) {
-            CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName);
-            sentinelManager.reset(sentinel, sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> {
-                logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, throwable);
-                return null;
-            });
+            if (rateLimiter.tryAcquire()) {
+                CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName);
+                sentinelManager.reset(sentinel, sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> {
+                    logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, throwable);
+                    return null;
+                });
+            } else {
+                logger.warn("[{}-{}][reset]try to reset sentinel {} failed, rate limit", LOG_TITLE, sentinelMonitorName, sentinel);
+            }
         }
     }
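`tryAcquire()` is the non-blocking form of Guava's limiter: it either takes a permit immediately or returns false, so the collector thread never parks waiting; a denied reset is just logged and naturally retried on a later check round. A short sketch of the pattern (the rate value is illustrative):

```java
import com.google.common.util.concurrent.RateLimiter;

public class TryAcquireSketch {
    // illustrative rate: 2 resets per 15-second check round
    private static final RateLimiter LIMITER = RateLimiter.create(2.0 / 15);

    static void maybeReset(String monitorName) {
        if (LIMITER.tryAcquire()) {   // never blocks
            System.out.println("reset " + monitorName);
        } else {
            System.out.println("rate limited, skip " + monitorName);
        }
    }

    public static void main(String[] args) {
        maybeReset("cluster+shard+activeDc"); // first call gets the stored permit
        maybeReset("cluster+shard+activeDc"); // denied: logged and skipped
    }
}
```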
diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java
index 619c672d2e..992a78c5bc 100644
--- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java
+++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java
@@ -117,7 +117,8 @@ public List<HostPort> getShardInstances() {
     }
 
     public SentinelHelloCollectContext setShardInstances(List<HostPort> shardInstances) {
-        this.shardInstances = shardInstances;return this;
+        this.shardInstances = shardInstances;
+        return this;
     }
 
     public Map<ClusterType, Sentinel> getClusterTypeSentinelConfig() {
diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java
index f9a24254c6..854e69d83b 100644
--- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java
+++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java
@@ -1,12 +1,13 @@
 package com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.collector.command;
 
-import com.ctrip.xpipe.cluster.ClusterType;
 import com.ctrip.xpipe.endpoint.HostPort;
 import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
 import com.ctrip.xpipe.redis.checker.AbstractCheckerTest;
 import com.ctrip.xpipe.redis.checker.SentinelManager;
+import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
 import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
 import com.ctrip.xpipe.redis.core.meta.MetaCache;
+import com.ctrip.xpipe.redis.core.meta.QuorumConfig;
 import com.ctrip.xpipe.simpleserver.Server;
 import com.ctrip.xpipe.tuple.Pair;
 import com.google.common.collect.Lists;
@@ -39,142 +40,191 @@ public class ResetSentinelsTest extends AbstractCheckerTest {
     private ScheduledExecutorService scheduled;
     @Mock
     private ExecutorService resetExecutor;
+    @Mock
+    private CheckerConfig checkerConfig;
 
     @Before
     public void init() throws Exception {
+        when(checkerConfig.getSentinelCheckIntervalMilli()).thenReturn(15000);
+        when(checkerConfig.getDefaultSentinelQuorumConfig()).thenReturn(new QuorumConfig());
         resetSentinels = new ResetSentinels(new SentinelHelloCollectContext(), metaCache,
-                keyedObjectPool, scheduled, resetExecutor,sentinelManager);
+                keyedObjectPool, scheduled, resetExecutor,sentinelManager,checkerConfig);
         resetSentinels.setKeyedObjectPool(getXpipeNettyClientKeyedObjectPool()).setScheduled(scheduled);
     }
 
     @Test
-    public void testTooManyKeepers() throws Exception{
+    public void noInvalidSlaves() throws Exception {
         RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
-        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort("localhost", 6379), new ArrayList<>())));
+        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>()))
+                .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380))));
 
-//        sentinelManager.slaves
         when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002)));
 
-        // 1、command failed
-        boolean shouldReset= resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
-        Assert.assertTrue(shouldReset);
+        boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 6380)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
+        Assert.assertFalse(shouldReset);
+    }
 
-        // 2、some keepers unreachable
-        Server activeKeeper0 = startServer(8000,"*5\r\n"
-                + "$6\r\nkeeper\r\n"
-                + "$9\r\nlocalhost\r\n"
-                + ":6379\r\n"
-                + "$9\r\nconnected\r\n"
-                + ":477\r\n");
-        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
-        Assert.assertTrue(shouldReset);
+    @Test
+    public void sentinelLostSlaves() throws Exception {
+        RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
+        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>()))
+                .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381))));
 
-        // 3、invalid keeper not connected to master
-        Server activeKeeper1 = startServer(8001,"*5\r\n"
-                + "$6\r\nkeeper\r\n"
-                + "$10\r\nlocalhost2\r\n"
-                + ":6379\r\n"
-                + "$9\r\nconnected\r\n"
-                + ":477\r\n");
-        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
-        Assert.assertTrue(shouldReset);
+        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002)));
 
-        // 4、invalid keeper connected to master
-        activeKeeper1.stop();
-        Server activeKeeper2 = startServer(8002,"*5\r\n"
-                + "$6\r\nkeeper\r\n"
-                + "$9\r\nlocalhost\r\n"
-                + ":6379\r\n"
-                + "$9\r\nconnected\r\n"
-                + ":477\r\n");
-        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8002)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
+        boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 6380)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
         Assert.assertFalse(shouldReset);
-        activeKeeper2.stop();
-        activeKeeper0.stop();
     }
 
     @Test
-    public void testOneWayReset() throws Exception {
+    public void masterLostSlaves() throws Exception {
         RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
-        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort("localhost", 6380), new ArrayList<>())));
-//        sentinelManager.slaves
-        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)));
+        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>()))
+                .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381))));
 
-        HostPort wrongSlave = new HostPort("otherClusterShardSlave", 6379);
-        when(metaCache.findClusterShard(wrongSlave)).thenReturn(new Pair<>("otherCluster", "otherShard"));
-        boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), wrongSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
-        Assert.assertTrue(shouldReset);
+        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002)));
 
-        Server unknownSlaveServer = startServer(8002,"*5\r\n"
-                + "$5\r\nslave\r\n"
-                + "$9\r\nlocalhost\r\n"
-                + ":6380\r\n"
-                + "$9\r\nconnected\r\n"
-                + ":477\r\n");
-        HostPort unknownConnectedSlave = new HostPort(LOCAL_HOST, unknownSlaveServer.getPort());
-        when(metaCache.findClusterShard(unknownConnectedSlave)).thenReturn(null);
-        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), unknownConnectedSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
+        startServer(6379,"*3\r\n"
+                + "$6\r\nmaster\r\n"
+                + ":43\r\n"
+                + "*2\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6380\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n8000\r\n"
+                + "$1\r\n0\r\n");
+
+
+        boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
"cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); Assert.assertFalse(shouldReset); - unknownSlaveServer.stop(); + } - HostPort unknownUnreachableSlave = new HostPort(LOCAL_HOST, 8003); - when(metaCache.findClusterShard(unknownUnreachableSlave)).thenReturn(null); - shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), unknownUnreachableSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); - Assert.assertTrue(shouldReset); + @Test + public void testTooManyKeepers() throws Exception { + RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); + resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) + .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)))); - Server unknownAndConnectedToOtherMasterSlaveServer = startServer(8004,"*5\r\n" - + "$5\r\nslave\r\n" - + "$9\r\nlocalhost\r\n" - + ":6381\r\n" - + "$9\r\nconnected\r\n" - + ":477\r\n"); - HostPort unknownAndConnectedToOtherMasterSlave = new HostPort(LOCAL_HOST, unknownAndConnectedToOtherMasterSlaveServer.getPort()); - when(metaCache.findClusterShard(unknownAndConnectedToOtherMasterSlave)).thenReturn(null); - shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), unknownAndConnectedToOtherMasterSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); - Assert.assertTrue(shouldReset); + when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - HostPort trueSlave = new HostPort(LOCAL_HOST, 6379); - when(metaCache.findClusterShard(trueSlave)).thenReturn(new Pair<>("cluster", "shard")); - shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), trueSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); - Assert.assertFalse(shouldReset); + //not in master + Server master = startServer(6379, "*3\r\n" + + "$6\r\nmaster\r\n" + + ":43\r\n" + + "*3\r\n" + + "*3\r\n" + + "$9\r\n127.0.0.1\r\n" + + "$4\r\n6380\r\n" + + "$1\r\n0\r\n" + + "*3\r\n" + + "$9\r\n127.0.0.1\r\n" + + "$4\r\n8000\r\n" + + "$1\r\n0\r\n" + + "*3\r\n" + + "$9\r\n127.0.0.1\r\n" + + "$4\r\n6381\r\n" + + "$1\r\n0\r\n"); - } + when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); + boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertTrue(shouldReset); + master.stop(); - @Test - public void testNotOneWayReset() throws Exception { - RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort(),ClusterType.CROSS_DC); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort("localhost", 6380), new ArrayList<>()))); + //in master + resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new 
+                .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6382), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381))));
 
-        HostPort trueSlave = new HostPort(LOCAL_HOST, 6379);
-        when(metaCache.findClusterShard(trueSlave)).thenReturn(new Pair<>("cluster", "shard"));
-        boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
+        Server master2 = startServer(6382, "*3\r\n"
+                + "$6\r\nmaster\r\n"
+                + ":43\r\n"
+                + "*4\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6380\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n8000\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6381\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n8001\r\n"
+                + "$1\r\n0\r\n");
+
+        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
         Assert.assertFalse(shouldReset);
+        master2.stop();
+    }
 
-        HostPort wrongSlave = new HostPort("otherClusterShardSlave", 6379);
-        when(metaCache.findClusterShard(wrongSlave)).thenReturn(new Pair<>("otherCluster", "otherShard"));
-        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave, wrongSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
-        Assert.assertTrue(shouldReset);
+    @Test
+    public void unknownSlaves() throws Exception {
+        RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
+        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>()))
+                .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381))));
 
-        Server unknownActiveSlaveServer = startServer(randomPort(),"*5\r\n"
-                + "$5\r\nslave\r\n"
-                + "$9\r\nlocalhost\r\n"
-                + ":6380\r\n"
-                + "$9\r\nconnected\r\n"
-                + ":477\r\n");
-        HostPort unknownActive = new HostPort(LOCAL_HOST, unknownActiveSlaveServer.getPort());
-        when(metaCache.findClusterShard(unknownActive)).thenReturn(null);
-        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave, unknownActive), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
-        Assert.assertFalse(shouldReset);
+        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002)));
 
-        Server unknownActiveMasterServer = startServer(randomPort(), "*3\r\n"
+        //not in master
+        Server master = startServer(6379, "*3\r\n"
                 + "$6\r\nmaster\r\n"
-                + ":0\r\n*0\r\n");
-        unknownActive = new HostPort(LOCAL_HOST, unknownActiveMasterServer.getPort());
-        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave, unknownActive), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
+                + ":43\r\n"
+                + "*3\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6380\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n8000\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6381\r\n"
+                + "$1\r\n0\r\n");
+
+        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002)));
+
+        boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381), new HostPort(LOCAL_HOST, 6382), new HostPort(LOCAL_HOST, 8000)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
         Assert.assertTrue(shouldReset);
+        master.stop();
+
+        //in master
+        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6382), new ArrayList<>()))
+                .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381))));
+        Server master2 = startServer(6382, "*3\r\n"
+                + "$6\r\nmaster\r\n"
+                + ":43\r\n"
+                + "*4\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6380\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n8000\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6381\r\n"
+                + "$1\r\n0\r\n"
+                + "*3\r\n"
+                + "$9\r\n127.0.0.1\r\n"
+                + "$4\r\n6382\r\n"
+                + "$1\r\n0\r\n");
+
+        shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381),new HostPort(LOCAL_HOST, 6382), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230));
+        Assert.assertFalse(shouldReset);
+        master2.stop();
     }
 }
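For readers decoding the test stubs: the strings fed to `startServer` are RESP-encoded `ROLE` replies. A master answers with a three-element array: the role name, its replication offset, and an array of `[ip, port, offset]` triples for its connected slaves. Annotated copy of the 6379 stub from `masterLostSlaves` (comments added; the payload itself is unchanged):

```java
// RESP encoding of a master's ROLE reply, as served by the 6379 stub above
String roleReply =
        "*3\r\n"                  // array(3): role, master_repl_offset, slave list
        + "$6\r\nmaster\r\n"      // bulk string "master"
        + ":43\r\n"               // integer 43: replication offset
        + "*2\r\n"                // array(2): two connected slaves
        + "*3\r\n"                // slave #1: [ip, port, offset]
        + "$9\r\n127.0.0.1\r\n"   //   ip "127.0.0.1" (9 bytes)
        + "$4\r\n6380\r\n"        //   port "6380" as a bulk string
        + "$1\r\n0\r\n"           //   slave replication offset "0"
        + "*3\r\n"                // slave #2: [ip, port, offset]
        + "$9\r\n127.0.0.1\r\n"
        + "$4\r\n8000\r\n"        //   a keeper counts as a slave here
        + "$1\r\n0\r\n";
```

Because this master reports only 6380 and 8000 while the shard meta expects 6380 and 6381, `masterHasAllSlaves` fails and the reset is vetoed, which is exactly what `masterLostSlaves` asserts.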