Skip to content

Commit 960cb95

Browse files
Add integration tests
1 parent f9f2610 commit 960cb95

File tree

3 files changed

+172
-4
lines changed

3 files changed

+172
-4
lines changed

Diff for: ci/run_integration_test.sh

+15-1
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,19 @@ echo "export SCYLLA_VERSION=${SCYLLA_RELEASE}"
4242
echo "PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=asyncio pytest --import-mode append tests/integration/standard/"
4343
export SCYLLA_VERSION=${SCYLLA_RELEASE}
4444
export MAPPED_SCYLLA_VERSION=3.11.4
45-
PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=libev pytest -rf --import-mode append $*
45+
export EXPERIMENTAL_TABLETS=false
46+
PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=libev pytest -rf --import-mode append $* --ignore tests/integration/standard/test_tablets.py
4647

48+
# download version
49+
50+
export SCYLLA_UNSTABLE='unstable/master:2023-07-31T05:54:06Z'
51+
export SCYLLA_VERSION=${SCYLLA_UNSTABLE}
52+
53+
ccm create scylla-driver-tablets-temp -n 3 --scylla --version ${SCYLLA_UNSTABLE}
54+
ccm remove
55+
56+
# run tablet tests
57+
58+
export EXPERIMENTAL_TABLETS=true
59+
60+
PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=libev pytest -rf --import-mode append tests/integration/standard/test_tablets.py

Diff for: tests/integration/__init__.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ def _id_and_mark(f):
372372
# 1. unittest doesn't skip setUpClass when used on class and we need it sometimes
373373
# 2. unittest doesn't have conditional xfail, and I prefer to use pytest than custom decorator
374374
# 3. unittest doesn't have a reason argument, so you don't see the reason in pytest report
375-
requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2'),
375+
requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and (len(SCYLLA_VERSION.split('/')) != 0 or Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2')),
376376
reason='Scylla supports collection indexes from 5.2 onwards')
377377
requires_custom_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None,
378378
reason='Scylla does not support SASI or any other CUSTOM INDEX class')
@@ -501,7 +501,7 @@ def start_cluster_wait_for_up(cluster):
501501

502502

503503
def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None, set_keyspace=True, ccm_options=None,
504-
configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE):
504+
configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE, use_tablets=False):
505505
configuration_options = configuration_options or {}
506506
dse_options = dse_options or {}
507507
workloads = workloads or []
@@ -611,7 +611,10 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None,
611611
# CDC is causing an issue (can't start cluster with multiple seeds)
612612
# Selecting only features we need for tests, i.e. anything but CDC.
613613
CCM_CLUSTER = CCMScyllaCluster(path, cluster_name, **ccm_options)
614-
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True})
614+
if use_tablets:
615+
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf', 'consistent-topology-changes', 'tablets'], 'start_native_transport': True})
616+
else:
617+
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True})
615618
else:
616619
CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options)
617620
CCM_CLUSTER.set_configuration_options({'start_native_transport': True})

Diff for: tests/integration/standard/test_tablets.py

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import time
2+
import unittest
3+
import pytest
4+
import os
5+
from cassandra.cluster import Cluster
6+
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
7+
8+
from tests.integration import PROTOCOL_VERSION, use_cluster
9+
from tests.unit.test_host_connection_pool import LOGGER
10+
11+
def setup_module():
12+
use_cluster('tablets', [3], start=True, use_tablets=True)
13+
14+
class TestTabletsIntegration(unittest.TestCase):
15+
@classmethod
16+
def setup_class(cls):
17+
cls.cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
18+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
19+
reconnection_policy=ConstantReconnectionPolicy(1))
20+
cls.session = cls.cluster.connect()
21+
22+
@classmethod
23+
def teardown_class(cls):
24+
cls.cluster.shutdown()
25+
26+
def verify_same_host_in_tracing(self, results):
27+
traces = results.get_query_trace()
28+
events = traces.events
29+
host_set = set()
30+
for event in events:
31+
LOGGER.info("%s %s %s", event.source, event.thread_name, event.description)
32+
host_set.add(event.source)
33+
34+
self.assertEqual(len(host_set), 1)
35+
self.assertIn('querying locally', "\n".join([event.description for event in events]))
36+
37+
trace_id = results.response_future.get_query_trace_ids()[0]
38+
traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,))
39+
events = [event for event in traces]
40+
host_set = set()
41+
for event in events:
42+
LOGGER.info("%s %s", event.source, event.activity)
43+
host_set.add(event.source)
44+
45+
self.assertEqual(len(host_set), 1)
46+
self.assertIn('querying locally', "\n".join([event.activity for event in events]))
47+
48+
def verify_same_shard_in_tracing(self, results):
49+
traces = results.get_query_trace()
50+
events = traces.events
51+
shard_set = set()
52+
for event in events:
53+
LOGGER.info("%s %s %s", event.source, event.thread_name, event.description)
54+
shard_set.add(event.thread_name)
55+
56+
self.assertEqual(len(shard_set), 1)
57+
self.assertIn('querying locally', "\n".join([event.description for event in events]))
58+
59+
trace_id = results.response_future.get_query_trace_ids()[0]
60+
traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,))
61+
events = [event for event in traces]
62+
shard_set = set()
63+
for event in events:
64+
LOGGER.info("%s %s", event.thread, event.activity)
65+
shard_set.add(event.thread)
66+
67+
self.assertEqual(len(shard_set), 1)
68+
self.assertIn('querying locally', "\n".join([event.activity for event in events]))
69+
70+
def create_ks_and_cf(self):
71+
self.session.execute(
72+
"""
73+
DROP KEYSPACE IF EXISTS test1
74+
"""
75+
)
76+
self.session.execute(
77+
"""
78+
CREATE KEYSPACE test1
79+
WITH replication = {
80+
'class': 'NetworkTopologyStrategy',
81+
'replication_factor': 1,
82+
'initial_tablets': 8
83+
}
84+
""")
85+
86+
self.session.execute(
87+
"""
88+
CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
89+
""")
90+
91+
@staticmethod
92+
def create_data(session):
93+
prepared = session.prepare(
94+
"""
95+
INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?)
96+
""")
97+
98+
for i in range(50):
99+
bound = prepared.bind((i, i%5, i%2))
100+
session.execute(bound)
101+
102+
def query_data_shard(self, session, verify_in_tracing=True):
103+
prepared = session.prepare(
104+
"""
105+
SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
106+
""")
107+
108+
bound = prepared.bind([(2)])
109+
results = session.execute(bound, trace=True)
110+
self.assertEqual(results, [(2, 2, 0)])
111+
if verify_in_tracing:
112+
self.verify_same_shard_in_tracing(results)
113+
114+
def query_data_host(self, session, verify_in_tracing=True):
115+
prepared = session.prepare(
116+
"""
117+
SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
118+
""")
119+
120+
bound = prepared.bind([(2)])
121+
results = session.execute(bound, trace=True)
122+
self.assertEqual(results, [(2, 2, 0)])
123+
if verify_in_tracing:
124+
self.verify_same_host_in_tracing(results)
125+
126+
def test_tablets(self):
127+
self.create_ks_and_cf()
128+
self.create_data(self.session)
129+
self.query_data_host(self.session)
130+
131+
def test_tablets_shard_awareness(self):
132+
self.query_data_shard(self.session)
133+
134+
def test_tablets_refresh(self):
135+
tablets1 = self.cluster.metadata.all_tablets()
136+
137+
LOGGER.info(tablets1)
138+
139+
self.session.execute(
140+
"""
141+
CREATE TABLE test1.table2 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
142+
""")
143+
144+
time.sleep(90)
145+
146+
tablets2 = self.cluster.metadata.all_tablets()
147+
LOGGER.info(tablets2)
148+
149+
self.assertTrue(len(tablets1) < len(tablets2))
150+
151+

0 commit comments

Comments
 (0)