From a358d403a097a384e0abf81ec846b2e3bd950165 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Mon, 1 Jul 2024 10:28:33 -0400 Subject: [PATCH] 1 --- connectionpool.go | 5 ----- policies.go | 47 +++++++++++++++++++++++++++++++---------------- policies_test.go | 18 +++++++++--------- 3 files changed, 40 insertions(+), 30 deletions(-) diff --git a/connectionpool.go b/connectionpool.go index 37e7cf75d..9e5fa109b 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -32,11 +32,6 @@ type SetTablets interface { SetTablets(tablets []*TabletInfo) } -// interface to implement to permit dc failover -type DCFailover interface { - PermitDCFailover() HostSelectionPolicy -} - func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) { // Config.InsecureSkipVerify | EnableHostVerification | Result // Config is nil | true | verify host diff --git a/policies.go b/policies.go index 8c908bf47..d73af4085 100644 --- a/policies.go +++ b/policies.go @@ -308,7 +308,6 @@ type HostSelectionPolicy interface { SetPartitioner // Experimental, this interface and use may change SetTablets - DCFailover KeyspaceChanged(KeyspaceUpdateEvent) Init(*Session) // Reset is opprotunity to reset HostSelectionPolicy if Session initilization failed and we want to @@ -366,8 +365,7 @@ func (r *roundRobinHostPolicy) Init(*Session) {} func (r *roundRobinHostPolicy) Reset() {} // Experimental, this interface and use may change -func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {} -func (r *roundRobinHostPolicy) PermitDCFailover() HostSelectionPolicy { return r } +func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {} func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1) @@ -550,8 +548,6 @@ func (t *tokenAwareHostPolicy) SetTablets(tablets []*TabletInfo) { t.tablets.set(tablets) } -func (t *tokenAwareHostPolicy) PermitDCFailover() HostSelectionPolicy { return t } - func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) { t.mu.Lock() if t.hosts.add(host) { @@ -832,8 +828,7 @@ func (r *hostPoolHostPolicy) SetPartitioner(string) {} func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true } // Experimental, this interface and use may change -func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {} -func (r *hostPoolHostPolicy) PermitDCFailover() HostSelectionPolicy { return r } +func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {} func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { peers := make([]string, len(hosts)) @@ -960,13 +955,30 @@ type dcAwareRR struct { permitDCFailover bool } +type dcFailoverEnabledPolicy interface { + setDCFailoverEnabled() +} + +type dcAwarePolicyOption func(p dcFailoverEnabledPolicy) + +func HostPolicyOptionEnableDCFailover(p dcFailoverEnabledPolicy) { + p.setDCFailoverEnabled() +} + // DCAwareRoundRobinPolicy is a host selection policies which will prioritize and // return hosts which are in the local datacentre before returning hosts in all // other datercentres -func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy { - return &dcAwareRR{local: localDC, permitDCFailover: false} +func DCAwareRoundRobinPolicy(localDC string, opts ...dcAwarePolicyOption) HostSelectionPolicy { + p := &dcAwareRR{local: localDC, permitDCFailover: false} + for _, opt := range opts { + opt(p) + } + return p } +func (d *dcAwareRR) setDCFailoverEnabled() { + d.permitDCFailover = true +} func (d *dcAwareRR) Init(*Session) {} func (d *dcAwareRR) Reset() {} func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {} @@ -978,10 +990,6 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool { // Experimental, this interface and use may change func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {} -func (d *dcAwareRR) PermitDCFailover() HostSelectionPolicy { - d.permitDCFailover = true - return d -} func (d *dcAwareRR) AddHost(host *HostInfo) { if d.IsLocal(host) { @@ -1069,9 +1077,12 @@ type rackAwareRR struct { permitDCFailover bool } -func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPolicy { - hosts := make([]cowHostList, 3) - return &rackAwareRR{localDC: localDC, localRack: localRack, hosts: hosts, permitDCFailover: false} +func RackAwareRoundRobinPolicy(localDC string, localRack string, opts ...dcAwarePolicyOption) HostSelectionPolicy { + p := &rackAwareRR{localDC: localDC, localRack: localRack, hosts: make([]cowHostList, 3)} + for _, opt := range opts { + opt(p) + } + return p } func (d *rackAwareRR) Init(*Session) {} @@ -1083,6 +1094,10 @@ func (d *rackAwareRR) MaxHostTier() uint { return 2 } +func (d *rackAwareRR) setDCFailoverEnabled() { + d.permitDCFailover = true +} + // Experimental, this interface and use may change func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {} func (d *rackAwareRR) PermitDCFailover() HostSelectionPolicy { diff --git a/policies_test.go b/policies_test.go index 6088e6495..0695d409d 100644 --- a/policies_test.go +++ b/policies_test.go @@ -556,7 +556,7 @@ func expectNoMoreHosts(t *testing.T, iter NextHost) { } func TestHostPolicy_DCAwareRR(t *testing.T) { - p := DCAwareRoundRobinPolicy("local").PermitDCFailover() + p := DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover) hosts := [...]*HostInfo{ {hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"}, @@ -606,7 +606,7 @@ func TestHostPolicy_DCAwareRR(t *testing.T) { // with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication. func TestHostPolicy_TokenAware(t *testing.T) { const keyspace = "myKeyspace" - policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local").PermitDCFailover()) + policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover)) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return keyspace } policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) { @@ -708,7 +708,7 @@ func TestHostPolicy_TokenAware(t *testing.T) { // with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication. func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) { const keyspace = "myKeyspace" - policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local").PermitDCFailover(), NonLocalReplicasFallback()) + policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback()) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return keyspace } policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) { @@ -797,7 +797,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) { } func TestHostPolicy_RackAwareRR(t *testing.T) { - p := RackAwareRoundRobinPolicy("local", "b").PermitDCFailover() + p := RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover) hosts := [...]*HostInfo{ {hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local", rack: "a"}, @@ -829,8 +829,8 @@ func TestHostPolicy_RackAwareRR(t *testing.T) { // DC & Rack aware round-robin host selection policy fallback func TestHostPolicy_TokenAware_RackAware(t *testing.T) { const keyspace = "myKeyspace" - policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b").PermitDCFailover()) - policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b").PermitDCFailover(), NonLocalReplicasFallback()) + policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover)) + policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback()) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return keyspace } @@ -960,7 +960,7 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) { } func TestHostPolicy_TokenAware_Issue1274(t *testing.T) { - policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local").PermitDCFailover()) + policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover)) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return "myKeyspace" } policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) { @@ -1038,7 +1038,7 @@ func TestHostPolicy_TokenAware_Issue1274(t *testing.T) { func TestTokenAwarePolicyReset(t *testing.T) { policy := TokenAwareHostPolicy( - RackAwareRoundRobinPolicy("local", "b").PermitDCFailover(), + RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback(), ) policyInternal := policy.(*tokenAwareHostPolicy) @@ -1083,7 +1083,7 @@ func TestTokenAwarePolicyReset(t *testing.T) { func TestTokenAwarePolicyResetInSessionClose(t *testing.T) { policy := TokenAwareHostPolicy( - RackAwareRoundRobinPolicy("local", "b").PermitDCFailover(), + RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback(), ) policyInternal := policy.(*tokenAwareHostPolicy)