From 6be92d29286b9bc167cd9a6ee895a251969f80f7 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 29 Jan 2025 19:53:59 +0100 Subject: [PATCH] Address Ariel's and David's comments --- .gitmodules | 4 +- modules/accord | 2 +- .../apache/cassandra/config/AccordSpec.java | 14 +++--- .../apache/cassandra/net/MessageDelivery.java | 8 --- .../apache/cassandra/net/MessagingUtils.java | 16 +++++- .../accord/AccordConfigurationService.java | 25 +++++++--- .../service/accord/AccordService.java | 49 ++++++++++++------- .../service/accord/FetchTopology.java | 42 +++++++++------- .../service/accord/FetchMinEpochTest.java | 23 --------- 9 files changed, 96 insertions(+), 87 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..a5685a541e16 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/ifesdjeen/cassandra-accord.git + branch = CASSANDRA-20245 diff --git a/modules/accord b/modules/accord index c7a789b1f424..380d741c8d75 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit c7a789b1f424771a4befab6bcb91edd4ab5d7198 +Subproject commit 380d741c8d75f6d5c14e0a627c129762bcec9385 diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index 720a2db7722d..e6f954cb26d2 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -194,15 +194,17 @@ public enum TransactionalRangeMigration public boolean ephemeralReadEnabled = true; public boolean state_cache_listener_jfr_enabled = true; public final JournalSpec journal = new JournalSpec(); - public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec(); - - public static class MinEpochRetrySpec extends RetrySpec - { - public MinEpochRetrySpec() + public final RetrySpec minEpochSyncRetry = new 
RetrySpec() { { maxAttempts = new MaxAttempt(3); } - } + }; + + public final RetrySpec fetchRetry = new RetrySpec() { + { + maxAttempts = new MaxAttempt(100); + } + }; public static class JournalSpec implements Params { diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java b/src/java/org/apache/cassandra/net/MessageDelivery.java index 9dc374750f60..f9439dbb30d2 100644 --- a/src/java/org/apache/cassandra/net/MessageDelivery.java +++ b/src/java/org/apache/cassandra/net/MessageDelivery.java @@ -99,14 +99,6 @@ public default Future> sendWithRetries(Backoff backoff, return promise; } - public default Future> sendWithRetries(Verb verb, REQ request, - Iterator candidates, - RetryPredicate shouldRetry, - RetryErrorMessage errorMessage) - { - return sendWithRetries(Backoff.NO_OP.INSTANCE, ImmediateRetryScheduler.instance, verb, request, candidates, shouldRetry, errorMessage); - } - public default void sendWithRetries(Backoff backoff, RetryScheduler retryThreads, Verb verb, REQ request, Iterator candidates, diff --git a/src/java/org/apache/cassandra/net/MessagingUtils.java b/src/java/org/apache/cassandra/net/MessagingUtils.java index 2190eaf3a655..11735f8d7f66 100644 --- a/src/java/org/apache/cassandra/net/MessagingUtils.java +++ b/src/java/org/apache/cassandra/net/MessagingUtils.java @@ -18,22 +18,31 @@ package org.apache.cassandra.net; +import java.util.Collection; import java.util.Iterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.SharedContext; public class MessagingUtils { + private static final Logger logger = LoggerFactory.getLogger(MessagingUtils.class); + /** * Candidate iterator that would try all endpoints known to be alive first, and then try all endpoints * in a round-robin manner. + *

+ * Logs a warning every time the set of peers is exhausted. */ - public static Iterator tryAliveFirst(SharedContext context, Iterable peers) + public static Iterator tryAliveFirst(SharedContext context, Collection peers, String verb) { return new Iterator<>() { boolean firstRun = true; + int attempt = 0; Iterator iter = peers.iterator(); boolean isEmpty = !iter.hasNext(); @@ -58,10 +67,13 @@ public InetAddressAndPort next() // After that, cycle through all nodes if (!iter.hasNext()) + { + logger.warn("Exhausted iterator on {} cycling through the set of peers: {} attempt #{}", verb, peers, attempt++); iter = peers.iterator(); + } return iter.next(); } }; } -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index a7673b6f1247..6aa4888e3a15 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -416,9 +416,6 @@ void maybeReportMetadata(ClusterMetadata metadata) synchronized (epochs) { long maxEpoch = epochs.maxEpoch(); - if (maxEpoch >= epoch) - return; - if (maxEpoch == 0) { getOrCreateEpochState(epoch); // touch epoch state so subsequent calls see it @@ -426,7 +423,10 @@ void maybeReportMetadata(ClusterMetadata metadata) return; } } - getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata)); + + // Create a -1 epoch only if we know this epoch may actually exist + if (metadata.epoch.getEpoch() > minEpoch()) + getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata)); } @Override @@ -436,14 +436,25 @@ protected void fetchTopologyInternal(long epoch) Stage.ACCORD_MIGRATION.execute(() -> { if (ClusterMetadata.current().epoch.getEpoch() < epoch) ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch)); + + // In most cases, 
after fetching log from CMS, we will be caught up to the required epoch. + // This TCM will also notify Accord via reportMetadata, so we do not need to fetch topologies. + // If the reported metadata has skipped one or more epochs, and is _ahead_ of the requested epoch, + // we need to fetch topologies from peers to fill in the gap. + ClusterMetadata metadata = ClusterMetadata.current(); + if (metadata.epoch.getEpoch() == epoch) + return; + try { - Set peers = new HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints()); + Set peers = new HashSet<>(metadata.directory.allJoinedEndpoints()); peers.remove(FBUtilities.getBroadcastAddressAndPort()); if (peers.isEmpty()) return; - Topology topology; - while ((topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null) {} + + // TODO (required): fetch only _missing_ topologies. + Topology topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get(); + Invariants.require(topology.epoch() == epoch); reportTopology(topology); } catch (InterruptedException e) diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index bd30d105cecc..6b9c42dd9f56 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -376,7 +376,7 @@ public synchronized void startup() try { // Fetch topologies up to current - List topologies = fetchTopologies(0, metadata); + List topologies = fetchTopologies(null, metadata); for (Topology topology : topologies) configService.reportTopology(topology); @@ -384,17 +384,31 @@ public synchronized void startup() ClusterMetadata next = ClusterMetadata.current(); // if metadata was updated before we were able to add a listener, fetch remaining topologies - if (metadata != next) + if (next.epoch.isAfter(metadata.epoch)) { - topologies = 
fetchTopologies(metadata.epoch.getEpoch(), next); + topologies = fetchTopologies(metadata.epoch.getEpoch() + 1, next); for (Topology topology : topologies) configService.reportTopology(topology); } - epochReady(metadata.epoch).get(); + int attempt = 0; + int waitSeconds = 5; + while (true) + { + try + { + epochReady(metadata.epoch).get(5, SECONDS); + break; + } + catch (TimeoutException e) + { + logger.warn("Epoch {} is not ready after waiting for {} seconds", metadata.epoch, (++attempt) * waitSeconds); + } + } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e); } catch (ExecutionException e) @@ -417,13 +431,10 @@ public synchronized void startup() /** * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs */ - private List fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException + private List fetchTopologies(Long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException { - if (configService.maxEpoch() >= metadata.epoch.getEpoch()) - { - logger.info("Accord epoch {} matches TCM. 
All topologies are known locally", metadata.epoch); + if (minEpoch != null && minEpoch == metadata.epoch.getEpoch()) return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); - } Set peers = new HashSet<>(); peers.addAll(metadata.directory.allAddresses()); @@ -431,14 +442,17 @@ private List fetchTopologies(long minEpoch, ClusterMetadata metadata) // No peers: single node cluster or first node to boot if (peers.isEmpty()) - return Collections.singletonList(AccordTopology.createAccordTopology(metadata));; + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); // Bootstrap, fetch min epoch - if (minEpoch == 0) + if (minEpoch == null) { - long fetched = findMinEpoch(SharedContext.Global.instance, peers); + Long fetched = findMinEpoch(SharedContext.Global.instance, peers); + if (fetched != null) + logger.info("Discovered min epoch of {} by querying {}", fetched, peers); + // No other node has advanced epoch just yet - if (fetched == 0) + if (fetched == null || fetched == metadata.epoch.getEpoch()) return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); minEpoch = fetched; @@ -454,7 +468,7 @@ private List fetchTopologies(long minEpoch, ClusterMetadata metadata) } List> futures = new ArrayList<>(); - logger.info("Discovered min epoch of {}. 
Proceeding to fetch epochs up to {}.", minEpoch, maxEpoch); + logger.info("Fetching topologies for epochs [{}, {}].", minEpoch, maxEpoch); for (long epoch = minEpoch; epoch <= maxEpoch; epoch++) futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch)); @@ -468,14 +482,11 @@ private List fetchTopologies(long minEpoch, ClusterMetadata metadata) } @VisibleForTesting - static long findMinEpoch(SharedContext context, Set peers) + static Long findMinEpoch(SharedContext context, Set peers) { try { - Long result = FetchMinEpoch.fetch(context, peers).get(); - if (result == null) - return 0L; - return result.longValue(); + return FetchMinEpoch.fetch(context, peers).get(); } catch (InterruptedException e) { diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java b/src/java/org/apache/cassandra/service/accord/FetchTopology.java index f6c1a518c1f3..08af6cb1fc8a 100644 --- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java +++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java @@ -21,11 +21,8 @@ import java.io.IOException; import java.util.Collection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import accord.topology.Topology; -import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -37,11 +34,18 @@ import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.service.accord.serializers.TopologySerializers; +import org.apache.cassandra.utils.Backoff; import org.apache.cassandra.utils.concurrent.Future; public class FetchTopology { - private static final Logger log = LoggerFactory.getLogger(FetchTopology.class); + public String toString() + { + return "FetchTopology{" + + "epoch=" + epoch + + '}'; + } + private final long epoch; 
public static final IVersionedSerializer serializer = new IVersionedSerializer<>() @@ -72,11 +76,6 @@ public FetchTopology(long epoch) public static class Response { - private static Response unkonwn(long epoch) - { - throw new IllegalStateException("Unknown topology: " + epoch); - } - // TODO (required): messaging version after version patch public static final IVersionedSerializer serializer = new IVersionedSerializer<>() { @@ -115,20 +114,25 @@ public Response(long epoch, Topology topology) public static final IVerbHandler handler = message -> { long epoch = message.payload.epoch; Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch); - if (topology == null) - MessagingService.instance().respond(Response.unkonwn(epoch), message); - else + if (topology != null) MessagingService.instance().respond(new Response(epoch, topology), message); + else + throw new IllegalStateException("Unknown topology: " + epoch); }; - private static final Logger logger = LoggerFactory.getLogger(FetchTopology.class); - public static Future fetch(SharedContext context, Collection peers, long epoch) { - FetchTopology req = new FetchTopology(epoch); - return context.messaging().sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req, - MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers), - (attempt, from, failure) -> true, + FetchTopology request = new FetchTopology(epoch); + Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().fetchRetry); + return context.messaging().sendWithRetries(backoff, + context.optionalTasks()::schedule, + Verb.ACCORD_FETCH_TOPOLOGY_REQ, + request, + MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers, Verb.ACCORD_FETCH_TOPOLOGY_REQ.name()), + (attempt, from, failure) -> { + System.out.println("Got " + failure + " from " + from + " while fetching " + request); + return true; + }, MessageDelivery.RetryErrorMessage.EMPTY) .map(m -> m.payload.topology); } diff --git 
a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java index a8b573c08a49..f06df7e5e337 100644 --- a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java +++ b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java @@ -234,27 +234,4 @@ private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future } return maxRetries; } - - private static MessageDelivery.FailedResponseException getFailedResponseException(Future f) throws InterruptedException, ExecutionException - { - MessageDelivery.FailedResponseException exception; - try - { - f.get(); - Assert.fail("Future should have failed"); - throw new AssertionError("Unreachable"); - } - catch (ExecutionException e) - { - if (e.getCause() instanceof MessageDelivery.FailedResponseException) - { - exception = (MessageDelivery.FailedResponseException) e.getCause(); - } - else - { - throw e; - } - } - return exception; - } } \ No newline at end of file