Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
dkropachev committed Jul 1, 2024
1 parent 25ea2a6 commit a358d40
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 30 deletions.
5 changes: 0 additions & 5 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ type SetTablets interface {
SetTablets(tablets []*TabletInfo)
}

// interface to implement to permit dc failover
type DCFailover interface {
PermitDCFailover() HostSelectionPolicy
}

func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
// Config.InsecureSkipVerify | EnableHostVerification | Result
// Config is nil | true | verify host
Expand Down
47 changes: 31 additions & 16 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ type HostSelectionPolicy interface {
SetPartitioner
// Experimental, this interface and use may change
SetTablets
DCFailover
KeyspaceChanged(KeyspaceUpdateEvent)
Init(*Session)
// Reset is opprotunity to reset HostSelectionPolicy if Session initilization failed and we want to
Expand Down Expand Up @@ -366,8 +365,7 @@ func (r *roundRobinHostPolicy) Init(*Session) {}
func (r *roundRobinHostPolicy) Reset() {}

// Experimental, this interface and use may change
func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {}
func (r *roundRobinHostPolicy) PermitDCFailover() HostSelectionPolicy { return r }
func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {}

func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost {
nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1)
Expand Down Expand Up @@ -550,8 +548,6 @@ func (t *tokenAwareHostPolicy) SetTablets(tablets []*TabletInfo) {
t.tablets.set(tablets)
}

func (t *tokenAwareHostPolicy) PermitDCFailover() HostSelectionPolicy { return t }

func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
t.mu.Lock()
if t.hosts.add(host) {
Expand Down Expand Up @@ -832,8 +828,7 @@ func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }

// Experimental, this interface and use may change
func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {}
func (r *hostPoolHostPolicy) PermitDCFailover() HostSelectionPolicy { return r }
func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {}

func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
peers := make([]string, len(hosts))
Expand Down Expand Up @@ -960,13 +955,30 @@ type dcAwareRR struct {
permitDCFailover bool
}

type dcFailoverEnabledPolicy interface {
setDCFailoverEnabled()
}

type dcAwarePolicyOption func(p dcFailoverEnabledPolicy)

func HostPolicyOptionEnableDCFailover(p dcFailoverEnabledPolicy) {
p.setDCFailoverEnabled()
}

// DCAwareRoundRobinPolicy is a host selection policies which will prioritize and
// return hosts which are in the local datacentre before returning hosts in all
// other datercentres
func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy {
return &dcAwareRR{local: localDC, permitDCFailover: false}
func DCAwareRoundRobinPolicy(localDC string, opts ...dcAwarePolicyOption) HostSelectionPolicy {
p := &dcAwareRR{local: localDC, permitDCFailover: false}
for _, opt := range opts {
opt(p)
}
return p
}

func (d *dcAwareRR) setDCFailoverEnabled() {
d.permitDCFailover = true
}
func (d *dcAwareRR) Init(*Session) {}
func (d *dcAwareRR) Reset() {}
func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
Expand All @@ -978,10 +990,6 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool {

// Experimental, this interface and use may change
func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {}
func (d *dcAwareRR) PermitDCFailover() HostSelectionPolicy {
d.permitDCFailover = true
return d
}

func (d *dcAwareRR) AddHost(host *HostInfo) {
if d.IsLocal(host) {
Expand Down Expand Up @@ -1069,9 +1077,12 @@ type rackAwareRR struct {
permitDCFailover bool
}

func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPolicy {
hosts := make([]cowHostList, 3)
return &rackAwareRR{localDC: localDC, localRack: localRack, hosts: hosts, permitDCFailover: false}
func RackAwareRoundRobinPolicy(localDC string, localRack string, opts ...dcAwarePolicyOption) HostSelectionPolicy {
p := &rackAwareRR{localDC: localDC, localRack: localRack, hosts: make([]cowHostList, 3)}
for _, opt := range opts {
opt(p)
}
return p
}

func (d *rackAwareRR) Init(*Session) {}
Expand All @@ -1083,6 +1094,10 @@ func (d *rackAwareRR) MaxHostTier() uint {
return 2
}

func (d *rackAwareRR) setDCFailoverEnabled() {
d.permitDCFailover = true
}

// Experimental, this interface and use may change
func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {}
func (d *rackAwareRR) PermitDCFailover() HostSelectionPolicy {
Expand Down
18 changes: 9 additions & 9 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func expectNoMoreHosts(t *testing.T, iter NextHost) {
}

func TestHostPolicy_DCAwareRR(t *testing.T) {
p := DCAwareRoundRobinPolicy("local").PermitDCFailover()
p := DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover)

hosts := [...]*HostInfo{
{hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"},
Expand Down Expand Up @@ -606,7 +606,7 @@ func TestHostPolicy_DCAwareRR(t *testing.T) {
// with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication.
func TestHostPolicy_TokenAware(t *testing.T) {
const keyspace = "myKeyspace"
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local").PermitDCFailover())
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover))
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
Expand Down Expand Up @@ -708,7 +708,7 @@ func TestHostPolicy_TokenAware(t *testing.T) {
// with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication.
func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
const keyspace = "myKeyspace"
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local").PermitDCFailover(), NonLocalReplicasFallback())
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback())
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
Expand Down Expand Up @@ -797,7 +797,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
}

func TestHostPolicy_RackAwareRR(t *testing.T) {
p := RackAwareRoundRobinPolicy("local", "b").PermitDCFailover()
p := RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover)

hosts := [...]*HostInfo{
{hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local", rack: "a"},
Expand Down Expand Up @@ -829,8 +829,8 @@ func TestHostPolicy_RackAwareRR(t *testing.T) {
// DC & Rack aware round-robin host selection policy fallback
func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
const keyspace = "myKeyspace"
policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b").PermitDCFailover())
policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b").PermitDCFailover(), NonLocalReplicasFallback())
policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover))
policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback())

policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
Expand Down Expand Up @@ -960,7 +960,7 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
}

func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local").PermitDCFailover())
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover))
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {

func TestTokenAwarePolicyReset(t *testing.T) {
policy := TokenAwareHostPolicy(
RackAwareRoundRobinPolicy("local", "b").PermitDCFailover(),
RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover),
NonLocalReplicasFallback(),
)
policyInternal := policy.(*tokenAwareHostPolicy)
Expand Down Expand Up @@ -1083,7 +1083,7 @@ func TestTokenAwarePolicyReset(t *testing.T) {

func TestTokenAwarePolicyResetInSessionClose(t *testing.T) {
policy := TokenAwareHostPolicy(
RackAwareRoundRobinPolicy("local", "b").PermitDCFailover(),
RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover),
NonLocalReplicasFallback(),
)
policyInternal := policy.(*tokenAwareHostPolicy)
Expand Down

0 comments on commit a358d40

Please sign in to comment.