Skip to content

Commit 6e31b25

Browse files
committed
Checkpointing a first draft of a AST Model
1 parent 1adbfea commit 6e31b25

File tree

21 files changed

+3874
-28
lines changed

21 files changed

+3874
-28
lines changed

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "modules/accord"]
22
path = modules/accord
3-
url = https://github.com/apache/cassandra-accord.git
4-
branch = trunk
3+
url = https://github.com/dcapwell/cassandra-accord.git
4+
branch = CASSANDRA-20156

src/java/org/apache/cassandra/db/filter/RowFilter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ protected ByteBuffer getValue(TableMetadata metadata, DecoratedKey partitionKey,
536536
return row.clustering().bufferAt(column.position());
537537
default:
538538
Cell<?> cell = row.getCell(column);
539-
return cell == null ? null : cell.buffer();
539+
return cell == null || cell.isTombstone() || !cell.isLive(nowInSec) ? null : cell.buffer();
540540
}
541541
}
542542

@@ -821,7 +821,8 @@ public ByteBuffer getIndexValue()
821821
return CompositeType.build(ByteBufferAccessor.instance, key, value);
822822
}
823823

824-
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row)
824+
@Override
825+
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
825826
{
826827
assert key != null;
827828
// We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
@@ -940,7 +941,8 @@ protected Kind kind()
940941
}
941942

942943
// Filtering by custom expressions isn't supported yet, so just accept any row
943-
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row)
944+
@Override
945+
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
944946
{
945947
return true;
946948
}

test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.regex.Matcher;
4545
import java.util.regex.Pattern;
4646
import java.util.stream.Collectors;
47+
import java.util.stream.IntStream;
4748

4849
import com.google.common.annotations.VisibleForTesting;
4950
import com.google.common.base.Strings;
@@ -54,6 +55,8 @@
5455
import org.slf4j.LoggerFactory;
5556

5657
import accord.primitives.TxnId;
58+
import org.apache.cassandra.db.ColumnFamilyStore;
59+
import org.apache.cassandra.db.Keyspace;
5760
import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
5861
import org.apache.cassandra.dht.Token;
5962
import org.apache.cassandra.distributed.Cluster;
@@ -73,6 +76,8 @@
7376
import org.apache.cassandra.distributed.test.log.TestProcessor;
7477
import org.apache.cassandra.gms.ApplicationState;
7578
import org.apache.cassandra.gms.VersionedValue;
79+
import org.apache.cassandra.index.Index;
80+
import org.apache.cassandra.index.SecondaryIndexManager;
7681
import org.apache.cassandra.io.util.File;
7782
import org.apache.cassandra.locator.InetAddressAndPort;
7883
import org.apache.cassandra.net.Message;
@@ -96,6 +101,7 @@
96101
import org.apache.cassandra.utils.concurrent.AsyncPromise;
97102
import org.apache.cassandra.utils.concurrent.CountDownLatch;
98103
import org.assertj.core.api.Assertions;
104+
import org.awaitility.Awaitility;
99105

100106
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
101107
import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
@@ -628,6 +634,22 @@ public static boolean isMigrating(IInvokableInstance instance)
628634
return instance.callOnInstance(() -> ClusterMetadataService.instance().isMigrating());
629635
}
630636

637+
public static void awaitIndexReady(Cluster cluster, String ks, String table, String name)
638+
{
639+
cluster.forEach(i -> awaitIndexReady(i, ks, table, name));
640+
}
641+
642+
public static void awaitIndexReady(IInvokableInstance instance, String ks, String table, String name)
643+
{
644+
instance.runOnInstance(() -> {
645+
SecondaryIndexManager indexManager = Keyspace.open(ks).getColumnFamilyStore(table).indexManager;
646+
Index index = indexManager.getIndexByName(name);
647+
Awaitility.await("index " + name)
648+
.atMost(1, TimeUnit.MINUTES)
649+
.until(() -> indexManager.isIndexQueryable(index));
650+
});
651+
}
652+
631653
public static interface SerializablePredicate<T> extends Predicate<T>, Serializable
632654
{}
633655

@@ -659,6 +681,11 @@ public static void waitForCMSToQuiesce(ICluster<IInvokableInstance> cluster, int
659681
waitForCMSToQuiesce(cluster, maxEpoch(cluster, cmsNodes));
660682
}
661683

684+
public static Epoch maxEpoch(ICluster<IInvokableInstance> cluster)
685+
{
686+
return maxEpoch(cluster, IntStream.range(1, cluster.size() + 1).toArray());
687+
}
688+
662689
public static Epoch maxEpoch(ICluster<IInvokableInstance> cluster, int[] cmsNodes)
663690
{
664691
Epoch max = null;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test.cql3;
20+
21+
import accord.utils.Property;
22+
import org.apache.cassandra.service.consensus.TransactionalMode;
23+
24+
public class AccordInteropSingleNodeTableWalkTest extends SingleNodeTableWalkTest
25+
{
26+
public AccordInteropSingleNodeTableWalkTest()
27+
{
28+
super(TransactionalMode.full);
29+
}
30+
31+
@Override
32+
protected void preCheck(Property.StatefulBuilder builder)
33+
{
34+
// if a failing seed is detected, populate here
35+
// Example: builder.withSeed(42L);
36+
//TODO (January): checkpoint a failing seed to debug later... accord returns incorrect data for the following SQL
37+
// 16: SELECT * FROM ks3.tbl WHERE token(pk0, pk1) BETWEEN token('-3353292-12-18', 30) AND token('-258969-01-11', -40); -- by token range, on node1, fetch size 1
38+
builder.withSeed(6941301320988278649L);
39+
}
40+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test.cql3;
20+
21+
import accord.utils.Property;
22+
import org.apache.cassandra.service.consensus.TransactionalMode;
23+
24+
public class AccordInteropTokenConflictTest extends TokenConflictTest
25+
{
26+
public AccordInteropTokenConflictTest()
27+
{
28+
super(TransactionalMode.full);
29+
}
30+
31+
@Override
32+
protected void preCheck(Property.StatefulBuilder builder)
33+
{
34+
// if a failing seed is detected, populate here
35+
// Example: builder.withSeed(42L);
36+
//TODO (January): checkpoint a failing seed to debug later... accord returns incorrect data for the following SQL
37+
// 97: SELECT * FROM ks1.tbl WHERE token(pk0) BETWEEN token([8473585318424753772, 1213177836815110536]) AND token([-7320550072265110851, 4691861474352962931]); -- token BETWEEN, rc=-1, start token=-9116738905031522785, end token=1077669339564852589, on node1, fetch size 1
38+
builder.withSeed(3448341964809595261L);
39+
}
40+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test.cql3;
20+
21+
import java.net.InetAddress;
22+
import javax.annotation.Nullable;
23+
24+
import accord.utils.Property;
25+
import accord.utils.RandomSource;
26+
import org.apache.cassandra.db.marshal.InetAddressType;
27+
import org.apache.cassandra.distributed.Cluster;
28+
import org.apache.cassandra.service.consensus.TransactionalMode;
29+
import org.apache.cassandra.utils.AbstractTypeGenerators;
30+
import org.apache.cassandra.utils.AbstractTypeGenerators.TypeSupport;
31+
import org.apache.cassandra.utils.FastByteOperations;
32+
import org.apache.cassandra.utils.Generators;
33+
import org.quicktheories.core.Gen;
34+
35+
public class InetSingleNodeTableWalkTest extends SingleNodeTableWalkTest
36+
{
37+
static
38+
{
39+
IGNORED_ISSUES.remove(KnownIssue.SAI_INET_MIXED);
40+
}
41+
42+
public InetSingleNodeTableWalkTest()
43+
{
44+
this(null);
45+
}
46+
47+
protected InetSingleNodeTableWalkTest(@Nullable TransactionalMode transactionalMode)
48+
{
49+
super(transactionalMode);
50+
}
51+
52+
@Override
53+
protected void preCheck(Property.StatefulBuilder builder)
54+
{
55+
// if a failing seed is detected, populate here
56+
// Example: builder.withSeed(42L);
57+
// builder.withSeed(3985593186746556237L);
58+
59+
builder.withSeed(-7293505339069640960L); // ipv6 allow fitering missing partition
60+
}
61+
62+
@Override
63+
protected AbstractTypeGenerators.TypeGenBuilder supportedTypes()
64+
{
65+
return AbstractTypeGenerators.withoutUnsafeEquality(AbstractTypeGenerators.builder()
66+
.withTypeKinds(AbstractTypeGenerators.TypeKind.PRIMITIVE)
67+
.withPrimitives(InetAddressType.instance));
68+
}
69+
70+
private enum Mode { ipv4, ipv6, mixed }
71+
72+
@Override
73+
protected InetState createState(RandomSource rs, Cluster cluster)
74+
{
75+
// Mode mode = rs.pick(Mode.values());
76+
Mode mode = rs.pick(Mode.ipv4, Mode.ipv6);
77+
// Mode mode = Mode.mixed;
78+
Gen<InetAddress> gen;
79+
switch (mode)
80+
{
81+
case ipv4:
82+
gen = Generators.INET_4_ADDRESS_UNRESOLVED_GEN;
83+
break;
84+
case ipv6:
85+
gen = Generators.INET_6_ADDRESS_UNRESOLVED_GEN;
86+
break;
87+
case mixed:
88+
gen = Generators.INET_ADDRESS_UNRESOLVED_GEN;
89+
break;
90+
default:
91+
throw new UnsupportedOperationException(mode.name());
92+
}
93+
var support = TypeSupport.of(InetAddressType.instance,
94+
gen,
95+
(a, b) -> FastByteOperations.compareUnsigned(a.getAddress(), b.getAddress())); // serialization strips the hostname, only keeps the address
96+
AbstractTypeGenerators.overridePrimitiveTypeSupport(InetAddressType.instance, support);
97+
return new InetState(rs, cluster, mode);
98+
}
99+
100+
public class InetState extends State
101+
{
102+
private final Mode mode;
103+
104+
public InetState(RandomSource rs, Cluster cluster, Mode mode)
105+
{
106+
super(rs, cluster);
107+
this.mode = mode;
108+
}
109+
110+
@Override
111+
public String toString()
112+
{
113+
return "Mode: " + mode + "\n" + super.toString();
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)