diff --git a/pom.xml b/pom.xml index cf1f68e301..7de218c451 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ 3.12.0 2.7 1.10-SNAPSHOT - 4.9.0 + 4.6.3 4.4.1-unstable-SNAPSHOT 1.0.7-SNAPSHOT 9.2.37 @@ -286,14 +286,6 @@ com.ctrip.framework vi - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java index 177fc0bc17..9f8276c5f8 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java @@ -237,7 +237,7 @@ public void go() throws Exception { chain.add(new AnalyseHellos(context, checkerConfig)); chain.add(new AcquireLeakyBucket(context, leakyBucket)); chain.add(new DeleteSentinels(context, sentinelManager)); - chain.add(new ResetSentinels(context, metaCache, keyedObjectPool, scheduled, resetExecutor, sentinelManager, checkerConfig)); + chain.add(new ResetSentinels(context, metaCache, keyedObjectPool, scheduled, resetExecutor, sentinelManager)); chain.add(new AddSentinels(context, sentinelManager, checkerConfig)); chain.add(new SetSentinels(context, sentinelManager)); chain.execute().addListener(commandFuture -> { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java index 3c1c7c96b7..6eb9a14c4b 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java @@ -1,6 +1,6 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.collector.command; -import com.ctrip.xpipe.api.command.Command; +import com.ctrip.xpipe.api.command.CommandFuture; import com.ctrip.xpipe.command.AbstractCommand; import com.ctrip.xpipe.command.ParallelCommandChain; import com.ctrip.xpipe.endpoint.DefaultEndPoint; @@ -8,22 +8,17 @@ import com.ctrip.xpipe.monitor.CatEventMonitor; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.SentinelManager; -import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHello; import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand; -import com.ctrip.xpipe.redis.core.protocal.pojo.MasterRole; import com.ctrip.xpipe.redis.core.protocal.pojo.Role; import com.ctrip.xpipe.redis.core.protocal.pojo.Sentinel; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole; +import com.ctrip.xpipe.tuple.Pair; +import com.ctrip.xpipe.utils.ObjectUtils; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.concurrent.*; import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE; @@ -34,149 +29,42 @@ public class ResetSentinels extends AbstractSentinelHelloCollectCommand { private XpipeNettyClientKeyedObjectPool keyedObjectPool; private ScheduledExecutorService scheduled; private ExecutorService resetExecutor; - private CheckerConfig checkerConfig; public ResetSentinels(SentinelHelloCollectContext context, MetaCache metaCache, XpipeNettyClientKeyedObjectPool keyedObjectPool, - ScheduledExecutorService scheduled, ExecutorService resetExecutor, SentinelManager sentinelManager, - CheckerConfig checkerConfig) { + ScheduledExecutorService scheduled, ExecutorService resetExecutor,SentinelManager sentinelManager) { super(context); this.metaCache = metaCache; this.keyedObjectPool = keyedObjectPool; this.scheduled = scheduled; this.resetExecutor = resetExecutor; this.sentinelManager = sentinelManager; - this.checkerConfig = checkerConfig; } @Override protected void doExecute() throws Throwable { - checkSentinels(context.getInfo().getClusterId(), context.getInfo().getShardId(), context.getSentinelMonitorName(), context.getToCheckReset()); + checkReset(context.getInfo().getClusterId(), context.getInfo().getShardId(), context.getSentinelMonitorName(), context.getToCheckReset()); future().setSuccess(); } - protected void checkSentinels(String clusterId, String shardId, String sentinelMonitorName, Set hellos) { + protected void checkReset(String clusterId, String shardId, String sentinelMonitorName, Set hellos) { if (hellos.isEmpty()) return; - - if (hellos.size() < checkerConfig.getDefaultSentinelQuorumConfig().getQuorum()) { - logger.warn("[{}-{}+{}]sentinels less then quorum, ignore reset, hellos:{}", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), hellos); - return; - } - - Map> allSentinels = sentinelsSlaves(sentinelMonitorName, hellos); - Map> availableSentinels = sentinels2AllSlaves(allSentinels); - if (overHalfSentinelsLostSlaves(availableSentinels)) { - logger.warn("[{}-{}+{}]over half sentinels lost slaves: {}, ignore reset", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), allSentinels); - return; - } - - List shouldResetSentinels = shouldResetSentinels(clusterId, shardId, sentinelMonitorName, hellos, allSentinels); - - List resetSentinels = sentinelsToReset(availableSentinels, Lists.newArrayList(shouldResetSentinels)); - if (resetSentinels.isEmpty()) - return; - - logger.info("[{}-{}+{}]{} to reset sentinels:{}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, resetSentinels); - resetSentinels.forEach(sentinel -> { - CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName); - sentinelManager.reset(new Sentinel(sentinel.toString(), sentinel.getHost(), sentinel.getPort()), sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> { - logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinel, throwable); - return null; - }); - }); - - } - - List shouldResetSentinels(String clusterId, String shardId, String sentinelMonitorName, Set hellos, Map> allSentinels) { - Map shouldResetSentinelsMap = new ConcurrentHashMap<>(); - ParallelCommandChain checkCommand = new ParallelCommandChain(resetExecutor, false); + ParallelCommandChain resetChain = new ParallelCommandChain(resetExecutor, false); for (SentinelHello hello : hellos) { - HostPort sentinelAddr = hello.getSentinelAddr(); - List slaves = allSentinels.get(sentinelAddr); - CheckSentinel resetSentinel = new CheckSentinel(clusterId, shardId, hello, sentinelMonitorName, slaves); - resetSentinel.future().addListener(future -> { - if (future.isSuccess() && future.get()) - shouldResetSentinelsMap.put(sentinelAddr, sentinelAddr); - }); - checkCommand.add(resetSentinel); + resetChain.add(new ResetSentinel(clusterId, shardId, hello, sentinelMonitorName)); } - - try { - checkCommand.execute().get(2050, TimeUnit.MILLISECONDS); - } catch (Throwable th) { - logger.warn("[{}-{}+{}]check all sentinels error, hellos:{} ", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), hellos, th); - } - - return Lists.newArrayList(shouldResetSentinelsMap.keySet()); - } - - - List sentinelsToReset(Map> availableSentinels, List shouldResetSentinels) { - List sentinelsToReset = new ArrayList<>(); - if (shouldResetSentinels.isEmpty()) - return sentinelsToReset; - - //reset unavailable sentinels first - sentinelsToReset.addAll(shouldResetSentinels.stream().filter(sentinel -> !availableSentinels.containsKey(sentinel)).collect(Collectors.toList())); - shouldResetSentinels.removeAll(sentinelsToReset); - - //all shouldResetSentinels unavailable - if (shouldResetSentinels.isEmpty()) - return sentinelsToReset; - - //leave quorum availableSentinels - int canResetAvailableSentinelSize = availableSentinels.size() - checkerConfig.getDefaultSentinelQuorumConfig().getQuorum(); - if (canResetAvailableSentinelSize >= shouldResetSentinels.size()) { - sentinelsToReset.addAll(shouldResetSentinels); - return sentinelsToReset; - } - - for (int i = 0; i < canResetAvailableSentinelSize; i++) { - sentinelsToReset.add(shouldResetSentinels.get(i)); - } - - return sentinelsToReset; - } - - Map> sentinels2AllSlaves(Map> sentinelsSlaves) { - Map> sentinels2AllSlaves = new HashMap<>(); - sentinelsSlaves.forEach((sentinel, slaves) -> { - if (sentinelHasAllSlaves(slaves)) { - sentinels2AllSlaves.put(sentinel, slaves); + CommandFuture resetFuture = resetChain.execute(); + ScheduledFuture resetTimeoutFuture = scheduled.schedule(new Runnable() { + @Override + public void run() { + resetFuture.cancel(true); } + }, 3000, TimeUnit.MILLISECONDS); + resetFuture.addListener(commandFuture -> { + if (!commandFuture.isCancelled()) + resetTimeoutFuture.cancel(true); }); - return sentinels2AllSlaves; - } - - boolean overHalfSentinelsLostSlaves(Map> availableSentinels) { - return availableSentinels.size() < checkerConfig.getDefaultSentinelQuorumConfig().getQuorum(); - } - - private Map> sentinelsSlaves(String sentinelMonitorName, Set hellos) { - Map> sentinel2Slaves = new ConcurrentHashMap<>(); - ParallelCommandChain sentinelSlaves = new ParallelCommandChain(resetExecutor, false); - for (SentinelHello hello : hellos) { - HostPort sentinelAddr = hello.getSentinelAddr(); - Sentinel sentinel = new Sentinel(sentinelAddr.toString(), sentinelAddr.getHost(), sentinelAddr.getPort()); - Command> slavesCommand = sentinelManager.slaves(sentinel, sentinelMonitorName); - slavesCommand.future().addListener(future -> { - if (future.isSuccess()) - sentinel2Slaves.put(sentinelAddr, future.get()); - else { - logger.warn("[{}-{}][checkReset-slaves]{}", LOG_TITLE, sentinelMonitorName, sentinel, future.cause()); - } - }); - sentinelSlaves.add(slavesCommand); - } - - try { - sentinelSlaves.execute().get(2050, TimeUnit.MILLISECONDS); - } catch (Throwable th) { - logger.warn("[{}-{}+{}]get sentinel slaves error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th); - } - - return sentinel2Slaves; } private Set tooManyKeepers(List slaves) { @@ -196,35 +84,57 @@ private Set tooManyKeepers(List slaves) { } - private Set unknownInstances(List slaves) { - Set unknownInstances = Sets.newHashSet(slaves); - unknownInstances.removeAll(context.getShardInstances()); - slaves.removeAll(unknownInstances); + private Set unknownInstances(List slaves, String clusterId, String shardId){ + Set unknownInstances = new HashSet<>(); + for (HostPort currentSlave : slaves) { + Pair clusterShard = metaCache.findClusterShard(currentSlave); + if (clusterShard == null || !ObjectUtils.equals(clusterId, clusterShard.getKey()) || !ObjectUtils.equals(shardId, clusterShard.getValue())) + unknownInstances.add(currentSlave); + } return unknownInstances; } - private static final int EXPECTED_KEEPER_COUNT = 1; - private static final int EXPECTED_MASTER_COUNT = 1; - boolean shouldReset(List slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) { - if (slaves.isEmpty()) - return false; + private Set connectedToTrueMaster(Set invalidSlaves) { + Map connectedSlaves = new ConcurrentHashMap<>(); + + ParallelCommandChain slaveRoleChain = new ParallelCommandChain(); + for (HostPort hostPort : invalidSlaves) { + RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(hostPort.getHost(), hostPort.getPort())), scheduled); + roleCommand.future().addListener(future -> { + if (future.isSuccess()) { + Role role = future.get(); + if (role instanceof SlaveRole) { + SlaveRole slaveRole = (SlaveRole) role; + HostPort trueMaster = context.getTrueMasterInfo().getKey(); + if (slaveRole.getMasterHost().equals(trueMaster.getHost()) && slaveRole.getMasterPort() == trueMaster.getPort()) { + connectedSlaves.put(hostPort, slaveRole); + } + } + } + }); + slaveRoleChain.add(roleCommand); + } - Set toManyKeepers = tooManyKeepers(slaves); - Set unknownSlaves = unknownInstances(slaves); + try { + slaveRoleChain.execute().get(1500, TimeUnit.MILLISECONDS); + } catch (Throwable th) { + logger.warn("[{}-{}+{}]parallel role command to slaves error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th); + } - if (toManyKeepers.isEmpty() && unknownSlaves.isEmpty()) - return false; - List masterSlaves = masterSlaves(); - if (!masterHasAllSlaves(masterSlaves,clusterId, shardId, sentinelMonitorName, sentinelAddr)) - return false; + return connectedSlaves.keySet(); + } - if (shouldResetTooManyKeepers(masterSlaves, toManyKeepers)) { + private static final int EXPECTED_KEEPER_COUNT = 1; + boolean shouldReset(List slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) { + Set toManyKeepers = tooManyKeepers(slaves); + if (shouldResetTooManyKeepers(toManyKeepers)) { logger.info("[{}-{}+{}][reset]{}, {}, too many keepers: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, toManyKeepers); return true; } - if (shouldResetUnknownInstances(masterSlaves, unknownSlaves)) { + Set unknownSlaves = unknownInstances(slaves, clusterId, shardId); + if (shouldResetUnknownInstances(unknownSlaves)) { logger.info("[{}-{}+{}][reset]{}, {}, unknown slaves: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, unknownSlaves); return true; } @@ -232,89 +142,53 @@ boolean shouldReset(List slaves, String clusterId, String shardId, Str return false; } - private boolean sentinelHasAllSlaves(List sentinelSlaves) { - if (sentinelSlaves.isEmpty()) - return false; - - Set shardInstances = Sets.newHashSet(context.getShardInstances()); - shardInstances.removeAll(sentinelSlaves); - return shardInstances.size() == EXPECTED_MASTER_COUNT; - } - - private boolean shouldResetTooManyKeepers(List masterSlaves, Set toManyKeepers) { - if(toManyKeepers.isEmpty()) - return false; - - Set slaves = Sets.newHashSet(masterSlaves); - slaves.retainAll(toManyKeepers); - - return slaves.size() <= EXPECTED_KEEPER_COUNT; - } - - private boolean shouldResetUnknownInstances(List masterSlaves, Set unknownSlaves) { - if(unknownSlaves.isEmpty()) - return false; - - Set slaves = Sets.newHashSet(masterSlaves); - slaves.retainAll(unknownSlaves); - - return slaves.isEmpty(); - } - - private List masterSlaves() { - HostPort master = context.getTrueMasterInfo().getKey(); - RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(master.getHost(), master.getPort())), scheduled); - - try { - Role role = roleCommand.execute().get(660, TimeUnit.MILLISECONDS); - if (role instanceof MasterRole) { - MasterRole masterRole = (MasterRole) role; - return masterRole.getSlaveHostPorts(); - } - } catch (Throwable th) { - logger.warn("[{}-{}+{}]get slaves from master failed", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th); - } - - return new ArrayList<>(); + private boolean shouldResetTooManyKeepers(Set toManyKeepers) { + return !toManyKeepers.isEmpty() && connectedToTrueMaster(toManyKeepers).size() <= EXPECTED_KEEPER_COUNT; } - private boolean masterHasAllSlaves(List masterSlaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) { - HostPort master = context.getTrueMasterInfo().getKey(); - Set masterAndSlaves = Sets.newHashSet(masterSlaves); - masterAndSlaves.add(master); - - boolean masterHasAllSlaves = masterAndSlaves.containsAll(context.getShardInstances()); - if (!masterHasAllSlaves) { - logger.info("[{}-{}+{}][reset]{}, {}, master:{} lost connection with some slaves, stop reset, current slaves:{}, expected slaves:{}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, master, masterSlaves, context.getShardInstances()); - } - return masterHasAllSlaves; + private boolean shouldResetUnknownInstances(Set unknownSlaves) { + return !unknownSlaves.isEmpty() && connectedToTrueMaster(unknownSlaves).isEmpty(); } - class CheckSentinel extends AbstractCommand { + class ResetSentinel extends AbstractCommand { private String clusterId; private String shardId; private SentinelHello hello; private String sentinelMonitorName; - private List slaves; - public CheckSentinel(String clusterId, String shardId, SentinelHello sentinelHello, String sentinelMonitorName, List slaves) { + public ResetSentinel(String clusterId, String shardId, SentinelHello sentinelHello, String sentinelMonitorName) { this.clusterId = clusterId; this.shardId = shardId; this.hello = sentinelHello; this.sentinelMonitorName = sentinelMonitorName; - this.slaves = slaves; } @Override public String getName() { - return "CheckSentinel"; + return "ResetSentinel"; } @Override protected void doExecute() throws Throwable { HostPort sentinelAddr = hello.getSentinelAddr(); - future().setSuccess(shouldReset(slaves, clusterId, shardId, sentinelMonitorName, sentinelAddr)); + Sentinel sentinel = new Sentinel(sentinelAddr.toString(), sentinelAddr.getHost(), sentinelAddr.getPort()); + + List slaves = sentinelManager.slaves(sentinel, sentinelMonitorName).execute().getOrHandle(2050, TimeUnit.MILLISECONDS, throwable -> { + logger.warn("[{}-{}][checkReset-slaves]{}", LOG_TITLE, sentinelMonitorName, sentinel, throwable); + return new ArrayList<>(); + }); + + if (slaves.isEmpty()) + return; + + if (shouldReset(slaves, clusterId, shardId, sentinelMonitorName, sentinelAddr)) { + CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName); + sentinelManager.reset(sentinel, sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> { + logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, throwable); + return null; + }); + } } @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java index 992a78c5bc..619c672d2e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/SentinelHelloCollectContext.java @@ -117,8 +117,7 @@ public List getShardInstances() { } public SentinelHelloCollectContext setShardInstances(List shardInstances) { - this.shardInstances = shardInstances; - return this; + this.shardInstances = shardInstances;return this; } public Map getClusterTypeSentinelConfig() { diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java index 94609a0963..f9a24254c6 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java @@ -1,21 +1,16 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.collector.command; -import com.ctrip.xpipe.command.AbstractCommand; +import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.AbstractCheckerTest; import com.ctrip.xpipe.redis.checker.SentinelManager; -import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHello; import com.ctrip.xpipe.redis.core.meta.MetaCache; -import com.ctrip.xpipe.redis.core.meta.QuorumConfig; -import com.ctrip.xpipe.redis.core.protocal.pojo.Sentinel; import com.ctrip.xpipe.simpleserver.Server; import com.ctrip.xpipe.tuple.Pair; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -24,14 +19,10 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BooleanSupplier; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class ResetSentinelsTest extends AbstractCheckerTest { @@ -46,591 +37,144 @@ public class ResetSentinelsTest extends AbstractCheckerTest { private XpipeNettyClientKeyedObjectPool keyedObjectPool; @Mock private ScheduledExecutorService scheduled; - - private ExecutorService resetExecutor = Executors.newSingleThreadExecutor(); @Mock - private CheckerConfig checkerConfig; + private ExecutorService resetExecutor; @Before public void init() throws Exception { resetSentinels = new ResetSentinels(new SentinelHelloCollectContext(), metaCache, - keyedObjectPool, scheduled, resetExecutor, sentinelManager, checkerConfig); + keyedObjectPool, scheduled, resetExecutor,sentinelManager); resetSentinels.setKeyedObjectPool(getXpipeNettyClientKeyedObjectPool()).setScheduled(scheduled); } - @After - public void shutdown() { - resetExecutor.shutdownNow(); - } - @Test - public void halfSentinelsLostSlaves() throws Exception { + public void testTooManyKeepers() throws Exception{ RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380)))); - when(checkerConfig.getDefaultSentinelQuorumConfig()).thenReturn(new QuorumConfig(3, 2)); - - //hellos is empty - resetSentinels.checkSentinels("cluster", "shard", "cluster+shard+activeDc", new HashSet<>()); - verify(checkerConfig, never()).getDefaultSentinelQuorumConfig(); - - //hellos lost - resetSentinels.checkSentinels("cluster", "shard", "cluster+shard+activeDc", Sets.newHashSet(new SentinelHello(new HostPort(LOCAL_HOST, 5000), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"))); - verify(checkerConfig, times(1)).getDefaultSentinelQuorumConfig(); - verify(sentinelManager, never()).slaves(any(), any()); - - //over half sentinels lost slaves - SentinelHello hello5000 = new SentinelHello(new HostPort(LOCAL_HOST, 5000), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5000 = new Sentinel(hello5000.getSentinelAddr().toString(), hello5000.getSentinelAddr().getHost(), hello5000.getSentinelAddr().getPort()); - - SentinelHello hello5001 = new SentinelHello(new HostPort(LOCAL_HOST, 5001), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5001 = new Sentinel(hello5001.getSentinelAddr().toString(), hello5001.getSentinelAddr().getHost(), hello5001.getSentinelAddr().getPort()); - - SentinelHello hello5002 = new SentinelHello(new HostPort(LOCAL_HOST, 5002), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5002 = new Sentinel(hello5002.getSentinelAddr().toString(), hello5002.getSentinelAddr().getHost(), hello5002.getSentinelAddr().getPort()); - - when(sentinelManager.slaves(sentinel5000, hello5000.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - when(sentinelManager.slaves(sentinel5001, hello5001.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - when(sentinelManager.slaves(sentinel5002, hello5002.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000))); - } - - @Override - protected void doReset() { + resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort("localhost", 6379), new ArrayList<>()))); - } - - @Override - public String getName() { - return null; - } - }); - resetSentinels.checkSentinels("cluster", "shard", "cluster+shard+activeDc", Sets.newHashSet(hello5000, hello5001, hello5002)); - verify(checkerConfig, times(3)).getDefaultSentinelQuorumConfig(); - verify(sentinelManager, times(3)).slaves(any(), any()); - verify(metaCache, never()).getAllKeepers(); - } - - @Test - public void rateLimit1() throws Exception { - RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380)))); - when(checkerConfig.getDefaultSentinelQuorumConfig()).thenReturn(new QuorumConfig(5, 3)); +// sentinelManager.slaves when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - SentinelHello hello5000 = new SentinelHello(new HostPort(LOCAL_HOST, 5000), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5000 = new Sentinel(hello5000.getSentinelAddr().toString(), hello5000.getSentinelAddr().getHost(), hello5000.getSentinelAddr().getPort()); - - SentinelHello hello5001 = new SentinelHello(new HostPort(LOCAL_HOST, 5001), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5001 = new Sentinel(hello5001.getSentinelAddr().toString(), hello5001.getSentinelAddr().getHost(), hello5001.getSentinelAddr().getPort()); - - SentinelHello hello5002 = new SentinelHello(new HostPort(LOCAL_HOST, 5002), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5002 = new Sentinel(hello5002.getSentinelAddr().toString(), hello5002.getSentinelAddr().getHost(), hello5002.getSentinelAddr().getPort()); - - SentinelHello hello5003 = new SentinelHello(new HostPort(LOCAL_HOST, 5003), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5003 = new Sentinel(hello5003.getSentinelAddr().toString(), hello5003.getSentinelAddr().getHost(), hello5003.getSentinelAddr().getPort()); - - SentinelHello hello5004 = new SentinelHello(new HostPort(LOCAL_HOST, 5004), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5004 = new Sentinel(hello5004.getSentinelAddr().toString(), hello5004.getSentinelAddr().getHost(), hello5004.getSentinelAddr().getPort()); - - //sentinel5000 lost slaves and has too many keepers - when(sentinelManager.slaves(sentinel5000, hello5000.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - //sentinel5001 has too many keepers - when(sentinelManager.slaves(sentinel5001, hello5001.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - //sentinel5002 is ok - when(sentinelManager.slaves(sentinel5002, hello5002.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - //sentinel5003 is ok - when(sentinelManager.slaves(sentinel5003, hello5003.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - //sentinel5004 is ok - when(sentinelManager.slaves(sentinel5004, hello5004.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - - Server master = startServer(6379, "*3\r\n" - + "$6\r\nmaster\r\n" - + ":43\r\n" - + "*3\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6380\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8000\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6381\r\n" - + "$1\r\n0\r\n"); - - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - when(sentinelManager.reset(any(), any())).thenReturn(new AbstractCommand() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(1L); - } - - @Override - protected void doReset() { - - } + // 1、command failed + boolean shouldReset= resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertTrue(shouldReset); - @Override - public String getName() { - return null; - } - }); - resetSentinels.checkSentinels("cluster", "shard", "cluster+shard+activeDc", Sets.newHashSet(hello5000, hello5001, hello5002, hello5003, hello5004)); - waitConditionUntilTimeOut(new BooleanSupplier() { - @Override - public boolean getAsBoolean() { - try { - verify(sentinelManager, times(2)).reset(any(), anyString()); - verify(sentinelManager, times(1)).reset(sentinel5000, hello5000.getMonitorName()); - verify(sentinelManager, times(1)).reset(sentinel5001, hello5001.getMonitorName()); - return true; - } catch (Throwable th) { - logger.error("test failed", th); - return false; - } + // 2、some keepers unreachable + Server activeKeeper0 = startServer(8000,"*5\r\n" + + "$6\r\nkeeper\r\n" + + "$9\r\nlocalhost\r\n" + + ":6379\r\n" + + "$9\r\nconnected\r\n" + + ":477\r\n"); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertTrue(shouldReset); - } - }); + // 3、invalid keeper not connected to master + Server activeKeeper1 = startServer(8001,"*5\r\n" + + "$6\r\nkeeper\r\n" + + "$10\r\nlocalhost2\r\n" + + ":6379\r\n" + + "$9\r\nconnected\r\n" + + ":477\r\n"); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertTrue(shouldReset); - master.stop(); + // 4、invalid keeper connected to master + activeKeeper1.stop(); + Server activeKeeper2 = startServer(8002,"*5\r\n" + + "$6\r\nkeeper\r\n" + + "$9\r\nlocalhost\r\n" + + ":6379\r\n" + + "$9\r\nconnected\r\n" + + ":477\r\n"); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8002)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertFalse(shouldReset); + activeKeeper2.stop(); + activeKeeper0.stop(); } @Test - public void rateLimit2() throws Exception { + public void testOneWayReset() throws Exception { RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380)))); - when(checkerConfig.getDefaultSentinelQuorumConfig()).thenReturn(new QuorumConfig(5, 3)); - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - SentinelHello hello5000 = new SentinelHello(new HostPort(LOCAL_HOST, 5000), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5000 = new Sentinel(hello5000.getSentinelAddr().toString(), hello5000.getSentinelAddr().getHost(), hello5000.getSentinelAddr().getPort()); - - SentinelHello hello5001 = new SentinelHello(new HostPort(LOCAL_HOST, 5001), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5001 = new Sentinel(hello5001.getSentinelAddr().toString(), hello5001.getSentinelAddr().getHost(), hello5001.getSentinelAddr().getPort()); - - SentinelHello hello5002 = new SentinelHello(new HostPort(LOCAL_HOST, 5002), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5002 = new Sentinel(hello5002.getSentinelAddr().toString(), hello5002.getSentinelAddr().getHost(), hello5002.getSentinelAddr().getPort()); - - SentinelHello hello5003 = new SentinelHello(new HostPort(LOCAL_HOST, 5003), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5003 = new Sentinel(hello5003.getSentinelAddr().toString(), hello5003.getSentinelAddr().getHost(), hello5003.getSentinelAddr().getPort()); - - SentinelHello hello5004 = new SentinelHello(new HostPort(LOCAL_HOST, 5004), new HostPort(LOCAL_HOST, 6379), "cluster+shard+activeDc"); - Sentinel sentinel5004 = new Sentinel(hello5004.getSentinelAddr().toString(), hello5004.getSentinelAddr().getHost(), hello5004.getSentinelAddr().getPort()); - - //sentinel5000 lost slaves and has too many keepers - when(sentinelManager.slaves(sentinel5000, hello5000.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - } - - @Override - protected void doReset() { - - } + resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort("localhost", 6380), new ArrayList<>()))); +// sentinelManager.slaves + when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - @Override - public String getName() { - return null; - } - }); - //sentinel5001 has too many keepers - when(sentinelManager.slaves(sentinel5001, hello5001.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - //sentinel5002 has too many keepers - when(sentinelManager.slaves(sentinel5002, hello5002.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - //sentinel5003 has too many keepers - when(sentinelManager.slaves(sentinel5003, hello5003.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - //sentinel5004 has too many keepers - when(sentinelManager.slaves(sentinel5004, hello5004.getMonitorName())).thenReturn(new AbstractCommand>() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001))); - } - - @Override - protected void doReset() { - - } - - @Override - public String getName() { - return null; - } - }); - - Server master = startServer(6379, "*3\r\n" - + "$6\r\nmaster\r\n" - + ":43\r\n" - + "*3\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6380\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8000\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6381\r\n" - + "$1\r\n0\r\n"); - - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - when(sentinelManager.reset(any(), any())).thenReturn(new AbstractCommand() { - @Override - protected void doExecute() throws Throwable { - future().setSuccess(1L); - } + HostPort wrongSlave = new HostPort("otherClusterShardSlave", 6379); + when(metaCache.findClusterShard(wrongSlave)).thenReturn(new Pair<>("otherCluster", "otherShard")); + boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), wrongSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertTrue(shouldReset); - @Override - protected void doReset() { + Server unknownSlaveServer = startServer(8002,"*5\r\n" + + "$5\r\nslave\r\n" + + "$9\r\nlocalhost\r\n" + + ":6380\r\n" + + "$9\r\nconnected\r\n" + + ":477\r\n"); + HostPort unknownConnectedSlave = new HostPort(LOCAL_HOST, unknownSlaveServer.getPort()); + when(metaCache.findClusterShard(unknownConnectedSlave)).thenReturn(null); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), unknownConnectedSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertFalse(shouldReset); + unknownSlaveServer.stop(); - } + HostPort unknownUnreachableSlave = new HostPort(LOCAL_HOST, 8003); + when(metaCache.findClusterShard(unknownUnreachableSlave)).thenReturn(null); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), unknownUnreachableSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertTrue(shouldReset); - @Override - public String getName() { - return null; - } - }); - resetSentinels.checkSentinels("cluster", "shard", "cluster+shard+activeDc", Sets.newHashSet(hello5000, hello5001, hello5002, hello5003, hello5004)); - waitConditionUntilTimeOut(new BooleanSupplier() { - @Override - public boolean getAsBoolean() { - try { - verify(sentinelManager, times(2)).reset(any(), anyString()); - verify(sentinelManager, times(1)).reset(sentinel5000, hello5000.getMonitorName()); - return true; - } catch (Throwable th) { - logger.error("test failed", th); - return false; - } + Server unknownAndConnectedToOtherMasterSlaveServer = startServer(8004,"*5\r\n" + + "$5\r\nslave\r\n" + + "$9\r\nlocalhost\r\n" + + ":6381\r\n" + + "$9\r\nconnected\r\n" + + ":477\r\n"); + HostPort unknownAndConnectedToOtherMasterSlave = new HostPort(LOCAL_HOST, unknownAndConnectedToOtherMasterSlaveServer.getPort()); + when(metaCache.findClusterShard(unknownAndConnectedToOtherMasterSlave)).thenReturn(null); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), unknownAndConnectedToOtherMasterSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertTrue(shouldReset); - } - }); + HostPort trueSlave = new HostPort(LOCAL_HOST, 6379); + when(metaCache.findClusterShard(trueSlave)).thenReturn(new Pair<>("cluster", "shard")); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), trueSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Assert.assertFalse(shouldReset); - master.stop(); } - @Test - public void noInvalidSlaves() throws Exception { - RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380)))); - - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 6380)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); - Assert.assertFalse(shouldReset); - } @Test - public void masterLostSlaves() throws Exception { - RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)))); + public void testNotOneWayReset() throws Exception { + RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort(),ClusterType.CROSS_DC); + resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort("localhost", 6380), new ArrayList<>()))); - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - startServer(6379,"*3\r\n" - + "$6\r\nmaster\r\n" - + ":43\r\n" - + "*2\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6380\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8000\r\n" - + "$1\r\n0\r\n"); - - - boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + HostPort trueSlave = new HostPort(LOCAL_HOST, 6379); + when(metaCache.findClusterShard(trueSlave)).thenReturn(new Pair<>("cluster", "shard")); + boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); Assert.assertFalse(shouldReset); - } - - @Test - public void testTooManyKeepers() throws Exception { - RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)))); - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - //not in master - Server master = startServer(6379, "*3\r\n" - + "$6\r\nmaster\r\n" - + ":43\r\n" - + "*3\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6380\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8000\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6381\r\n" - + "$1\r\n0\r\n"); - - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + HostPort wrongSlave = new HostPort("otherClusterShardSlave", 6379); + when(metaCache.findClusterShard(wrongSlave)).thenReturn(new Pair<>("otherCluster", "otherShard")); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave, wrongSlave), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); Assert.assertTrue(shouldReset); - master.stop(); - - //in master - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6382), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6382), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)))); - Server master2 = startServer(6382, "*3\r\n" - + "$6\r\nmaster\r\n" - + ":43\r\n" - + "*4\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6380\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8000\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6381\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8001\r\n" - + "$1\r\n0\r\n"); - - shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + Server unknownActiveSlaveServer = startServer(randomPort(),"*5\r\n" + + "$5\r\nslave\r\n" + + "$9\r\nlocalhost\r\n" + + ":6380\r\n" + + "$9\r\nconnected\r\n" + + ":477\r\n"); + HostPort unknownActive = new HostPort(LOCAL_HOST, unknownActiveSlaveServer.getPort()); + when(metaCache.findClusterShard(unknownActive)).thenReturn(null); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave, unknownActive), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); Assert.assertFalse(shouldReset); - master2.stop(); - } - - @Test - public void unknownSlaves() throws Exception { - RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort()); - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6379), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)))); - - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - //not in master - Server master = startServer(6379, "*3\r\n" + Server unknownActiveMasterServer = startServer(randomPort(), "*3\r\n" + "$6\r\nmaster\r\n" - + ":43\r\n" - + "*3\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6380\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8000\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6381\r\n" - + "$1\r\n0\r\n"); - - when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002))); - - boolean shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381), new HostPort(LOCAL_HOST, 6382), new HostPort(LOCAL_HOST, 8000)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); + + ":0\r\n*0\r\n"); + unknownActive = new HostPort(LOCAL_HOST, unknownActiveMasterServer.getPort()); + shouldReset = resetSentinels.shouldReset(Lists.newArrayList(trueSlave, unknownActive), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); Assert.assertTrue(shouldReset); - master.stop(); - - //in master - resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()).setTrueMasterInfo(new Pair<>(new HostPort(LOCAL_HOST, 6382), new ArrayList<>())) - .setShardInstances(Lists.newArrayList(new HostPort(LOCAL_HOST, 6379), new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381)))); - Server master2 = startServer(6382, "*3\r\n" - + "$6\r\nmaster\r\n" - + ":43\r\n" - + "*4\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6380\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n8000\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6381\r\n" - + "$1\r\n0\r\n" - + "*3\r\n" - + "$9\r\n127.0.0.1\r\n" - + "$4\r\n6382\r\n" - + "$1\r\n0\r\n"); - - shouldReset = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 6380), new HostPort(LOCAL_HOST, 6381),new HostPort(LOCAL_HOST, 6382), new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard", "cluster+shard+activeDc", new HostPort(LOCAL_HOST, 22230)); - Assert.assertFalse(shouldReset); - master2.stop(); } }