Add shard connection backoff policy #473

Open · wants to merge 8 commits into master
28 changes: 25 additions & 3 deletions cassandra/cluster.py
@@ -73,7 +73,8 @@
ExponentialReconnectionPolicy, HostDistance,
RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
NoSpeculativeExecutionPolicy, DefaultLoadBalancingPolicy,
NeverRetryPolicy)
NeverRetryPolicy, ShardConnectionBackoffPolicy, NoDelayShardConnectionBackoffPolicy,
ShardConnectionScheduler)
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
HostConnectionPool, HostConnection,
NoConnectionsAvailable)
@@ -757,6 +758,11 @@ def auth_provider(self, value):

self._auth_provider = value

_shard_connection_backoff_policy: ShardConnectionBackoffPolicy
@property
def shard_connection_backoff_policy(self) -> ShardConnectionBackoffPolicy:
return self._shard_connection_backoff_policy

_load_balancing_policy = None
@property
def load_balancing_policy(self):
@@ -1219,7 +1225,8 @@ def __init__(self,
shard_aware_options=None,
metadata_request_timeout=None,
column_encryption_policy=None,
application_info:Optional[ApplicationInfoBase]=None
application_info: Optional[ApplicationInfoBase] = None,
shard_connection_backoff_policy: Optional[ShardConnectionBackoffPolicy] = None,
):
"""
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
@@ -1325,6 +1332,13 @@ def __init__(self,
else:
self._load_balancing_policy = default_lbp_factory() # set internal attribute to avoid committing to legacy config mode

if shard_connection_backoff_policy is not None:
if not isinstance(shard_connection_backoff_policy, ShardConnectionBackoffPolicy):
raise TypeError("shard_connection_backoff_policy should be an instance of class derived from ShardConnectionBackoffPolicy")
self._shard_connection_backoff_policy = shard_connection_backoff_policy
else:
self._shard_connection_backoff_policy = NoDelayShardConnectionBackoffPolicy()

if reconnection_policy is not None:
if isinstance(reconnection_policy, type):
raise TypeError("reconnection_policy should not be a class, it should be an instance of that class")
@@ -2716,6 +2730,7 @@ def default_serial_consistency_level(self, cl):
_metrics = None
_request_init_callbacks = None
_graph_paging_available = False
shard_connection_backoff_scheduler: ShardConnectionScheduler

def __init__(self, cluster, hosts, keyspace=None):
self.cluster = cluster
@@ -2730,6 +2745,7 @@ def __init__(self, cluster, hosts, keyspace=None):
self._protocol_version = self.cluster.protocol_version

self.encoder = Encoder()
self.shard_connection_backoff_scheduler = cluster.shard_connection_backoff_policy.new_connection_scheduler(self.cluster.scheduler)

# create connection pools in parallel
self._initial_connect_futures = set()
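For reference, a rough sketch of the two interfaces the Session relies on here. Only the calls visible in this file are grounded in the diff: new_connection_scheduler() receiving the cluster's internal scheduler, and shutdown() on the returned object; the concrete policy implementations live in cassandra/policies.py and the rest of their shape is an assumption.

from abc import ABC, abstractmethod

class ShardConnectionBackoffPolicySketch(ABC):
    """Shape implied by cluster.shard_connection_backoff_policy usage above."""
    @abstractmethod
    def new_connection_scheduler(self, scheduler):
        """Build a per-session ShardConnectionScheduler on top of the cluster's _Scheduler."""

class ShardConnectionSchedulerSketch(ABC):
    """Shape implied by session.shard_connection_backoff_scheduler usage below."""
    @abstractmethod
    def shutdown(self):
        """Called from Session.shutdown() to stop pending shard connection work."""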
@@ -3340,6 +3356,7 @@ def shutdown(self):
else:
self.is_shutdown = True

self.shard_connection_backoff_scheduler.shutdown()
# PYTHON-673. If shutdown was called shortly after session init, avoid
# a race by cancelling any initial connection attempts haven't started,
# then blocking on any that have.
@@ -4456,7 +4473,12 @@ def shutdown(self):
self.join()

def schedule(self, delay, fn, *args, **kwargs):
self._insert_task(delay, (fn, args, tuple(kwargs.items())))
if self.is_shutdown:
return
if delay:
self._insert_task(delay, (fn, args, tuple(kwargs.items())))
else:
self._executor.submit(fn, *args, **kwargs)

def schedule_unique(self, delay, fn, *args, **kwargs):
task = (fn, args, tuple(kwargs.items()))
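With the _Scheduler.schedule() change above, zero-delay work now bypasses the delay queue and goes straight to the executor, and scheduling after shutdown becomes a no-op. An illustrative call pattern; the scheduler variable stands in for the cluster's internal _Scheduler and is not public API.

def fn():
    pass

scheduler.schedule(0.5, fn)  # non-zero delay: queued via _insert_task, runs after ~0.5s
scheduler.schedule(0, fn)    # falsy delay: submitted directly to the executor thread pool
scheduler.shutdown()
scheduler.schedule(0, fn)    # no-op: is_shutdown short-circuits the call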