Skip to content

Commit 55ea9bf

Browse files
Add integration tests
1 parent 7796e9c commit 55ea9bf

File tree

5 files changed

+166
-10
lines changed

5 files changed

+166
-10
lines changed

Diff for: .github/workflows/integration-tests.yml

+6
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,10 @@ jobs:
2121

2222
- name: Test with pytest
2323
run: |
24+
export SCYLLA_VERSION='release:5.1'
2425
./ci/run_integration_test.sh tests/integration/standard/ tests/integration/cqlengine/
26+
27+
- name: Test tablets
28+
run: |
29+
export SCYLLA_VERSION='unstable/master:2023-07-31T05:54:06Z'
30+
./ci/run_integration_test.sh tests/integration/experiments/

Diff for: ci/run_integration_test.sh

+2-7
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ if (( aio_max_nr != aio_max_nr_recommended_value )); then
1515
fi
1616
fi
1717

18-
SCYLLA_RELEASE='release:5.1'
19-
2018
python3 -m venv .test-venv
2119
source .test-venv/bin/activate
2220
pip install -U pip wheel setuptools
@@ -33,14 +31,11 @@ pip install https://github.com/scylladb/scylla-ccm/archive/master.zip
3331

3432
# download version
3533

36-
ccm create scylla-driver-temp -n 1 --scylla --version ${SCYLLA_RELEASE}
34+
ccm create scylla-driver-temp -n 1 --scylla --version ${SCYLLA_VERSION}
3735
ccm remove
3836

3937
# run test
4038

41-
echo "export SCYLLA_VERSION=${SCYLLA_RELEASE}"
42-
echo "PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=asyncio pytest --import-mode append tests/integration/standard/"
43-
export SCYLLA_VERSION=${SCYLLA_RELEASE}
39+
echo "PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=asyncio pytest --import-mode append $*"
4440
export MAPPED_SCYLLA_VERSION=3.11.4
4541
PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=libev pytest -rf --import-mode append $*
46-

Diff for: tests/integration/__init__.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,8 @@ def _id_and_mark(f):
372372
# 1. unittest doesn't skip setUpClass when used on class and we need it sometimes
373373
# 2. unittest doesn't have conditional xfail, and I prefer to use pytest than custom decorator
374374
# 3. unittest doesn't have a reason argument, so you don't see the reason in pytest report
375-
requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2'),
375+
# TODO remove second check when we stop using unstable version in CI for tablets
376+
requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and (len(SCYLLA_VERSION.split('/')) != 0 or Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2')),
376377
reason='Scylla supports collection indexes from 5.2 onwards')
377378
requires_custom_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None,
378379
reason='Scylla does not support SASI or any other CUSTOM INDEX class')
@@ -501,7 +502,7 @@ def start_cluster_wait_for_up(cluster):
501502

502503

503504
def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None, set_keyspace=True, ccm_options=None,
504-
configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE):
505+
configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE, use_tablets=False):
505506
configuration_options = configuration_options or {}
506507
dse_options = dse_options or {}
507508
workloads = workloads or []
@@ -611,7 +612,10 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None,
611612
# CDC is causing an issue (can't start cluster with multiple seeds)
612613
# Selecting only features we need for tests, i.e. anything but CDC.
613614
CCM_CLUSTER = CCMScyllaCluster(path, cluster_name, **ccm_options)
614-
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True})
615+
if use_tablets:
616+
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf', 'consistent-topology-changes', 'tablets'], 'start_native_transport': True})
617+
else:
618+
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True})
615619
else:
616620
CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options)
617621
CCM_CLUSTER.set_configuration_options({'start_native_transport': True})

Diff for: tests/integration/experiments/test_tablets.py

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import time
2+
import unittest
3+
import pytest
4+
import os
5+
from cassandra.cluster import Cluster
6+
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
7+
8+
from tests.integration import PROTOCOL_VERSION, use_cluster
9+
from tests.unit.test_host_connection_pool import LOGGER
10+
11+
def setup_module():
12+
use_cluster('tablets', [3], start=True, use_tablets=True)
13+
14+
class TestTabletsIntegration(unittest.TestCase):
15+
@classmethod
16+
def setup_class(cls):
17+
cls.cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
18+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
19+
reconnection_policy=ConstantReconnectionPolicy(1), experimental_tablet_feature_enabled=True, experimental_tablet_refresh_time=1)
20+
cls.session = cls.cluster.connect()
21+
cls.create_ks_and_cf(cls)
22+
cls.create_data(cls.session)
23+
24+
@classmethod
25+
def teardown_class(cls):
26+
cls.cluster.shutdown()
27+
28+
def verify_same_host_in_tracing(self, results):
29+
traces = results.get_query_trace()
30+
events = traces.events
31+
host_set = set()
32+
for event in events:
33+
LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description)
34+
host_set.add(event.source)
35+
36+
self.assertEqual(len(host_set), 1)
37+
self.assertIn('querying locally', "\n".join([event.description for event in events]))
38+
39+
trace_id = results.response_future.get_query_trace_ids()[0]
40+
traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,))
41+
events = [event for event in traces]
42+
host_set = set()
43+
for event in events:
44+
LOGGER.info("TRACE EVENT: %s %s", event.source, event.activity)
45+
host_set.add(event.source)
46+
47+
self.assertEqual(len(host_set), 1)
48+
self.assertIn('querying locally', "\n".join([event.activity for event in events]))
49+
50+
def verify_same_shard_in_tracing(self, results):
51+
traces = results.get_query_trace()
52+
events = traces.events
53+
shard_set = set()
54+
for event in events:
55+
LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description)
56+
shard_set.add(event.thread_name)
57+
58+
self.assertEqual(len(shard_set), 1)
59+
self.assertIn('querying locally', "\n".join([event.description for event in events]))
60+
61+
trace_id = results.response_future.get_query_trace_ids()[0]
62+
traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,))
63+
events = [event for event in traces]
64+
shard_set = set()
65+
for event in events:
66+
LOGGER.info("TRACE EVENT: %s %s", event.thread, event.activity)
67+
shard_set.add(event.thread)
68+
69+
self.assertEqual(len(shard_set), 1)
70+
self.assertIn('querying locally', "\n".join([event.activity for event in events]))
71+
72+
def create_ks_and_cf(self):
73+
self.session.execute(
74+
"""
75+
DROP KEYSPACE IF EXISTS test1
76+
"""
77+
)
78+
self.session.execute(
79+
"""
80+
CREATE KEYSPACE test1
81+
WITH replication = {
82+
'class': 'NetworkTopologyStrategy',
83+
'replication_factor': 1,
84+
'initial_tablets': 8
85+
}
86+
""")
87+
88+
self.session.execute(
89+
"""
90+
CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
91+
""")
92+
93+
@staticmethod
94+
def create_data(session):
95+
prepared = session.prepare(
96+
"""
97+
INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?)
98+
""")
99+
100+
for i in range(50):
101+
bound = prepared.bind((i, i%5, i%2))
102+
session.execute(bound)
103+
104+
def query_data_shard(self, session, verify_in_tracing=True):
105+
prepared = session.prepare(
106+
"""
107+
SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
108+
""")
109+
110+
bound = prepared.bind([(2)])
111+
results = session.execute(bound, trace=True)
112+
self.assertEqual(results, [(2, 2, 0)])
113+
if verify_in_tracing:
114+
self.verify_same_shard_in_tracing(results)
115+
116+
def query_data_host(self, session, verify_in_tracing=True):
117+
prepared = session.prepare(
118+
"""
119+
SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
120+
""")
121+
122+
bound = prepared.bind([(2)])
123+
results = session.execute(bound, trace=True)
124+
self.assertEqual(results, [(2, 2, 0)])
125+
if verify_in_tracing:
126+
self.verify_same_host_in_tracing(results)
127+
128+
def test_tablets(self):
129+
self.query_data_host(self.session)
130+
131+
def test_tablets_shard_awareness(self):
132+
self.query_data_shard(self.session)
133+
134+
def test_tablets_refresh(self):
135+
tablets1 = self.cluster.metadata._tablets._tablets
136+
137+
self.session.execute(
138+
"""
139+
CREATE TABLE test1.table2 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
140+
""")
141+
142+
time.sleep(2)
143+
144+
tablets2 = self.cluster.metadata._tablets._tablets
145+
146+
self.assertTrue(len(tablets1) < len(tablets2))
147+
148+

Diff for: tests/unit/test_policies.py

+3
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,9 @@ class FakeCluster:
602602
def __init__(self):
603603
self.metadata = Mock(spec=Metadata)
604604

605+
def check_tablets_enabled(self):
606+
return False
607+
605608
def test_get_distance(self):
606609
"""
607610
Same test as DCAwareRoundRobinPolicyTest.test_get_distance()

0 commit comments

Comments
 (0)