Skip to content

Commit

Permalink
scylla: lazily close excess connections.
Browse files Browse the repository at this point in the history
A list of excess connections is maintained to allow for lazy removal of.
excess connections. Keeping excess connections open helps reaching equilibrium
faster since the likelihood of hitting the same shard decreases with the number
of connections to the shard. The excess connections list is currently
capped to 10 times the number of shards. This magic number has no
backing statistics but sounds reasonable.
  • Loading branch information
Henrik Johansson committed Mar 20, 2019
1 parent 83c184d commit 6a327ee
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,17 @@ func isScyllaConn(conn *Conn) bool {

// scyllaConnPicker is a specialised ConnPicker that selects connections based
// on token trying to get connection to a shard containing the given token.
// A list of excess connections is maintained to allow for lazy removal of.
// excess connections. Keeping excess connections open helps reaching equilibrium
// faster since the likelihood of hitting the same shard decreases with the number
// of connections to the shard.
type scyllaConnPicker struct {
conns []*Conn
nrConns int
nrShards int
msbIgnore uint64
pos int32
conns []*Conn
excessConns []*Conn
nrConns int
nrShards int
msbIgnore uint64
pos int32
}

func newScyllaConnPicker(conn *Conn) *scyllaConnPicker {
Expand Down Expand Up @@ -175,6 +180,8 @@ func (p *scyllaConnPicker) shardOf(token murmur3Token) int {
}

func (p *scyllaConnPicker) Put(conn *Conn) {
const maxExcessConnsFactor = 10

s := parseSupported(conn.supported)
if s.nrShards == 0 {
panic(fmt.Sprintf("scylla: %s not a sharded connection", conn.Address()))
Expand All @@ -189,12 +196,31 @@ func (p *scyllaConnPicker) Put(conn *Conn) {
copy(p.conns, conns)
}
if c := p.conns[s.shard]; c != nil {
conn.Close()
p.excessConns = append(p.excessConns, conn)
if len(p.excessConns) > maxExcessConnsFactor*p.nrShards {
p.closeExcessConns()
}
return
}
p.conns[s.shard] = conn
p.nrConns++
if p.nrConns >= p.nrShards {
// We have reached one connection to each shard and
// it's time to close the excess connections.
p.closeExcessConns()
}
if gocqlDebug {
Logger.Printf("scylla: %s put shard %d connection total: %d missing: %d", conn.Address(), s.shard, p.nrConns, p.nrShards-p.nrConns)
}
}

// closeExcessConns closes the excess connections and clears
// the excessConns slice. This function needs to be called
// in a goroutine safe context, i.e. when the external pool
// write lock is held or other synchronization is needed.
func (p *scyllaConnPicker) closeExcessConns() {
for _, c := range p.excessConns {
c.Close()
}
p.excessConns = nil
}

0 comments on commit 6a327ee

Please sign in to comment.