From 5b0a63f71ba4809fa969ee499a4461fd10988ef9 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 27 Sep 2024 16:13:02 -0400 Subject: [PATCH] Test metadata_request_timeout configuration option --- tests/integration/standard/test_cluster.py | 2 +- tests/integration/standard/test_metadata.py | 34 +++++++++++++++ tests/unit/advanced/test_metadata.py | 46 ++++++++++++++++++++- tests/unit/test_util_types.py | 22 +++++++++- 4 files changed, 100 insertions(+), 4 deletions(-) diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 43356dbd82..e506596bf7 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -522,7 +522,7 @@ def test_refresh_schema_no_wait(self): def patched_wait_for_responses(*args, **kwargs): # When selecting schema version, replace the real schema UUID with an unexpected UUID response = original_wait_for_responses(*args, **kwargs) - if len(args) > 2 and hasattr(args[2], "query") and args[2].query == "SELECT schema_version FROM system.local WHERE key='local'": + if len(args) > 2 and hasattr(args[2], "query") and "SELECT schema_version FROM system.local WHERE key='local'" in args[2].query: new_uuid = uuid4() response[1].parsed_rows[0] = (new_uuid,) return response diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 8fc50ce89e..944dd8ab20 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -25,11 +25,13 @@ import pytest from cassandra import AlreadyExists, SignatureDescriptor, UserFunctionDescriptor, UserAggregateDescriptor +from cassandra.connection import Connection from cassandra.encoder import Encoder from cassandra.metadata import (IndexMetadata, Token, murmur3, Function, Aggregate, protect_name, protect_names, RegisteredTableExtension, _RegisteredExtensionType, get_schema_parser, group_keys_by_replica, NO_VALID_REPLICA) +from cassandra.protocol import QueryMessage, ProtocolHandler from cassandra.util import SortedSet from tests.integration import (get_cluster, use_singledc, PROTOCOL_VERSION, execute_until_pass, @@ -1331,6 +1333,38 @@ def test_token(self): cluster.shutdown() +class MetadataTimeoutTest(unittest.TestCase): + """ + Test of TokenMap creation and other behavior. + """ + def test_timeout(self): + cluster = TestCluster() + cluster.metadata_request_timeout = None + + stmts = [] + + class ConnectionWrapper(cluster.connection_class): + def __init__(self, *args, **kwargs): + super(ConnectionWrapper, self).__init__(*args, **kwargs) + + def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message, + decoder=ProtocolHandler.decode_message, result_metadata=None): + if isinstance(msg, QueryMessage): + stmts.append(msg.query) + return super(ConnectionWrapper, self).send_msg(msg, request_id, cb, encoder, decoder, result_metadata) + + cluster.connection_class = ConnectionWrapper + s = cluster.connect() + s.execute('SELECT now() FROM system.local') + s.shutdown() + + for stmt in stmts: + if "SELECT now() FROM system.local" in stmt: + continue + if "USING TIMEOUT 2000ms" not in stmt: + self.fail(f"query `{stmt}` does not contain `USING TIMEOUT 2000ms`") + + class KeyspaceAlterMetadata(unittest.TestCase): """ Test verifies that table metadata is preserved on keyspace alter diff --git a/tests/unit/advanced/test_metadata.py b/tests/unit/advanced/test_metadata.py index cf730ebec5..20f80b4da4 100644 --- a/tests/unit/advanced/test_metadata.py +++ b/tests/unit/advanced/test_metadata.py @@ -11,13 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import datetime import unittest from cassandra.metadata import ( KeyspaceMetadata, TableMetadataDSE68, - VertexMetadata, EdgeMetadata + VertexMetadata, EdgeMetadata, SchemaParserV22, _SchemaParser ) +from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS class GraphMetadataToCQLTests(unittest.TestCase): @@ -136,3 +137,44 @@ def test_edge_multiple_partition_and_clustering_keys(self): 'FROM from_label((pk1, pk2), c1, c2) ', tm.as_cql_query() ) + + +class SchemaParsersTests(unittest.TestCase): + def test_metadata_query_metadata_timeout(self): + class FakeConnection: + def __init__(self): + self.queries = [] + + def wait_for_responses(self, *msgs, **kwargs): + self.queries.extend(msgs) + local_response = ResultMessage(kind=RESULT_KIND_ROWS) + local_response.column_names = [] + local_response.parsed_rows = [] + + return [[local_response, local_response] for _ in msgs] + + for schemaClass in get_all_schema_parser_classes(_SchemaParser): + conn = FakeConnection() + p = schemaClass(conn, 2.0, 1000, None) + p._query_all() + + for q in conn.queries: + if "USING TIMEOUT" in q.query: + self.fail(f"<{schemaClass.__name__}> query `{q.query}` contains `USING TIMEOUT`, while should not") + + conn = FakeConnection() + p = schemaClass(conn, 2.0, 1000, datetime.timedelta(seconds=2)) + p._query_all() + + for q in conn.queries: + if "USING TIMEOUT 2000ms" not in q.query: + self.fail(f"{schemaClass.__name__} query `{q.query}` does not contain `USING TIMEOUT 2000ms`") + + +def get_all_schema_parser_classes(cl): + for child in cl.__subclasses__(): + if not child.__name__.startswith('SchemaParser') or child.__module__ != 'cassandra.metadata': + continue + yield child + for c in get_all_schema_parser_classes(child): + yield c diff --git a/tests/unit/test_util_types.py b/tests/unit/test_util_types.py index 5d6058b394..c93b827f59 100644 --- a/tests/unit/test_util_types.py +++ b/tests/unit/test_util_types.py @@ -15,7 +15,7 @@ import datetime -from cassandra.util import Date, Time, Duration, Version +from cassandra.util import Date, Time, Duration, Version, maybe_add_timeout_to_query class DateTests(unittest.TestCase): @@ -287,3 +287,23 @@ def test_version_compare(self): self.assertTrue(Version('4.0-SNAPSHOT2') > Version('4.0.0-SNAPSHOT1')) self.assertTrue(Version('4.0.0-alpha1-SNAPSHOT') > Version('4.0.0-SNAPSHOT')) + + +class FunctionTests(unittest.TestCase): + def test_maybe_add_timeout_to_query(self): + self.assertEqual( + "SELECT * FROM HOSTS", + maybe_add_timeout_to_query("SELECT * FROM HOSTS", None) + ) + self.assertEqual( + "SELECT * FROM HOSTS USING TIMEOUT 1000ms", + maybe_add_timeout_to_query("SELECT * FROM HOSTS", datetime.timedelta(seconds=1)) + ) + self.assertEqual( + "SELECT * FROM HOSTS USING TIMEOUT 1s", + maybe_add_timeout_to_query("SELECT * FROM HOSTS USING TIMEOUT 1s", None) + ) + self.assertEqual( + "SELECT * FROM HOSTS USING TIMEOUT 1s", + maybe_add_timeout_to_query("SELECT * FROM HOSTS USING TIMEOUT 1s", datetime.timedelta(seconds=1)) + )