diff --git a/.gitmodules b/.gitmodules
index 616dacf610a7..a5685a541e16 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
 	path = modules/accord
-	url = https://github.com/apache/cassandra-accord.git
-	branch = trunk
+	url = https://github.com/ifesdjeen/cassandra-accord.git
+	branch = CASSANDRA-20245
diff --git a/modules/accord b/modules/accord
index c7a789b1f424..380d741c8d75 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit c7a789b1f424771a4befab6bcb91edd4ab5d7198
+Subproject commit 380d741c8d75f6d5c14e0a627c129762bcec9385
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index 720a2db7722d..566a0d16ef7b 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -194,7 +194,8 @@ public enum TransactionalRangeMigration
     public boolean ephemeralReadEnabled = true;
     public boolean state_cache_listener_jfr_enabled = true;
     public final JournalSpec journal = new JournalSpec();
-    public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec();
+    public final RetrySpec minEpochSyncRetry = new MinEpochRetrySpec();
+    public final RetrySpec fetchRetry = new FetchRetrySpec();

     public static class MinEpochRetrySpec extends RetrySpec
     {
@@ -204,6 +205,14 @@ public MinEpochRetrySpec()
         }
     }

+    public static class FetchRetrySpec extends RetrySpec
+    {
+        public FetchRetrySpec()
+        {
+            maxAttempts = new MaxAttempt(100);
+        }
+    }
+
     public static class JournalSpec implements Params
     {
         public int segmentSize = 32 << 20;
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailure.java b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
index 312102c01201..6946b812aa34 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailure.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
@@ -43,6 +43,7 @@ public class RequestFailure
 {
     public static final RequestFailure UNKNOWN = new RequestFailure(RequestFailureReason.UNKNOWN);
+    public static final RequestFailure UNKNOWN_TOPOLOGY = new RequestFailure(RequestFailureReason.UNKNOWN_TOPOLOGY);
     public static final RequestFailure READ_TOO_MANY_TOMBSTONES = new RequestFailure(RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
     public static final RequestFailure TIMEOUT = new RequestFailure(RequestFailureReason.TIMEOUT);
     public static final RequestFailure INCOMPATIBLE_SCHEMA = new RequestFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA);
@@ -134,6 +135,7 @@ public static RequestFailure forReason(RequestFailureReason reason)
         {
             default: throw new IllegalStateException("Unhandled request failure reason " + reason);
             case UNKNOWN: return UNKNOWN;
+            case UNKNOWN_TOPOLOGY: return UNKNOWN_TOPOLOGY;
             case READ_TOO_MANY_TOMBSTONES: return READ_TOO_MANY_TOMBSTONES;
             case TIMEOUT: return TIMEOUT;
             case INCOMPATIBLE_SCHEMA: return INCOMPATIBLE_SCHEMA;
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index b3b9bddfc448..917c6c753d46 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -42,6 +42,7 @@ public enum RequestFailureReason
     READ_TOO_MANY_INDEXES                 (10),
     RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM (11),
     BOOTING                               (12),
+    UNKNOWN_TOPOLOGY                      (13)
     ;

     static
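The FetchRetrySpec added to AccordSpec above only overrides the attempt budget; the actual retry cadence comes from whatever Backoff is constructed from the spec. A minimal sketch of how the new knob is consumed, assuming the Backoff.fromConfig factory that appears later in this patch:

    // Sketch only: fetchRetry defaults to 100 attempts (see FetchRetrySpec above);
    // Backoff.fromConfig is the factory used by FetchMinEpoch/FetchTopology below.
    RetrySpec spec = DatabaseDescriptor.getAccord().fetchRetry;
    Backoff backoff = Backoff.fromConfig(SharedContext.Global.instance, spec);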
diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java b/src/java/org/apache/cassandra/net/MessageDelivery.java
index 9dc374750f60..c57366d45058 100644
--- a/src/java/org/apache/cassandra/net/MessageDelivery.java
+++ b/src/java/org/apache/cassandra/net/MessageDelivery.java
@@ -99,14 +99,6 @@ public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Backoff backoff,
         return promise;
     }

-    public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Verb verb, REQ request,
-                                                                   Iterator<InetAddressAndPort> candidates,
-                                                                   RetryPredicate shouldRetry,
-                                                                   RetryErrorMessage errorMessage)
-    {
-        return sendWithRetries(Backoff.NO_OP.INSTANCE, ImmediateRetryScheduler.instance, verb, request, candidates, shouldRetry, errorMessage);
-    }
-
     public default <REQ, RSP> void sendWithRetries(Backoff backoff, RetryScheduler retryThreads,
                                                    Verb verb, REQ request,
                                                    Iterator<InetAddressAndPort> candidates,
@@ -147,7 +139,8 @@ interface RetryErrorMessage
     }

     private static <REQ, RSP> void sendWithRetries(MessageDelivery messaging,
-                                                   Backoff backoff, RetryScheduler retryThreads,
+                                                   Backoff backoff,
+                                                   RetryScheduler retryThreads,
                                                    Verb verb, REQ request,
                                                    Iterator<InetAddressAndPort> candidates,
                                                    OnResult onResult,
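With the zero-backoff convenience overload removed above, every caller of sendWithRetries now spells out its backoff policy and retry scheduler explicitly. A hedged sketch of the resulting call shape, mirroring the FetchMinEpoch change later in this patch (context is a SharedContext, spec a RetrySpec, peer a single candidate):

    Future<Message<FetchMinEpoch.Response>> reply =
        context.messaging().sendWithRetries(Backoff.fromConfig(context, spec),  // explicit backoff policy
                                            context.optionalTasks()::schedule,  // explicit retry scheduler
                                            Verb.ACCORD_FETCH_MIN_EPOCH_REQ,
                                            FetchMinEpoch.instance,
                                            Iterators.cycle(peer),
                                            MessageDelivery.RetryPredicate.ALWAYS_RETRY,
                                            MessageDelivery.RetryErrorMessage.EMPTY);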
diff --git a/src/java/org/apache/cassandra/net/MessagingUtils.java b/src/java/org/apache/cassandra/net/MessagingUtils.java
index 2190eaf3a655..11735f8d7f66 100644
--- a/src/java/org/apache/cassandra/net/MessagingUtils.java
+++ b/src/java/org/apache/cassandra/net/MessagingUtils.java
@@ -18,22 +18,31 @@
 package org.apache.cassandra.net;

+import java.util.Collection;
 import java.util.Iterator;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.SharedContext;

 public class MessagingUtils
 {
+    private static final Logger logger = LoggerFactory.getLogger(MessagingUtils.class);
+
     /**
      * Candidate iterator that would try all endpoints known to be alive first, and then try all endpoints
      * in a round-robin manner.
+     * <p>
+     * Logs a warning every time the set of peers has been exhausted and the iterator cycles again.
      */
-    public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext context, Iterable<InetAddressAndPort> peers)
+    public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext context, Collection<InetAddressAndPort> peers, String verb)
     {
         return new Iterator<>()
         {
             boolean firstRun = true;
+            int attempt = 0;
             Iterator<InetAddressAndPort> iter = peers.iterator();
             boolean isEmpty = !iter.hasNext();
@@ -58,10 +67,13 @@ public InetAddressAndPort next()

                 // After that, cycle through all nodes
                 if (!iter.hasNext())
+                {
+                    logger.warn("Exhausted iterator on {} cycling through the set of peers: {} attempt #{}", verb, peers, attempt++);
                     iter = peers.iterator();
+                }

                 return iter.next();
             }
         };
     }
-}
+}
\ No newline at end of file
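Note that for a non-empty peer set the iterator returned by tryAliveFirst never terminates: it prefers live peers on the first pass, then round-robins over all peers indefinitely, so callers must bound consumption externally (e.g. via their retry budget). Usage sketch, as wired up by FetchTopology later in this patch:

    Iterator<InetAddressAndPort> candidates =
        MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers,
                                     Verb.ACCORD_FETCH_TOPOLOGY_REQ.name());
    InetAddressAndPort target = candidates.next(); // alive peers are preferred on the first pass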
diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 0dbf99635a88..9d8b41bcb141 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -213,6 +213,7 @@ public EpochDiskState truncateTopologyUntil(long epoch, EpochDiskState diskState
         }
     }

+    // TODO (required): should not be public
     public final ChangeListener listener = new MetadataChangeListener();
     private class MetadataChangeListener implements ChangeListener
     {
@@ -267,8 +268,6 @@ public synchronized void start()
         Map removedNodes = mapping.removedNodes();
         for (Map.Entry e : removedNodes.entrySet())
             onNodeRemoved(e.getValue(), currentTopology(), e.getKey());
-
-        ClusterMetadataService.instance().log().addListener(listener);
     }

     @Override
@@ -416,13 +415,36 @@ void maybeReportMetadata(ClusterMetadata metadata)
         long epoch = metadata.epoch.getEpoch();
         synchronized (epochs)
         {
-            if (epochs.maxEpoch() == 0)
+            // On first boot, we have two options:
+            //
+            // - we can start listening to TCM _before_ we replay topologies
+            // - we can start listening to TCM _after_ we replay topologies
+            //
+            // If we start listening to TCM _before_ we replay topologies from other nodes,
+            // we may end up in a situation where TCM reports metadata that would create an
+            // `epoch - 1` epoch state that is not associated with any topologies, and
+            // therefore should not be waited upon.
+            //
+            // If we start listening to TCM _after_ we replay topologies, we may end up in a
+            // situation where TCM reports metadata that is 1 (or more) epochs _ahead_ of the
+            // last known epoch. Previous implementations used TCM peer catch-up, which
+            // could have resulted in gaps.
+            //
+            // The current protocol solves both problems by _first_ replaying topologies from peers,
+            // then subscribing to TCM _and_, if there are still any gaps, filling them again.
+            // However, it still has a slight chance of creating an `epoch - 1` epoch state
+            // not associated with any topologies, which under the "right" circumstances could
+            // be waited upon with `epochReady`. This check precludes the creation of such an
+            // epoch: by the time this code can be called, remote topology replay is already
+            // done, so the TCM listener will only report epochs that are _at least_ the min epoch.
+            if (epochs.maxEpoch() == 0 || epochs.minEpoch() == metadata.epoch.getEpoch())
             {
                 getOrCreateEpochState(epoch); // touch epoch state so subsequent calls see it
                 reportMetadata(metadata);
                 return;
             }
         }
+
         getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata));
     }
@@ -433,16 +455,25 @@ protected void fetchTopologyInternal(long epoch)
         Stage.ACCORD_MIGRATION.execute(() -> {
             if (ClusterMetadata.current().epoch.getEpoch() < epoch)
                 ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));
+
+            // In most cases, after fetching the log from CMS, we will be caught up to the required epoch.
+            // TCM will also notify Accord via reportMetadata, so we do not need to fetch topologies.
+            // If the reported metadata has skipped one or more epochs and is _ahead_ of the requested epoch,
+            // we need to fetch topologies from peers to fill in the gap.
+            ClusterMetadata metadata = ClusterMetadata.current();
+            if (metadata.epoch.getEpoch() == epoch)
+                return;
+
             try
             {
-                Set<InetAddressAndPort> peers = new HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints());
+                Set<InetAddressAndPort> peers = new HashSet<>(metadata.directory.allJoinedEndpoints());
                 peers.remove(FBUtilities.getBroadcastAddressAndPort());
                 if (peers.isEmpty())
                     return;
-                Topology topology;
-                while ((topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null)
-                {
-                }
+
+                // TODO (required): fetch only _missing_ topologies.
+                Topology topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get();
+                Invariants.require(topology.epoch() == epoch);
                 reportTopology(topology);
             }
             catch (InterruptedException e)
@@ -461,6 +492,13 @@ protected void fetchTopologyInternal(long epoch)
         });
     }

+    @Override
+    public void reportTopology(Topology topology, boolean isLoad, boolean startSync)
+    {
+        Invariants.require(topology.epoch() <= ClusterMetadata.current().epoch.getEpoch());
+        super.reportTopology(topology, isLoad, startSync);
+    }
+
     @Override
     protected void localSyncComplete(Topology topology, boolean startSync)
     {
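The ordering argument in maybeReportMetadata above boils down to: report an epoch immediately only if it is the first epoch ever seen or the minimum known epoch; otherwise defer it until epoch - 1 has been acknowledged, so epochs are reported strictly in order. A condensed model of that gate (epochs, getOrCreateEpochState and reportMetadata stand in for the members of this class):

    void maybeReport(ClusterMetadata metadata)
    {
        long epoch = metadata.epoch.getEpoch();
        synchronized (epochs)
        {
            if (epochs.maxEpoch() == 0 || epochs.minEpoch() == epoch)
            {
                getOrCreateEpochState(epoch); // touch so subsequent calls see it
                reportMetadata(metadata);
                return;
            }
        }
        // Chain on the previous epoch: epoch N is only reported once N - 1 is acknowledged.
        getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata));
    }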
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 005b856005b0..cce9b8832bf8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -22,13 +22,10 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +41,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,7 +134,6 @@
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.SharedContext;
-import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -165,7 +160,6 @@
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.membership.NodeId;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.Blocking;
@@ -379,25 +373,48 @@ public synchronized void startup()
         node.commandStores().restoreShardStateUnsafe(topology -> configService.reportTopology(topology, true, true));
         configService.start();

-        long minEpoch = fetchMinEpoch();
-        if (minEpoch >= 0)
+        try
         {
-            for (long epoch = minEpoch; epoch <= metadata.epoch.getEpoch(); epoch++)
-                node.configService().fetchTopologyForEpoch(epoch);
+            // Fetch topologies up to the current epoch
+            List<Topology> topologies = fetchTopologies(null, metadata);
+            for (Topology topology : topologies)
+                configService.reportTopology(topology);

-            try
-            {
-                epochReady(metadata.epoch).get(DatabaseDescriptor.getTransactionTimeout(MILLISECONDS), MILLISECONDS);
-            }
-            catch (InterruptedException e)
+            ClusterMetadataService.instance().log().addListener(configService.listener);
+            ClusterMetadata next = ClusterMetadata.current();
+
+            // If metadata was updated before we were able to add a listener, fetch the remaining topologies
+            if (next.epoch.isAfter(metadata.epoch))
             {
-                throw new UncheckedInterruptedException(e);
+                topologies = fetchTopologies(metadata.epoch.getEpoch() + 1, next);
+                for (Topology topology : topologies)
+                    configService.reportTopology(topology);
             }
-            catch (ExecutionException | TimeoutException e)
+
+            int attempt = 0;
+            int waitSeconds = 5;
+            while (true)
             {
-                throw new RuntimeException(e);
+                try
+                {
+                    epochReady(metadata.epoch).get(waitSeconds, SECONDS);
+                    break;
+                }
+                catch (TimeoutException e)
+                {
+                    logger.warn("Epoch {} is not ready after waiting for {} seconds", metadata.epoch, (++attempt) * waitSeconds);
+                }
             }
         }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new UncheckedInterruptedException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }

         fastPathCoordinator.start();
         ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
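The startup sequence above closes the listener race in three steps: replay topologies up to a snapshot of ClusterMetadata, register the TCM listener, then back-fill anything published in between. Schematically (replayUpTo and replayBetween are hypothetical helpers, not names from this patch):

    Epoch before = ClusterMetadata.current().epoch;
    replayUpTo(before);                                            // 1. replay up to the snapshot
    ClusterMetadataService.instance().log().addListener(listener); // 2. subscribe to new epochs
    Epoch after = ClusterMetadata.current().epoch;
    if (after.isAfter(before))                                     // 3. fill the gap, if any
        replayBetween(before.getEpoch() + 1, after);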
@@ -412,44 +429,60 @@ public synchronized void startup()
     }

     /**
-     * Queries peers to discover min epoch
+     * Queries peers to discover the min epoch, and then fetches all topologies between the min and current epochs
      */
-    private long fetchMinEpoch()
+    private List<Topology> fetchTopologies(Long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
-        {
-            List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
-            if (tables.isEmpty())
-                continue;
-            DataPlacement current = metadata.placements.get(keyspace.params.replication);
-            DataPlacement settled = metadata.writePlacementAllSettled(keyspace);
-            Sets.SetView<InetAddressAndPort> alive = Sets.intersection(settled.writes.byEndpoint().keySet(), current.writes.byEndpoint().keySet());
-            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-            settled.writes.forEach((range, group) -> {
-                if (group.endpoints().contains(self))
-                {
-                    for (InetAddressAndPort peer : group.endpoints())
-                    {
-                        if (peer.equals(self) || !alive.contains(peer)) continue;
-                        for (TableMetadata table : tables)
-                            peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id));
-                    }
-                }
-            });
-        }
+        if (minEpoch != null && minEpoch == metadata.epoch.getEpoch())
+            return Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+        Set<InetAddressAndPort> peers = new HashSet<>();
+        peers.addAll(metadata.directory.allAddresses());
+        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+        // No peers: single-node cluster, or the first node to boot
         if (peers.isEmpty())
-            return -1;
+            return Collections.singletonList(AccordTopology.createAccordTopology(metadata));

-        Long minEpoch = findMinEpoch(SharedContext.Global.instance, peers);
+        // Bootstrap: fetch the min epoch
         if (minEpoch == null)
-            return -1;
-        return minEpoch;
+        {
+            Long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+            if (fetched != null)
+                logger.info("Discovered min epoch of {} by querying {}", fetched, peers);
+
+            // No other node has advanced the epoch just yet
+            if (fetched == null || fetched == metadata.epoch.getEpoch())
+                return Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+            minEpoch = fetched;
+        }
+
+        long maxEpoch = metadata.epoch.getEpoch();
+
+        // If we are behind minEpoch, catch up to at least minEpoch
+        if (metadata.epoch.getEpoch() < minEpoch)
+        {
+            minEpoch = metadata.epoch.getEpoch();
+            maxEpoch = minEpoch;
+        }
+
+        List<Future<Topology>> futures = new ArrayList<>();
+        logger.info("Fetching topologies for epochs [{}, {}].", minEpoch, maxEpoch);
+
+        for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+            futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch));
+
+        FBUtilities.waitOnFutures(futures);
+        List<Topology> topologies = new ArrayList<>(futures.size());
+        for (Future<Topology> future : futures)
+            topologies.add(future.get());
+
+        return topologies;
     }

     @VisibleForTesting
-    static Long findMinEpoch(SharedContext context, Map<InetAddressAndPort, Set<TokenRange>> peers)
+    static Long findMinEpoch(SharedContext context, Set<InetAddressAndPort> peers)
     {
         try
         {
@@ -1152,7 +1185,7 @@ private static CommandStoreTxnBlockedGraph.TxnState populate(CommandStoreTxnBloc
     @Nullable
     @Override
-    public Long minEpoch(Collection<TokenRange> ranges)
+    public Long minEpoch()
     {
         return node.topology().minEpoch();
     }
diff --git a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
index ef670c572e04..3c6e18c3d635 100644
--- a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
+++ b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
@@ -20,9 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -38,7 +36,6 @@
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.SharedContext;
@@ -53,40 +50,32 @@
 // TODO (required, efficiency): this can be simplified: we seem to always use "entire range"
 public class FetchMinEpoch
 {
+    private static final FetchMinEpoch instance = new FetchMinEpoch();
+
     public static final IVersionedSerializer<FetchMinEpoch> serializer = new IVersionedSerializer<>()
     {
         @Override
-        public void serialize(FetchMinEpoch t, DataOutputPlus out, int version) throws IOException
+        public void serialize(FetchMinEpoch t, DataOutputPlus out, int version)
         {
-            out.writeUnsignedVInt32(t.ranges.size());
-            for (TokenRange range : t.ranges)
-                TokenRange.serializer.serialize(range, out, version);
         }

         @Override
-        public FetchMinEpoch deserialize(DataInputPlus in, int version) throws IOException
+        public FetchMinEpoch deserialize(DataInputPlus in, int version)
         {
-            int size = in.readUnsignedVInt32();
-            List<TokenRange> ranges = new ArrayList<>(size);
-            for (int i = 0; i < size; i++)
-                ranges.add(TokenRange.serializer.deserialize(in, version));
-            return new FetchMinEpoch(ranges);
+            return FetchMinEpoch.instance;
        }

         @Override
         public long serializedSize(FetchMinEpoch t, int version)
         {
-            long size = TypeSizes.sizeofUnsignedVInt(t.ranges.size());
-            for (TokenRange range : t.ranges)
-                size += TokenRange.serializer.serializedSize(range, version);
-            return size;
+            return 0;
         }
     };

     public static final IVerbHandler<FetchMinEpoch> handler = message -> {
         if (AccordService.started())
         {
-            Long epoch = AccordService.instance().minEpoch(message.payload.ranges);
+            Long epoch = AccordService.instance().minEpoch();
             MessagingService.instance().respond(new Response(epoch), message);
         }
         else
@@ -96,41 +85,15 @@ public long serializedSize(FetchMinEpoch t, int version)
         }
     };

-    public final Collection<TokenRange> ranges;
-
-    public FetchMinEpoch(Collection<TokenRange> ranges)
-    {
-        this.ranges = ranges;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        FetchMinEpoch that = (FetchMinEpoch) o;
-        return Objects.equals(ranges, that.ranges);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(ranges);
-    }
-
-    @Override
-    public String toString()
+    private FetchMinEpoch()
     {
-        return "FetchMinEpoch{" +
-               "ranges=" + ranges +
-               '}';
     }

-    public static Future<Long> fetch(SharedContext context, Map<InetAddressAndPort, Set<TokenRange>> peers)
+    public static Future<Long> fetch(SharedContext context, Set<InetAddressAndPort> peers)
     {
         List<Future<Long>> accum = new ArrayList<>(peers.size());
-        for (Map.Entry<InetAddressAndPort, Set<TokenRange>> e : peers.entrySet())
-            accum.add(fetch(context, e.getKey(), e.getValue()));
+        for (InetAddressAndPort peer : peers)
+            accum.add(fetch(context, peer));
         // TODO (required): we are collecting only successes, but we need some threshold
         return FutureCombiner.successfulOf(accum).map(epochs -> {
             Long min = null;
@@ -145,21 +108,22 @@ public static Future<Long> fetch(SharedContext context, Map
-    static Future<Long> fetch(SharedContext context, InetAddressAndPort to, Set<TokenRange> value)
+    static Future<Long> fetch(SharedContext context, InetAddressAndPort to)
     {
-        FetchMinEpoch req = new FetchMinEpoch(value);
-        return context.messaging().sendWithRetries(Backoff.NO_OP.INSTANCE,
-                                                   MessageDelivery.ImmediateRetryScheduler.instance,
-                                                   Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req,
-                                                   Iterators.cycle(to),
-                                                   RetryPredicate.times(DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts.value),
-                                                   RetryErrorMessage.EMPTY)
+        Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().minEpochSyncRetry);
+        return context.messaging().sendWithRetries(backoff,
+                                                   context.optionalTasks()::schedule,
+                                                   Verb.ACCORD_FETCH_MIN_EPOCH_REQ,
+                                                   FetchMinEpoch.instance,
+                                                   Iterators.cycle(to),
+                                                   RetryPredicate.ALWAYS_RETRY,
+                                                   RetryErrorMessage.EMPTY)
                .map(m -> m.payload.minEpoch);
     }

     public static class Response
     {
-        public static final IVersionedSerializer<Response> serializer = new IVersionedSerializer<Response>()
+        public static final IVersionedSerializer<Response> serializer = new IVersionedSerializer<>()
         {
             @Override
             public void serialize(Response t, DataOutputPlus out, int version) throws IOException
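The TODO above notes that FutureCombiner.successfulOf folds over successes only, so a single reachable peer currently decides the reported minimum. One possible shape for the missing threshold, purely illustrative (the majority policy and minWithQuorum are assumptions, not what this patch implements):

    static Long minWithQuorum(List<Long> epochs, int peerCount)
    {
        if (epochs.size() < peerCount / 2 + 1)
            return null; // too few responses to trust a minimum
        Long min = null;
        for (Long epoch : epochs)
            if (epoch != null && (min == null || epoch < min))
                min = epoch;
        return min;
    }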
diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java b/src/java/org/apache/cassandra/service/accord/FetchTopology.java
index d40d2654ea25..dc8df4f4548c 100644
--- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java
@@ -22,6 +22,9 @@
 import java.util.Collection;

 import accord.topology.Topology;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -33,10 +36,18 @@
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.service.accord.serializers.TopologySerializers;
+import org.apache.cassandra.utils.Backoff;
 import org.apache.cassandra.utils.concurrent.Future;

 public class FetchTopology
 {
+    public String toString()
+    {
+        return "FetchTopology{" +
+               "epoch=" + epoch +
+               '}';
+    }
+
     private final long epoch;

     public static final IVersionedSerializer<FetchTopology> serializer = new IVersionedSerializer<>()
@@ -67,34 +78,20 @@ public FetchTopology(long epoch)

     public static class Response
     {
-        private static Response UNKNOWN = new Response(-1, null) {
-            public String toString()
-            {
-                return "UNKNOWN_TOPOLOGY{}";
-            }
-        };
-
         // TODO (required): messaging version after version patch
         public static final IVersionedSerializer<Response> serializer = new IVersionedSerializer<>()
         {
             @Override
             public void serialize(Response t, DataOutputPlus out, int version) throws IOException
             {
-                if (t == UNKNOWN)
-                {
-                    out.writeLong(-1);
-                    return;
-                }
-                out.writeLong(t.epoch);
+                out.writeUnsignedVInt(t.epoch);
                 TopologySerializers.topology.serialize(t.topology, out, version);
             }

             @Override
             public Response deserialize(DataInputPlus in, int version) throws IOException
             {
-                long epoch = in.readLong();
-                if (epoch == -1)
-                    return UNKNOWN;
+                long epoch = in.readUnsignedVInt();
                 Topology topology = TopologySerializers.topology.deserialize(in, version);
                 return new Response(epoch, topology);
             }
@@ -102,10 +99,8 @@ public Response deserialize(DataInputPlus in, int version) throws IOException
             @Override
             public long serializedSize(Response t, int version)
             {
-                if (t == UNKNOWN)
-                    return Long.BYTES;
-
-                return Long.BYTES + TopologySerializers.topology.serializedSize(t.topology, version);
+                return TypeSizes.sizeofUnsignedVInt(t.epoch)
+                       + TopologySerializers.topology.serializedSize(t.topology, version);
             }
         };
@@ -121,20 +116,25 @@ public Response(long epoch, Topology topology)

     public static final IVerbHandler<FetchTopology> handler = message -> {
         long epoch = message.payload.epoch;
-        Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch);
-        if (topology == null)
-            MessagingService.instance().respond(Response.UNKNOWN, message);
-        else
+
+        Topology topology;
+        if (AccordService.isSetup() && (topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch)) != null)
             MessagingService.instance().respond(new Response(epoch, topology), message);
+        else
+            MessagingService.instance().respondWithFailure(RequestFailure.UNKNOWN_TOPOLOGY, message);
     };
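The Response serializer above trades the fixed 8-byte epoch for an unsigned vint, which encodes small values (the common case for epochs) in a single byte. A quick illustration, assuming Cassandra's standard vint encoding:

    static void vintSizeExample() throws IOException
    {
        try (DataOutputBuffer out = new DataOutputBuffer())
        {
            out.writeUnsignedVInt(42L);                    // one byte on the wire
            assert out.getLength() == 1;                   // vs. Long.BYTES == 8 with writeLong
            assert TypeSizes.sizeofUnsignedVInt(42L) == 1; // size computable without writing
        }
    }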
     public static Future<Topology> fetch(SharedContext context, Collection<InetAddressAndPort> peers, long epoch)
     {
-        FetchTopology req = new FetchTopology(epoch);
-        return context.messaging().sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req, MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers),
-                                                   // If the epoch is already discovered, no need to retry
-                                                   (attempt, from, failure) -> AccordService.instance().currentEpoch() < epoch,
-                                                   MessageDelivery.RetryErrorMessage.EMPTY)
+        FetchTopology request = new FetchTopology(epoch);
+        Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().fetchRetry);
+        return context.messaging().sendWithRetries(backoff,
+                                                   context.optionalTasks()::schedule,
+                                                   Verb.ACCORD_FETCH_TOPOLOGY_REQ,
+                                                   request,
+                                                   MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers, Verb.ACCORD_FETCH_TOPOLOGY_REQ.name()),
+                                                   (attempt, from, failure) -> true,
+                                                   MessageDelivery.RetryErrorMessage.EMPTY)
                .map(m -> m.payload.topology);
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 14fb94b6b453..865473d76c48 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service.accord;

-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -180,7 +179,7 @@ public CompactionInfo(Int2ObjectHashMap redundantBefores, Int2O
     List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId);

     @Nullable
-    Long minEpoch(Collection<TokenRange> ranges);
+    Long minEpoch();

     void tryMarkRemoved(Topology topology, Node.Id node);
     void awaitTableDrop(TableId id);
@@ -326,7 +325,7 @@ public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId)

     @Nullable
     @Override
-    public Long minEpoch(Collection<TokenRange> ranges)
+    public Long minEpoch()
     {
         return null;
     }
@@ -513,9 +512,9 @@ public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId)

     @Nullable
     @Override
-    public Long minEpoch(Collection<TokenRange> ranges)
+    public Long minEpoch()
     {
-        return delegate.minEpoch(ranges);
+        return delegate.minEpoch();
     }

     @Override
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index ceb8d911ecb0..1d1d1742bea6 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -80,6 +80,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.AccordSpec",
     "org.apache.cassandra.config.AccordSpec$JournalSpec",
     "org.apache.cassandra.config.AccordSpec$MinEpochRetrySpec",
+    "org.apache.cassandra.config.AccordSpec$FetchRetrySpec",
     "org.apache.cassandra.config.AccordSpec$TransactionalRangeMigration",
     "org.apache.cassandra.config.AccordSpec$QueueShardModel",
     "org.apache.cassandra.config.AccordSpec$QueueSubmissionModel",
diff --git a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
index 7bfe09b15f66..f06df7e5e337 100644
--- a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service.accord;

-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +27,7 @@
 import java.util.stream.Stream;

 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,18 +36,13 @@
 import accord.utils.RandomSource;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.RetrySpec;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.io.IVersionedSerializers;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.SimulatedMessageDelivery.Action;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey.RoutingKeyKind;
-import org.apache.cassandra.utils.AccordGenerators;
-import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.SimulatedMiniCluster;
 import org.apache.cassandra.utils.SimulatedMiniCluster.Node;
 import org.apache.cassandra.utils.concurrent.Future;
@@ -55,7 +50,6 @@
 import static accord.utils.Property.qt;
 import static org.apache.cassandra.net.MessagingService.Version.VERSION_51;
-import static org.apache.cassandra.utils.AccordGenerators.fromQT;
 import static org.assertj.core.api.Assertions.assertThat;

 public class FetchMinEpochTest
@@ -74,24 +68,6 @@ private static void boundedRetries(int retries)
         DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts = new RetrySpec.MaxAttempt(retries);
     }

-    @Test
-    public void requestSerde()
-    {
-        DataOutputBuffer output = new DataOutputBuffer();
-        Gen<FetchMinEpoch> gen = fromQT(CassandraGenerators.partitioners())
-                                 .map(CassandraGenerators::simplify)
-                                 .flatMap(partitioner ->
-                                          Gens.lists(AccordGenerators.range(partitioner)
-                                                                     .map(r -> (TokenRange) r))
-                                              .ofSizeBetween(0, 10)
-                                              .map(FetchMinEpoch::new));
-        qt().forAll(gen).check(req -> {
-            maybeSetPartitioner(req);
-            for (MessagingService.Version version : SUPPORTED)
-                IVersionedSerializers.testSerde(output, FetchMinEpoch.serializer, req, version.value);
-        });
-    }
-
     @Test
     public void responseSerde()
     {
@@ -115,12 +91,12 @@ public void fetchOneNodeAlwaysFails()
         Node from = cluster.createNodeAndJoin();
         Node to = cluster.createNodeAndJoin();

-        Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort(), Collections.emptySet());
+        Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort());
         assertThat(f).isNotDone();
         cluster.processAll();
         assertThat(f).isDone();
-        MessageDelivery.FailedResponseException maxRetries = getFailedResponseException(f);
-        Assertions.assertThat(maxRetries.failure).isEqualTo(RequestFailure.TIMEOUT);
+        MessageDelivery.MaxRetriesException maxRetries = getMaxRetriesException(f);
+        Assertions.assertThat(maxRetries.attempts).isEqualTo(expectedMaxAttempts);
     });
 }
@@ -139,7 +115,7 @@ public void fetchOneNode()
         }
         Node to = cluster.createNodeAndJoin();

-        Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort(), Collections.emptySet());
+        Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort());
         assertThat(f).isNotDone();
         cluster.processAll();
         assertThat(f).isDone();
@@ -161,10 +137,10 @@ public void fetchManyNodesAllNodesFail()
         Node to3 = cluster.createNodeAndJoin();
         Node to4 = cluster.createNodeAndJoin();

-        Future<Long> f = FetchMinEpoch.fetch(from, ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                   to2.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                   to3.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                   to4.broadcastAddressAndPort(), Collections.emptySet()));
+        Future<Long> f = FetchMinEpoch.fetch(from, ImmutableSet.of(to1.broadcastAddressAndPort(),
+                                                                   to2.broadcastAddressAndPort(),
+                                                                   to3.broadcastAddressAndPort(),
+                                                                   to4.broadcastAddressAndPort()));
         assertThat(f).isNotDone();
         cluster.processAll();
         assertThat(f).isDone();
@@ -201,10 +177,10 @@ public void fetchManyNodes()
                 to4.broadcastAddressAndPort(), actionGen(rs, maxRetries));
             from.messagingActions((self, msg, to) -> nodeToActions.get(to).get());

-            Future<Long> f = FetchMinEpoch.fetch(from, ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to2.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to3.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to4.broadcastAddressAndPort(), Collections.emptySet()));
+            Future<Long> f = FetchMinEpoch.fetch(from, ImmutableSet.of(to1.broadcastAddressAndPort(),
+                                                                       to2.broadcastAddressAndPort(),
+                                                                       to3.broadcastAddressAndPort(),
+                                                                       to4.broadcastAddressAndPort()));
             assertThat(f).isNotDone();
             cluster.processAll();
             assertThat(f).isDone();
@@ -235,53 +211,6 @@ public Action next(RandomSource rng)
         return safeActionGen.asSupplier(actionSource);
     }

-    private static void maybeSetPartitioner(FetchMinEpoch req)
-    {
-        IPartitioner partitioner = null;
-        for (TokenRange r : req.ranges)
-        {
-            IPartitioner rangePartitioner = null;
-            if (r.start().kindOfRoutingKey() != RoutingKeyKind.SENTINEL)
-                rangePartitioner = r.start().token().getPartitioner();
-            if (rangePartitioner == null && r.end().kindOfRoutingKey() != RoutingKeyKind.SENTINEL)
-                rangePartitioner = r.end().token().getPartitioner();
-            if (rangePartitioner == null)
-                continue;
-            if (partitioner == null)
-            {
-                partitioner = rangePartitioner;
-            }
-            else
-            {
-                Assertions.assertThat(rangePartitioner).isEqualTo(partitioner);
-            }
-        }
-        if (partitioner != null)
-            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
-    }
-
-    private static MessageDelivery.FailedResponseException getFailedResponseException(Future<Long> f) throws InterruptedException, ExecutionException
-    {
-        MessageDelivery.FailedResponseException exception;
-        try
-        {
-            f.get();
-            Assert.fail("Future should have failed");
-            throw new AssertionError("Unreachable");
-        }
-        catch (ExecutionException e)
-        {
-            if (e.getCause() instanceof MessageDelivery.FailedResponseException)
-            {
-                exception = (MessageDelivery.FailedResponseException) e.getCause();
-            }
-            else
-            {
-                throw e;
-            }
-        }
-        return exception;
-    }

     private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future<Long> f) throws InterruptedException, ExecutionException
     {