Skip to content

Commit

Permalink
Merge pull request #256 from Lorak-mmk/fix-168-v3
Browse files Browse the repository at this point in the history
Fix wait_for_schema_agreement deadlock
  • Loading branch information
Lorak-mmk authored Sep 27, 2023
2 parents d12d2c1 + 01383bc commit 501640c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
22 changes: 12 additions & 10 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2009,6 +2009,17 @@ def _start_reconnector(self, host, is_host_addition):
reconnector.start()

@run_in_executor
def on_down_potentially_blocking(self, host, is_host_addition):
    """
    Propagate a host-down event to the interested components, then
    start reconnecting to the host.

    Notifies, in this order: the profile manager, the control
    connection, every session and every listener; finally calls
    ``_start_reconnector``.

    NOTE(review): the ``run_in_executor`` decorator and the method name
    suggest this work is handed to an executor because some of these
    callbacks may block -- confirm against the decorator's definition.

    :param host: the host that has been marked down.
    :param is_host_addition: forwarded unchanged to ``_start_reconnector``.
    """
    self.profile_manager.on_down(host)
    self.control_connection.on_down(host)

    # Walk a tuple snapshot of the sessions instead of the live collection.
    sessions_snapshot = tuple(self.sessions)
    for current_session in sessions_snapshot:
        current_session.on_down(host)

    for registered_listener in self.listeners:
        registered_listener.on_down(host)

    self._start_reconnector(host, is_host_addition)

def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
"""
Intended for internal use only.
Expand All @@ -2034,18 +2045,9 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
host.set_down()
if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
return

log.warning("Host %s has been marked down", host)

self.profile_manager.on_down(host)
self.control_connection.on_down(host)
for session in tuple(self.sessions):
session.on_down(host)

for listener in self.listeners:
listener.on_down(host)

self._start_reconnector(host, is_host_addition)
self.on_down_potentially_blocking(host, is_host_addition)

def on_add(self, host, refresh_nodes=True):
if self.is_shutdown:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
import logging
import unittest

from tests.integration import use_cluster, get_node, local, TestCluster

# Module-level logger, named after this test module.
LOGGER = logging.getLogger(__name__)


def setup_module():
    """
    Module-level fixture: select (and start) the cluster used by the
    tests in this module.
    """
    cluster_name = 'test_concurrent_schema_change_and_node_kill'
    # NOTE(review): presumably requests a three-node topology -- confirm
    # against use_cluster's signature.
    nodes = [3]
    use_cluster(cluster_name, nodes, start=True)

@local
class TestConcurrentSchemaChangeAndNodeKill(unittest.TestCase):
    """
    Runs a schema change right after one node of the cluster has been
    stopped and checks that the statements still complete.
    """

    @classmethod
    def setup_class(cls):
        """Create the cluster object shared by the tests and open a session."""
        # Allow a long schema-agreement wait (120, presumably seconds --
        # confirm against the driver's Cluster documentation).
        shared_cluster = TestCluster(max_schema_agreement_wait=120)
        cls.cluster = shared_cluster
        cls.session = shared_cluster.connect()

    @classmethod
    def teardown_class(cls):
        """Shut down the cluster object created in ``setup_class``."""
        cls.cluster.shutdown()

    def test_schema_change_after_node_kill(self):
        # Scenario: build a small keyspace/table, stop node 2 without
        # waiting for it, then immediately alter the table and read back.
        # The test passes if none of the statements raises.
        victim = get_node(2)
        session = self.session

        # Start from a clean keyspace.
        drop_keyspace = "DROP KEYSPACE IF EXISTS ks_deadlock;"
        create_keyspace = (
            "CREATE KEYSPACE IF NOT EXISTS ks_deadlock "
            "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2' };")
        session.execute(drop_keyspace)
        session.execute(create_keyspace)
        session.set_keyspace('ks_deadlock')

        # Create the table and store one row before the node goes away.
        create_table = "CREATE TABLE IF NOT EXISTS some_table(k int, c int, v int, PRIMARY KEY (k, v));"
        insert_row = "INSERT INTO some_table (k, c, v) VALUES (1, 2, 3);"
        session.execute(create_table)
        session.execute(insert_row)

        # NOTE(review): wait=False / gently=False presumably mean an abrupt,
        # non-blocking stop -- confirm against the node API.
        victim.stop(wait=False, gently=False)

        # The schema change gets a generous per-request timeout.
        session.execute("ALTER TABLE some_table ADD v2 int;", timeout=180)
        rows = session.execute("SELECT * FROM some_table WHERE k = 1;").all()
        print(rows)

0 comments on commit 501640c

Please sign in to comment.