Skip to content

Commit

Permalink
Merge pull request #166 from moguchev/host_policy_reset
Browse files Browse the repository at this point in the history
HostSelectionPolicy: add Reset method, Session.Close: call HostSelect…
  • Loading branch information
sylwiaszunejko authored Jun 28, 2024
2 parents 258cf8e + b22eaf2 commit e853e9b
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 0 deletions.
20 changes: 20 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ type HostSelectionPolicy interface {
SetTablets
KeyspaceChanged(KeyspaceUpdateEvent)
Init(*Session)
// Reset is opprotunity to reset HostSelectionPolicy if Session initilization failed and we want to
// call HostSelectionPolicy.Init() again with new Session
Reset()
IsLocal(host *HostInfo) bool
// Pick returns an iteration function over selected hosts.
// Multiple attempts of a single query execution won't call the returned NextHost function concurrently,
Expand Down Expand Up @@ -359,6 +362,7 @@ func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool { return tru
func (r *roundRobinHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {}
func (r *roundRobinHostPolicy) Init(*Session) {}
func (r *roundRobinHostPolicy) Reset() {}

// Experimental, this interface and use may change
func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {}
Expand Down Expand Up @@ -472,6 +476,19 @@ func (t *tokenAwareHostPolicy) Init(s *Session) {
t.logger = s.logger
}

func (t *tokenAwareHostPolicy) Reset() {
t.mu.Lock()
defer t.mu.Unlock()

// Sharing token aware host selection policy between sessions is not supported
// but session initialization can failed for some reasons. So in our application
// may be we want to create new session again.
// Reset method should be called in Session.Close method
t.getKeyspaceMetadata = nil
t.getKeyspaceName = nil
t.logger = nil
}

func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
return t.fallback.IsLocal(host)
}
Expand Down Expand Up @@ -805,6 +822,7 @@ type hostPoolHostPolicy struct {
}

func (r *hostPoolHostPolicy) Init(*Session) {}
func (r *hostPoolHostPolicy) Reset() {}
func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }
Expand Down Expand Up @@ -944,6 +962,7 @@ func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy {
}

func (d *dcAwareRR) Init(*Session) {}
func (d *dcAwareRR) Reset() {}
func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (d *dcAwareRR) SetPartitioner(p string) {}

Expand Down Expand Up @@ -1040,6 +1059,7 @@ func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPo
}

func (d *rackAwareRR) Init(*Session) {}
func (d *rackAwareRR) Reset() {}
func (d *rackAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (d *rackAwareRR) SetPartitioner(p string) {}

Expand Down
98 changes: 98 additions & 0 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,3 +1035,101 @@ func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {
time.Sleep(100 * time.Millisecond)
close(cancel)
}

func TestTokenAwarePolicyReset(t *testing.T) {
policy := TokenAwareHostPolicy(
RackAwareRoundRobinPolicy("local", "b"),
NonLocalReplicasFallback(),
)
policyInternal := policy.(*tokenAwareHostPolicy)

if policyInternal.fallback == nil {
t.Fatal("fallback is nil")
}
if !policyInternal.nonLocalReplicasFallback {
t.Fatal("nonLocalReplicasFallback is false")
}

policy.Init(&Session{logger: &defaultLogger{}})
if policyInternal.getKeyspaceMetadata == nil {
t.Fatal("keyspace metatadata fn is nil")
}
if policyInternal.getKeyspaceName == nil {
t.Fatal("keyspace name fn is nil")
}
if policyInternal.logger == nil {
t.Fatal("logger is nil")
}

// Reset - should reset fields that were set in Init
policy.Reset()

if policyInternal.fallback == nil { // we don't touch fallback
t.Fatal("fallback is nil")
}
if !policyInternal.nonLocalReplicasFallback { // we don't touch nonLocalReplicasFallback
t.Fatal("nonLocalReplicasFallback is false")
}
if policyInternal.getKeyspaceMetadata != nil {
t.Fatal("keyspace metatadata fn is not nil")
}
if policyInternal.getKeyspaceName != nil {
t.Fatal("keyspace name fn is not nil")
}
if policyInternal.logger != nil {
t.Fatal("logger is nil")
}
}

func TestTokenAwarePolicyResetInSessionClose(t *testing.T) {
policy := TokenAwareHostPolicy(
RackAwareRoundRobinPolicy("local", "b"),
NonLocalReplicasFallback(),
)
policyInternal := policy.(*tokenAwareHostPolicy)

if policyInternal.fallback == nil {
t.Fatal("fallback is nil")
}
if !policyInternal.nonLocalReplicasFallback {
t.Fatal("nonLocalReplicasFallback is false")
}

// emulate session initialization
session := &Session{
logger: &defaultLogger{},
policy: policy,
}
policy.Init(session)
// check that we are realy initialize policy
if policyInternal.getKeyspaceMetadata == nil {
t.Fatal("keyspace metatadata fn is nil")
}
if policyInternal.getKeyspaceName == nil {
t.Fatal("keyspace name fn is nil")
}
if policyInternal.logger == nil {
t.Fatal("logger is nil")
}

// session.Close should call policy.Reset method
session.Close()

// check that session.Close has called policy.Reset method

if policyInternal.fallback == nil { // we don't touch fallback in Reset
t.Fatal("fallback is nil")
}
if !policyInternal.nonLocalReplicasFallback { // we don't touch nonLocalReplicasFallback in Reset
t.Fatal("nonLocalReplicasFallback is false")
}
if policyInternal.getKeyspaceMetadata != nil {
t.Fatal("keyspace metatadata fn is not nil")
}
if policyInternal.getKeyspaceName != nil {
t.Fatal("keyspace name fn is not nil")
}
if policyInternal.logger != nil {
t.Fatal("logger is nil")
}
}
4 changes: 4 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,10 @@ func (s *Session) Close() {
s.cancel()
}

if s.policy != nil {
s.policy.Reset()
}

s.sessionStateMu.Lock()
s.isClosed = true
s.sessionStateMu.Unlock()
Expand Down

0 comments on commit e853e9b

Please sign in to comment.