Skip to content

Commit

Permalink
Support port discovery for C* 4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
aboudreault committed Mar 30, 2020
1 parent 1f2103f commit 40fe726
Show file tree
Hide file tree
Showing 15 changed files with 408 additions and 104 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ Unreleased

Features
--------
Transient Replication Support (PYTHON-1207)
* Transient Replication Support (PYTHON-1207)
* Support system.peers_v2 and port discovery for C* 4.0 (PYTHON-700)

Bug Fixes
---------
Expand Down
8 changes: 5 additions & 3 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ schedules:
matrix:
exclude:
- python: [3.4, 3.6, 3.7, 3.8]
- cassandra: ['2.1', '3.0', '4.0', 'test-dse']
- cassandra: ['2.1', '3.0', 'test-dse']

commit_branches:
schedule: per_commit
Expand All @@ -34,7 +34,7 @@ schedules:
matrix:
exclude:
- python: [3.4, 3.6, 3.7, 3.8]
- cassandra: ['2.1', '3.0', '4.0', 'test-dse']
- cassandra: ['2.1', '3.0', 'test-dse']

commit_branches_dev:
schedule: per_commit
Expand Down Expand Up @@ -184,9 +184,11 @@ build:
pip install --upgrade pip
pip install -U setuptools
pip install git+ssh://git@github.com/riptano/ccm-private.git@cassandra-7544-native-ports-with-dse-fix
# Remove this pyyaml installation when removing Python 3.4 support
pip install PyYAML==5.2
pip install $HOME/ccm
#pip install $HOME/ccm
if [ -n "$CCM_IS_DSE" ]; then
pip install -r test-datastax-requirements.txt
Expand Down
136 changes: 89 additions & 47 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
RESULT_KIND_SCHEMA_CHANGE, ProtocolHandler,
RESULT_KIND_VOID)
from cassandra.metadata import Metadata, protect_name, murmur3
from cassandra.metadata import Metadata, protect_name, murmur3, _NodeInfo
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance,
RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
Expand Down Expand Up @@ -581,7 +581,7 @@ class Cluster(object):
contact_points = ['127.0.0.1']
"""
The list of contact points to try connecting for cluster discovery. A
contact point can be a string (ip, hostname) or a
contact point can be a string (ip or hostname), a tuple (ip/hostname, port) or a
:class:`.connection.EndPoint` instance.
Defaults to loopback interface.
Expand Down Expand Up @@ -1152,20 +1152,24 @@ def __init__(self,
self.endpoint_factory = endpoint_factory or DefaultEndPointFactory(port=self.port)
self.endpoint_factory.configure(self)

raw_contact_points = [cp for cp in self.contact_points if not isinstance(cp, EndPoint)]
raw_contact_points = []
for cp in [cp for cp in self.contact_points if not isinstance(cp, EndPoint)]:
raw_contact_points.append(cp if isinstance(cp, tuple) else (cp, port))

self.endpoints_resolved = [cp for cp in self.contact_points if isinstance(cp, EndPoint)]
self._endpoint_map_for_insights = {repr(ep): '{ip}:{port}'.format(ip=ep.address, port=ep.port)
for ep in self.endpoints_resolved}

strs_resolved_map = _resolve_contact_points_to_string_map(raw_contact_points, port)
strs_resolved_map = _resolve_contact_points_to_string_map(raw_contact_points)
self.endpoints_resolved.extend(list(chain(
*[
[DefaultEndPoint(x, port) for x in xs if x is not None]
[DefaultEndPoint(ip, port) for ip, port in xs if ip is not None]
for xs in strs_resolved_map.values() if xs is not None
]
)))

self._endpoint_map_for_insights.update(
{key: ['{ip}:{port}'.format(ip=ip, port=port) for ip in value]
{key: ['{ip}:{port}'.format(ip=ip, port=port) for ip, port in value]
for key, value in strs_resolved_map.items() if value is not None}
)

Expand Down Expand Up @@ -3420,8 +3424,17 @@ class ControlConnection(object):
_SELECT_SCHEMA_PEERS_TEMPLATE = "SELECT peer, host_id, {nt_col_name}, schema_version FROM system.peers"
_SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"

_SELECT_PEERS_V2 = "SELECT * FROM system.peers_v2"
_SELECT_PEERS_NO_TOKENS_V2 = "SELECT host_id, peer, peer_port, data_center, rack, native_address, native_port, release_version, schema_version FROM system.peers_v2"
_SELECT_SCHEMA_PEERS_V2 = "SELECT host_id, peer, peer_port, native_address, native_port, schema_version FROM system.peers_v2"

_MINIMUM_NATIVE_ADDRESS_DSE_VERSION = Version("6.0.0")

class PeersQueryType(object):
"""internal Enum for _peers_query"""
# Selector passed to ControlConnection._get_peers_query; any other value
# makes that method raise ValueError.
# PEERS: the node-list query (system.peers_v2 when _uses_peers_v2 is set,
# otherwise the v1 system.peers query).
PEERS = 0
# PEERS_SCHEMA: the query used to read peers' schema_version
# (_SELECT_SCHEMA_PEERS_V2, or the v1 schema peers template).
PEERS_SCHEMA = 1

_is_shutdown = False
_timeout = None
_protocol_version = None
Expand All @@ -3433,6 +3446,8 @@ class ControlConnection(object):
_schema_meta_enabled = True
_token_meta_enabled = True

_uses_peers_v2 = True

# for testing purposes
_time = time

Expand Down Expand Up @@ -3547,13 +3562,25 @@ def _try_connect(self, host):
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
}, register_timeout=self._timeout)

sel_peers = self._peers_query_for_version(connection, self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
shared_results = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)
(peers_success, peers_result), (local_success, local_result) = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout, fail_on_error=False)

if not local_success:
raise local_result

if not peers_success:
# error with the peers v2 query, fallback to peers v1
self._uses_peers_v2 = False
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
peers_result = connection.wait_for_response(
peers_query, timeout=self._timeout)

shared_results = (peers_result, local_result)
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=-1)
except Exception:
Expand Down Expand Up @@ -3675,20 +3702,18 @@ def refresh_node_list_and_token_map(self, force_token_rebuild=False):

def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
force_token_rebuild=False):

if preloaded_results:
log.debug("[control connection] Refreshing node list and token map using preloaded results")
peers_result = preloaded_results[0]
local_result = preloaded_results[1]
else:
cl = ConsistencyLevel.ONE
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
if not self._token_meta_enabled:
log.debug("[control connection] Refreshing node list without token map")
sel_peers = self._peers_query_for_version(connection, self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
sel_local = self._SELECT_LOCAL_NO_TOKENS
else:
log.debug("[control connection] Refreshing node list and token map")
sel_peers = self._SELECT_PEERS
sel_local = self._SELECT_LOCAL
peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
local_query = QueryMessage(query=sel_local, consistency_level=cl)
Expand Down Expand Up @@ -3718,13 +3743,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
self._update_location_info(host, datacenter, rack)
host.host_id = local_row.get("host_id")
host.listen_address = local_row.get("listen_address")
host.broadcast_address = local_row.get("broadcast_address")
host.listen_port = local_row.get("listen_port")
host.broadcast_address = _NodeInfo.get_broadcast_address(local_row)
host.broadcast_port = _NodeInfo.get_broadcast_port(local_row)

host.broadcast_rpc_address = self._address_from_row(local_row)
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(local_row)
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(local_row)
if host.broadcast_rpc_address is None:
if self._token_meta_enabled:
# local rpc_address is not available, use the connection endpoint
host.broadcast_rpc_address = connection.endpoint.address
host.broadcast_rpc_port = connection.endpoint.port
else:
# local rpc_address has not been queried yet, try to fetch it
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
Expand All @@ -3737,9 +3766,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
row = dict_factory(
local_rpc_address_result.column_names,
local_rpc_address_result.parsed_rows)
host.broadcast_rpc_address = row[0]['rpc_address']
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(row[0])
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(row[0])
else:
host.broadcast_rpc_address = connection.endpoint.address
host.broadcast_rpc_port = connection.endpoint.port

host.release_version = local_row.get("release_version")
host.dse_version = local_row.get("dse_version")
Expand Down Expand Up @@ -3777,8 +3808,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
should_rebuild_token_map |= self._update_location_info(host, datacenter, rack)

host.host_id = row.get("host_id")
host.broadcast_address = row.get("peer")
host.broadcast_rpc_address = self._address_from_row(row)
host.broadcast_address = _NodeInfo.get_broadcast_address(row)
host.broadcast_port = _NodeInfo.get_broadcast_port(row)
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(row)
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(row)
host.release_version = row.get("release_version")
host.dse_version = row.get("dse_version")
host.dse_workload = row.get("workload")
Expand Down Expand Up @@ -3834,7 +3867,8 @@ def _refresh_nodes_if_not_up(self, host):

def _handle_topology_change(self, event):
change_type = event["change_type"]
host = self._cluster.metadata.get_host(event["address"][0])
addr, port = event["address"]
host = self._cluster.metadata.get_host(addr, port)
if change_type == "NEW_NODE" or change_type == "MOVED_NODE":
if self._topology_event_refresh_window >= 0:
delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window)
Expand All @@ -3844,7 +3878,8 @@ def _handle_topology_change(self, event):

def _handle_status_change(self, event):
change_type = event["change_type"]
host = self._cluster.metadata.get_host(event["address"][0])
addr, port = event["address"]
host = self._cluster.metadata.get_host(addr, port)
if change_type == "UP":
delay = self._delay_for_event_type('status_change', self._status_event_refresh_window)
if host is None:
Expand Down Expand Up @@ -3898,7 +3933,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
elapsed = 0
cl = ConsistencyLevel.ONE
schema_mismatches = None
select_peers_query = self._peers_query_for_version(connection, self._SELECT_SCHEMA_PEERS_TEMPLATE)
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)

while elapsed < total_timeout:
peers_query = QueryMessage(query=select_peers_query, consistency_level=cl)
Expand Down Expand Up @@ -3955,43 +3990,50 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):

return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))

def _address_from_row(self, row):
def _get_peers_query(self, peers_query_type, connection=None):
"""
Parse the broadcast rpc address from a row and return it untranslated.
"""
addr = None
if "rpc_address" in row:
addr = row.get("rpc_address") # peers and local
if "native_transport_address" in row:
addr = row.get("native_transport_address")
if not addr or addr in ["0.0.0.0", "::"]:
addr = row.get("peer")
return addr
Determine the peers query to use.
:param peers_query_type: Should be one of PeersQueryType enum.
If _uses_peers_v2 is True, return the proper peers_v2 query (no templating).
Else, apply the logic below to choose the peers v1 address column name:
def _peers_query_for_version(self, connection, peers_query_template):
"""
Given a connection:
- find the server product version running on the connection's host,
- use that to choose the column name for the transport address (see APOLLO-1130), and
- use that column name in the provided peers query template.
The provided template should be a string with a format replacement
field named nt_col_name.
"""
host_release_version = self._cluster.metadata.get_host(connection.endpoint).release_version
host_dse_version = self._cluster.metadata.get_host(connection.endpoint).dse_version
uses_native_address_query = (
host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)
if peers_query_type not in (self.PeersQueryType.PEERS, self.PeersQueryType.PEERS_SCHEMA):
raise ValueError("Invalid peers query type: %s" % peers_query_type)

if uses_native_address_query:
select_peers_query = peers_query_template.format(nt_col_name="native_transport_address")
elif host_release_version:
select_peers_query = peers_query_template.format(nt_col_name="rpc_address")
if self._uses_peers_v2:
if peers_query_type == self.PeersQueryType.PEERS:
query = self._SELECT_PEERS_V2 if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS_V2
else:
query = self._SELECT_SCHEMA_PEERS_V2
else:
select_peers_query = self._SELECT_PEERS
if peers_query_type == self.PeersQueryType.PEERS and self._token_meta_enabled:
query = self._SELECT_PEERS
else:
query_template = (self._SELECT_SCHEMA_PEERS_TEMPLATE
if peers_query_type == self.PeersQueryType.PEERS_SCHEMA
else self._SELECT_PEERS_NO_TOKENS_TEMPLATE)

host_release_version = self._cluster.metadata.get_host(connection.endpoint).release_version
host_dse_version = self._cluster.metadata.get_host(connection.endpoint).dse_version
uses_native_address_query = (
host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)

if uses_native_address_query:
query = query_template.format(nt_col_name="native_transport_address")
elif host_release_version:
query = query_template.format(nt_col_name="rpc_address")
else:
query = self._SELECT_PEERS

return select_peers_query
return query

def _signal_error(self):
with self._lock:
Expand Down Expand Up @@ -4181,7 +4223,7 @@ class ResponseFuture(object):

coordinator_host = None
"""
The host from which we recieved a response
The host from which we received a response
"""

attempted_hosts = None
Expand Down
19 changes: 10 additions & 9 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,26 @@ class DefaultEndPointFactory(EndPointFactory):

port = None
"""
If set, force all endpoints to use this port.
If no port is discovered in the row, this is the default port
used for endpoint creation.
"""

def __init__(self, port=None):
self.port = port

def create(self, row):
addr = None
if "rpc_address" in row:
addr = row.get("rpc_address")
if "native_transport_address" in row:
addr = row.get("native_transport_address")
if not addr or addr in ["0.0.0.0", "::"]:
addr = row.get("peer")
# TODO next major... move this class so we don't need this kind of hack
from cassandra.metadata import _NodeInfo
addr = _NodeInfo.get_broadcast_rpc_address(row)
port = _NodeInfo.get_broadcast_rpc_port(row)
if port is None:
port = self.port if self.port else 9042

# create the endpoint with the translated address
# TODO next major, create a TranslatedEndPoint type
return DefaultEndPoint(
self.cluster.address_translator.translate(addr),
self.port if self.port is not None else 9042)
port)


@total_ordering
Expand Down
Loading

0 comments on commit 40fe726

Please sign in to comment.