Skip to content

Commit 6be92d2

Browse files
committed
Address Ariel's and David's comments
1 parent 85c4d9e commit 6be92d2

File tree

9 files changed

+96
-87
lines changed

9 files changed

+96
-87
lines changed

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "modules/accord"]
22
path = modules/accord
3-
url = https://github.com/apache/cassandra-accord.git
4-
branch = trunk
3+
url = https://github.com/ifesdjeen/cassandra-accord.git
4+
branch = CASSANDRA-20245

src/java/org/apache/cassandra/config/AccordSpec.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,15 +194,17 @@ public enum TransactionalRangeMigration
194194
public boolean ephemeralReadEnabled = true;
195195
public boolean state_cache_listener_jfr_enabled = true;
196196
public final JournalSpec journal = new JournalSpec();
197-
public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec();
198-
199-
public static class MinEpochRetrySpec extends RetrySpec
200-
{
201-
public MinEpochRetrySpec()
197+
public final RetrySpec minEpochSyncRetry = new RetrySpec() {
202198
{
203199
maxAttempts = new MaxAttempt(3);
204200
}
205-
}
201+
};
202+
203+
public final RetrySpec fetchRetry = new RetrySpec() {
204+
{
205+
maxAttempts = new MaxAttempt(100);
206+
}
207+
};
206208

207209
public static class JournalSpec implements Params
208210
{

src/java/org/apache/cassandra/net/MessageDelivery.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,6 @@ public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Backoff backoff,
9999
return promise;
100100
}
101101

102-
public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Verb verb, REQ request,
103-
Iterator<InetAddressAndPort> candidates,
104-
RetryPredicate shouldRetry,
105-
RetryErrorMessage errorMessage)
106-
{
107-
return sendWithRetries(Backoff.NO_OP.INSTANCE, ImmediateRetryScheduler.instance, verb, request, candidates, shouldRetry, errorMessage);
108-
}
109-
110102
public default <REQ, RSP> void sendWithRetries(Backoff backoff, RetryScheduler retryThreads,
111103
Verb verb, REQ request,
112104
Iterator<InetAddressAndPort> candidates,

src/java/org/apache/cassandra/net/MessagingUtils.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,31 @@
1818

1919
package org.apache.cassandra.net;
2020

21+
import java.util.Collection;
2122
import java.util.Iterator;
2223

24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
2327
import org.apache.cassandra.locator.InetAddressAndPort;
2428
import org.apache.cassandra.repair.SharedContext;
2529

2630
public class MessagingUtils
2731
{
32+
private static final Logger logger = LoggerFactory.getLogger(MessagingUtils.class);
33+
2834
/**
2935
* Candidate iterator that would try all endpoints known to be alive first, and then try all endpoints
3036
* in a round-robin manner.
37+
* <p>
38+
* Calls onIteration every time after exhausting the peers.
3139
*/
32-
public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext context, Iterable<InetAddressAndPort> peers)
40+
public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext context, Collection<InetAddressAndPort> peers, String verb)
3341
{
3442
return new Iterator<>()
3543
{
3644
boolean firstRun = true;
45+
int attempt = 0;
3746
Iterator<InetAddressAndPort> iter = peers.iterator();
3847
boolean isEmpty = !iter.hasNext();
3948

@@ -58,10 +67,13 @@ public InetAddressAndPort next()
5867

5968
// After that, cycle through all nodes
6069
if (!iter.hasNext())
70+
{
71+
logger.warn("Exhausted iterator on {} cycling through the set of peers: {} attempt #{}", verb, peers, attempt++);
6172
iter = peers.iterator();
73+
}
6274

6375
return iter.next();
6476
}
6577
};
6678
}
67-
}
79+
}

src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -416,17 +416,17 @@ void maybeReportMetadata(ClusterMetadata metadata)
416416
synchronized (epochs)
417417
{
418418
long maxEpoch = epochs.maxEpoch();
419-
if (maxEpoch >= epoch)
420-
return;
421-
422419
if (maxEpoch == 0)
423420
{
424421
getOrCreateEpochState(epoch); // touch epoch state so subsequent calls see it
425422
reportMetadata(metadata);
426423
return;
427424
}
428425
}
429-
getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata));
426+
427+
// Create a -1 epoch iif we know this epoch may actually exist
428+
if (metadata.epoch.getEpoch() > minEpoch())
429+
getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata));
430430
}
431431

432432
@Override
@@ -436,14 +436,25 @@ protected void fetchTopologyInternal(long epoch)
436436
Stage.ACCORD_MIGRATION.execute(() -> {
437437
if (ClusterMetadata.current().epoch.getEpoch() < epoch)
438438
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));
439+
440+
// In most cases, after fetching log from CMS, we will be caught up to the required epoch.
441+
// This TCM will also notify Accord via reportMetadata, so we do not need to fetch topologies.
442+
// If metadata has reported has skipped one or more eopchs, and is _ahead_ of the requested epoch,
443+
// we need to fetch topologies from peers to fill in the gap.
444+
ClusterMetadata metadata = ClusterMetadata.current();
445+
if (metadata.epoch.getEpoch() == epoch)
446+
return;
447+
439448
try
440449
{
441-
Set<InetAddressAndPort> peers = new HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints());
450+
Set<InetAddressAndPort> peers = new HashSet<>(metadata.directory.allJoinedEndpoints());
442451
peers.remove(FBUtilities.getBroadcastAddressAndPort());
443452
if (peers.isEmpty())
444453
return;
445-
Topology topology;
446-
while ((topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null) {}
454+
455+
// TODO (required): fetch only _missing_ topologies.
456+
Topology topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get();
457+
Invariants.require(topology.epoch() == epoch);
447458
reportTopology(topology);
448459
}
449460
catch (InterruptedException e)

src/java/org/apache/cassandra/service/accord/AccordService.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -376,25 +376,39 @@ public synchronized void startup()
376376
try
377377
{
378378
// Fetch topologies up to current
379-
List<Topology> topologies = fetchTopologies(0, metadata);
379+
List<Topology> topologies = fetchTopologies(null, metadata);
380380
for (Topology topology : topologies)
381381
configService.reportTopology(topology);
382382

383383
ClusterMetadataService.instance().log().addListener(configService.listener);
384384
ClusterMetadata next = ClusterMetadata.current();
385385

386386
// if metadata was updated before we were able to add a listener, fetch remaining topologies
387-
if (metadata != next)
387+
if (next.epoch.isAfter(metadata.epoch))
388388
{
389-
topologies = fetchTopologies(metadata.epoch.getEpoch(), next);
389+
topologies = fetchTopologies(metadata.epoch.getEpoch() + 1, next);
390390
for (Topology topology : topologies)
391391
configService.reportTopology(topology);
392392
}
393393

394-
epochReady(metadata.epoch).get();
394+
int attempt = 0;
395+
int waitSeconds = 5;
396+
while (true)
397+
{
398+
try
399+
{
400+
epochReady(metadata.epoch).get(5, SECONDS);
401+
break;
402+
}
403+
catch (TimeoutException e)
404+
{
405+
logger.warn("Epoch {} is not ready after waiting for {} seconds", metadata.epoch, (++attempt) * waitSeconds);
406+
}
407+
}
395408
}
396409
catch (InterruptedException e)
397410
{
411+
Thread.currentThread().interrupt();
398412
throw new UncheckedInterruptedException(e);
399413
}
400414
catch (ExecutionException e)
@@ -417,28 +431,28 @@ public synchronized void startup()
417431
/**
418432
* Queries peers to discover min epoch, and then fetches all topologies between min and current epochs
419433
*/
420-
private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException
434+
private List<Topology> fetchTopologies(Long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException
421435
{
422-
if (configService.maxEpoch() >= metadata.epoch.getEpoch())
423-
{
424-
logger.info("Accord epoch {} matches TCM. All topologies are known locally", metadata.epoch);
436+
if (minEpoch != null && minEpoch == metadata.epoch.getEpoch())
425437
return Collections.singletonList(AccordTopology.createAccordTopology(metadata));
426-
}
427438

428439
Set<InetAddressAndPort> peers = new HashSet<>();
429440
peers.addAll(metadata.directory.allAddresses());
430441
peers.remove(FBUtilities.getBroadcastAddressAndPort());
431442

432443
// No peers: single node cluster or first node to boot
433444
if (peers.isEmpty())
434-
return Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
445+
return Collections.singletonList(AccordTopology.createAccordTopology(metadata));
435446

436447
// Bootstrap, fetch min epoch
437-
if (minEpoch == 0)
448+
if (minEpoch == null)
438449
{
439-
long fetched = findMinEpoch(SharedContext.Global.instance, peers);
450+
Long fetched = findMinEpoch(SharedContext.Global.instance, peers);
451+
if (fetched != null)
452+
logger.info("Discovered min epoch of {} by querying {}", fetched, peers);
453+
440454
// No other node has advanced epoch just yet
441-
if (fetched == 0)
455+
if (fetched == null || fetched == metadata.epoch.getEpoch())
442456
return Collections.singletonList(AccordTopology.createAccordTopology(metadata));
443457

444458
minEpoch = fetched;
@@ -454,7 +468,7 @@ private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata metadata)
454468
}
455469

456470
List<Future<Topology>> futures = new ArrayList<>();
457-
logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up to {}.", minEpoch, maxEpoch);
471+
logger.info("Fetching topologies for epochs [{}, {}].", minEpoch, maxEpoch);
458472

459473
for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
460474
futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch));
@@ -468,14 +482,11 @@ private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata metadata)
468482
}
469483

470484
@VisibleForTesting
471-
static long findMinEpoch(SharedContext context, Set<InetAddressAndPort> peers)
485+
static Long findMinEpoch(SharedContext context, Set<InetAddressAndPort> peers)
472486
{
473487
try
474488
{
475-
Long result = FetchMinEpoch.fetch(context, peers).get();
476-
if (result == null)
477-
return 0L;
478-
return result.longValue();
489+
return FetchMinEpoch.fetch(context, peers).get();
479490
}
480491
catch (InterruptedException e)
481492
{

src/java/org/apache/cassandra/service/accord/FetchTopology.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,8 @@
2121
import java.io.IOException;
2222
import java.util.Collection;
2323

24-
import org.slf4j.Logger;
25-
import org.slf4j.LoggerFactory;
26-
2724
import accord.topology.Topology;
28-
import org.apache.cassandra.exceptions.RequestFailure;
25+
import org.apache.cassandra.config.DatabaseDescriptor;
2926
import org.apache.cassandra.io.IVersionedSerializer;
3027
import org.apache.cassandra.io.util.DataInputPlus;
3128
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -37,11 +34,18 @@
3734
import org.apache.cassandra.net.Verb;
3835
import org.apache.cassandra.repair.SharedContext;
3936
import org.apache.cassandra.service.accord.serializers.TopologySerializers;
37+
import org.apache.cassandra.utils.Backoff;
4038
import org.apache.cassandra.utils.concurrent.Future;
4139

4240
public class FetchTopology
4341
{
44-
private static final Logger log = LoggerFactory.getLogger(FetchTopology.class);
42+
public String toString()
43+
{
44+
return "FetchTopology{" +
45+
"epoch=" + epoch +
46+
'}';
47+
}
48+
4549
private final long epoch;
4650

4751
public static final IVersionedSerializer<FetchTopology> serializer = new IVersionedSerializer<>()
@@ -72,11 +76,6 @@ public FetchTopology(long epoch)
7276

7377
public static class Response
7478
{
75-
private static Response unkonwn(long epoch)
76-
{
77-
throw new IllegalStateException("Unknown topology: " + epoch);
78-
}
79-
8079
// TODO (required): messaging version after version patch
8180
public static final IVersionedSerializer<Response> serializer = new IVersionedSerializer<>()
8281
{
@@ -115,20 +114,25 @@ public Response(long epoch, Topology topology)
115114
public static final IVerbHandler<FetchTopology> handler = message -> {
116115
long epoch = message.payload.epoch;
117116
Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch);
118-
if (topology == null)
119-
MessagingService.instance().respond(Response.unkonwn(epoch), message);
120-
else
117+
if (topology != null)
121118
MessagingService.instance().respond(new Response(epoch, topology), message);
119+
else
120+
throw new IllegalStateException("Unknown topology: " + epoch);
122121
};
123122

124-
private static final Logger logger = LoggerFactory.getLogger(FetchTopology.class);
125-
126123
public static Future<Topology> fetch(SharedContext context, Collection<InetAddressAndPort> peers, long epoch)
127124
{
128-
FetchTopology req = new FetchTopology(epoch);
129-
return context.messaging().<FetchTopology, Response>sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req,
130-
MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers),
131-
(attempt, from, failure) -> true,
125+
FetchTopology request = new FetchTopology(epoch);
126+
Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().fetchRetry);
127+
return context.messaging().<FetchTopology, Response>sendWithRetries(backoff,
128+
context.optionalTasks()::schedule,
129+
Verb.ACCORD_FETCH_TOPOLOGY_REQ,
130+
request,
131+
MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers, Verb.ACCORD_FETCH_TOPOLOGY_REQ.name()),
132+
(attempt, from, failure) -> {
133+
System.out.println("Got " + failure + " from " + from + " while fetching " + request);
134+
return true;
135+
},
132136
MessageDelivery.RetryErrorMessage.EMPTY)
133137
.map(m -> m.payload.topology);
134138
}

test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -234,27 +234,4 @@ private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future
234234
}
235235
return maxRetries;
236236
}
237-
238-
private static MessageDelivery.FailedResponseException getFailedResponseException(Future<Long> f) throws InterruptedException, ExecutionException
239-
{
240-
MessageDelivery.FailedResponseException exception;
241-
try
242-
{
243-
f.get();
244-
Assert.fail("Future should have failed");
245-
throw new AssertionError("Unreachable");
246-
}
247-
catch (ExecutionException e)
248-
{
249-
if (e.getCause() instanceof MessageDelivery.FailedResponseException)
250-
{
251-
exception = (MessageDelivery.FailedResponseException) e.getCause();
252-
}
253-
else
254-
{
255-
throw e;
256-
}
257-
}
258-
return exception;
259-
}
260237
}

0 commit comments

Comments
 (0)