Skip to content

Commit

Permalink
Make driver override server timeout for system queries
Browse files Browse the repository at this point in the history
Scylla has the ability to override the server-side timeout by appending
`USING TIMEOUT <timeout>ms` to the query.
Make the driver add `USING TIMEOUT <control_connection_timeout*1000>ms` to
system queries on Scylla connections.
  • Loading branch information
dkropachev committed Aug 8, 2024
1 parent 7e0b02d commit d0e3d13
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 96 deletions.
59 changes: 45 additions & 14 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import absolute_import

import atexit
import datetime
from binascii import hexlify
from collections import defaultdict
from collections.abc import Mapping
Expand Down Expand Up @@ -66,7 +67,7 @@
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
RESULT_KIND_SCHEMA_CHANGE, ProtocolHandler,
RESULT_KIND_VOID, ProtocolException)
from cassandra.metadata import Metadata, protect_name, murmur3, _NodeInfo
from cassandra.metadata import Metadata, protect_name, murmur3, _NodeInfo, SchemaQueryMessage
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance,
RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
Expand All @@ -82,7 +83,7 @@
from cassandra.marshal import int64_pack
from cassandra.tablets import Tablet, Tablets
from cassandra.timestamps import MonotonicTimestampGenerator
from cassandra.util import _resolve_contact_points_to_string_map, Version
from cassandra.util import _resolve_contact_points_to_string_map, Version, add_timeout_to_query

from cassandra.datastax.insights.reporter import MonitorReporter
from cassandra.datastax.insights.util import version_supports_insights
Expand Down Expand Up @@ -1033,6 +1034,12 @@ def default_retry_policy(self, policy):
or to disable the shardaware port (advanced shardaware)
"""

metadata_request_timeout = datetime.timedelta(seconds=2)
"""
Timeout for all queries issued by the driver itself.
Supported only by Scylla clusters.
"""

@property
def schema_metadata_enabled(self):
"""
Expand Down Expand Up @@ -1148,7 +1155,9 @@ def __init__(self,
client_id=None,
cloud=None,
scylla_cloud=None,
shard_aware_options=None):
shard_aware_options=None,
metadata_request_timeout=datetime.timedelta(seconds=2),
):
"""
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
establishing connection pools or refreshing metadata.
Expand Down Expand Up @@ -1240,6 +1249,8 @@ def __init__(self,
self.no_compact = no_compact

self.auth_provider = auth_provider
if metadata_request_timeout is not None:
self.metadata_request_timeout = metadata_request_timeout

if load_balancing_policy is not None:
if isinstance(load_balancing_policy, type):
Expand Down Expand Up @@ -3549,6 +3560,7 @@ class PeersQueryType(object):
_is_shutdown = False
_timeout = None
_protocol_version = None
_metadata_request_timeout = None

_schema_event_refresh_window = None
_topology_event_refresh_window = None
Expand Down Expand Up @@ -3648,7 +3660,7 @@ def _reconnect_internal(self):
(conn, _) = self._connect_host_in_lbp()
if conn is not None:
return conn

# Try to re-resolve hostnames as a fallback when all hosts are unreachable
self._cluster._resolve_hostnames()

Expand Down Expand Up @@ -3689,11 +3701,16 @@ def _try_connect(self, host):
"registering watchers and refreshing schema and topology",
connection)

host.sharding_info = connection.features.sharding_info

# Indirect way to determine if connected to a ScyllaDB cluster, which does not support peers_v2
# If sharding information is available, it's a ScyllaDB cluster, so do not use peers_v2 table.
if connection.features.sharding_info is not None:
self._uses_peers_v2 = False


# Cassandra does not support "USING TIMEOUT"
self._metadata_request_timeout = None if connection.features.sharding_info is None \
else datetime.timedelta(seconds=self._cluster.control_connection_timeout)
self._tablets_routing_v1 = connection.features.tablets_routing_v1

# use weak references in both directions
Expand All @@ -3710,8 +3727,10 @@ def _try_connect(self, host):

sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
peers_query = SchemaQueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE,
custom_timeout=self._metadata_request_timeout)
local_query = SchemaQueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE,
custom_timeout=self._metadata_request_timeout)
(peers_success, peers_result), (local_success, local_result) = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout, fail_on_error=False)

Expand Down Expand Up @@ -3830,7 +3849,12 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
log.debug("Skipping schema refresh due to lack of schema agreement")
return False

self._cluster.metadata.refresh(connection, self._timeout, fetch_size=self._schema_meta_page_size, **kwargs)
self._cluster.metadata.refresh(
connection,
self._timeout,
fetch_size=self._schema_meta_page_size,
metadata_request_timeout=self._actual_metadata_request_timeout,
**kwargs)

return True

Expand Down Expand Up @@ -3861,8 +3885,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
else:
log.debug("[control connection] Refreshing node list and token map")
sel_local = self._SELECT_LOCAL
peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
local_query = QueryMessage(query=sel_local, consistency_level=cl)
peers_query = SchemaQueryMessage(query=sel_peers, consistency_level=cl,
custom_timeout=self._metadata_request_timeout)
local_query = SchemaQueryMessage(query=sel_local, consistency_level=cl,
custom_timeout=self._metadata_request_timeout)
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)

Expand Down Expand Up @@ -3917,8 +3943,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
# local rpc_address has not been queried yet, try to fetch it
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
# in system.local. See CASSANDRA-9436.
local_rpc_address_query = QueryMessage(query=self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS,
consistency_level=ConsistencyLevel.ONE)
local_rpc_address_query = SchemaQueryMessage(
query=self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS,
consistency_level=ConsistencyLevel.ONE,
custom_timeout=self._metadata_request_timeout,
)
success, local_rpc_address_result = connection.wait_for_response(
local_rpc_address_query, timeout=self._timeout, fail_on_error=False)
if success:
Expand Down Expand Up @@ -4153,8 +4182,10 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)

while elapsed < total_timeout:
peers_query = QueryMessage(query=select_peers_query, consistency_level=cl)
local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl)
peers_query = SchemaQueryMessage(query=select_peers_query, consistency_level=cl,
custom_timeout=self._metadata_request_timeout)
local_query = SchemaQueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl,
custom_timeout=self._metadata_request_timeout)
try:
timeout = min(self._timeout, total_timeout - elapsed)
peers_result, local_result = connection.wait_for_responses(
Expand Down
Loading

0 comments on commit d0e3d13

Please sign in to comment.