Skip to content

Commit 501640c

Browse files
authored
Merge pull request scylladb#256 from Lorak-mmk/fix-168-v3
Fix wait_for_schema_agreement deadlock
2 parents d12d2c1 + 01383bc commit 501640c

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

cassandra/cluster.py

+12-10
Original file line numberDiff line numberDiff line change
@@ -2009,6 +2009,17 @@ def _start_reconnector(self, host, is_host_addition):
20092009
reconnector.start()
20102010

20112011
@run_in_executor
def on_down_potentially_blocking(self, host, is_host_addition):
    """Run the blocking portion of host-down handling on the executor.

    Split out of ``on_down`` so that listener/session callbacks, which may
    block (e.g. waiting on connections), never run on the event-loop thread
    and cannot deadlock schema-agreement waits.
    """
    # Notify policy/profile machinery and the control connection first.
    self.profile_manager.on_down(host)
    self.control_connection.on_down(host)

    # Snapshot the session set so concurrent mutation is harmless.
    for sess in tuple(self.sessions):
        sess.on_down(host)

    # User-registered listeners may do arbitrary (blocking) work.
    for lst in self.listeners:
        lst.on_down(host)

    # Kick off periodic reconnection attempts for the downed host.
    self._start_reconnector(host, is_host_addition)
2022+
20122023
def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
20132024
"""
20142025
Intended for internal use only.
@@ -2034,18 +2045,9 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
20342045
host.set_down()
20352046
if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
20362047
return
2037-
20382048
log.warning("Host %s has been marked down", host)
20392049

2040-
self.profile_manager.on_down(host)
2041-
self.control_connection.on_down(host)
2042-
for session in tuple(self.sessions):
2043-
session.on_down(host)
2044-
2045-
for listener in self.listeners:
2046-
listener.on_down(host)
2047-
2048-
self._start_reconnector(host, is_host_addition)
2050+
self.on_down_potentially_blocking(host, is_host_addition)
20492051

20502052
def on_add(self, host, refresh_nodes=True):
20512053
if self.is_shutdown:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import os
2+
import logging
3+
import unittest
4+
5+
from tests.integration import use_cluster, get_node, local, TestCluster
6+
7+
LOGGER = logging.getLogger(__name__)
8+
9+
10+
def setup_module():
    """Module-level fixture: boot a 3-node CCM cluster before any test runs."""
    use_cluster(
        'test_concurrent_schema_change_and_node_kill',
        [3],
        start=True,
    )
12+
13+
@local
class TestConcurrentSchemaChangeAndNodeKill(unittest.TestCase):
    """Regression test: a schema change issued right after a node is killed
    must not deadlock in wait_for_schema_agreement (issue #168)."""

    @classmethod
    def setup_class(cls):
        # Generous schema-agreement wait so a deadlock manifests as a
        # timeout failure rather than a flaky early return.
        cls.cluster = TestCluster(max_schema_agreement_wait=120)
        cls.session = cls.cluster.connect()

    @classmethod
    def teardown_class(cls):
        cls.cluster.shutdown()

    def test_schema_change_after_node_kill(self):
        victim = get_node(2)

        # Fresh keyspace/table with RF=2 so data survives losing one node.
        self.session.execute(
            "DROP KEYSPACE IF EXISTS ks_deadlock;")
        self.session.execute(
            "CREATE KEYSPACE IF NOT EXISTS ks_deadlock "
            "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2' };")
        self.session.set_keyspace('ks_deadlock')
        self.session.execute("CREATE TABLE IF NOT EXISTS some_table(k int, c int, v int, PRIMARY KEY (k, v));")
        self.session.execute("INSERT INTO some_table (k, c, v) VALUES (1, 2, 3);")

        # Kill a node hard (no drain, no wait), then immediately alter the
        # schema; before the fix this could deadlock the driver.
        victim.stop(wait=False, gently=False)
        self.session.execute("ALTER TABLE some_table ADD v2 int;", timeout=180)
        print(self.session.execute("SELECT * FROM some_table WHERE k = 1;").all())

0 commit comments

Comments
 (0)