Skip to content

Commit

Permalink
[WIP] Various changes
Browse files Browse the repository at this point in the history
  + Remove the global singleton by passing the context around when possible and by using a closure for Rend's New Cassandra handler creation. The purpose is to avoid recreating a new connection to Cassandra each time this handler is invoked.

  + Move away from the flaky unlogged batch for SETs. The problem is that managing the size of the batch is tricky, as it depends not on the number of elements but on the byte size of the batch.

  + Instead of relying on a single buffer for the batched SETs, use a fan-out approach where goroutines are responsible for sending single SET commands to Cassandra. If performance decreases, we can re-use the unlogged batch but with a much smaller SET buffer, thus avoiding the problem described above.

  + Made shutdown of Memandra thread-safe

  + Add a custom ConvictionPolicy to avoid apache/cassandra-gocql-driver#915

  + Prepare statements in the Cassandra context to avoid allocating a new string on every request with fmt.Sprintf()

  +
  • Loading branch information
Romain GERARD committed Apr 27, 2020
1 parent ef49988 commit 39eec8a
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 121 deletions.
37 changes: 21 additions & 16 deletions app/memandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ func initDefaultConfig() {
viper.SetDefault("CassandraHostname", "127.0.0.1")
viper.SetDefault("CassandraKeyspace", "kvstore")
viper.SetDefault("CassandraBucket", "bucket")
viper.SetDefault("CassandraBatchBufferItemSize", 80000)
viper.SetDefault("CassandraBatchBufferMaxAgeMs", 200*time.Millisecond)
viper.SetDefault("CassandraBatchMinItemSize", 1000)
viper.SetDefault("CassandraBatchMaxItemSize", 5000)
viper.SetDefault("CassandraTimeoutMs", 1000*time.Millisecond)
viper.SetDefault("CassandraTimeoutMs", 10000*time.Millisecond)
viper.SetDefault("CassandraConnectTimeoutMs", 1000*time.Millisecond)
viper.SetDefault("CassandraNbConcurrentRequests", 500)
viper.SetDefault("MemcachedMaxBufferedSetRequests", 80*1000)
}

func main() {
Expand Down Expand Up @@ -60,15 +58,18 @@ func main() {
var h1 handlers.HandlerConst
var h2 handlers.HandlerConst

// L1Only MODE
h1 = cassandra.New
h2 = handlers.NilHandler

// Init Cassandra connection in handler
if err := cassandra.InitCassandraConn(); err != nil {
// Init Cassandra connection
cassandraHandler, err := cassandra.New()
if err != nil {
log.Fatal(err)
}

// L1Only MODE
h1 = func() (handlers.Handler, error) {
return cassandraHandler, nil
}
h2 = handlers.NilHandler

l := server.TCPListener(viper.GetInt("ListenPort"))
ps := []protocol.Components{binprot.Components, textprot.Components}

Expand All @@ -80,11 +81,15 @@ func main() {
go func() {
_ = <-gracefulStop
log.Println("[INFO] Gracefully stopping Memandra server")
log.Println("[INFO] Setting Cassandra handler to readonly mode")
cassandra.SetReadonlyMode()
time.Sleep(500 * time.Millisecond)
log.Println("[INFO] Forcing write buffer to be flushed before exiting")
cassandra.FlushBuffer()
log.Println("[INFO] Shutting down Cassandra executors")
go func() {
waitTime := 5 * time.Minute
time.Sleep(waitTime)
log.Println("[INFO] Forcing exit as", waitTime, "passed")
os.Exit(0)
}()

cassandraHandler.Shutdown()
os.Exit(0)
}()

Expand Down
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ InternalMetricsListenAddress: 11299
CassandraHostname: "cassandra-cstars999.service.consul.preprod.crto.in"
CassandraKeyspace: "kvstore"
CassandraBucket: "bucket1"
CassandraNbConcurrentRequests: 1
MemcachedMaxBufferedSetRequests: 80000
191 changes: 86 additions & 105 deletions handlers/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,27 @@ package cassandra
import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"

"github.com/gocql/gocql"
"github.com/netflix/rend/common"
"github.com/netflix/rend/handlers"
"github.com/netflix/rend/metrics"
"github.com/netflix/rend/timer"
"github.com/spf13/viper"
)

type Handler struct {
session *gocql.Session
setbuffer chan CassandraSet
buffertimer *time.Timer
readonlymode bool
session *gocql.Session
setbuffer chan CassandraSet
keyspace string
bucket string
setStmt string
getStmt string
replaceStmt string
isShutingDown uint32
executors sync.WaitGroup
}

type CassandraSet struct {
Expand All @@ -37,142 +43,125 @@ var (
HistSetBufferWait = metrics.AddHistogram("set_batch_buffer_timewait", false, nil)
)

var singleton *Handler

// SetReadonlyMode switch Cassandra handler to readonly mode for graceful exit
func SetReadonlyMode() {
singleton.readonlymode = true
// Shutdown flags the handler as shutting down and then blocks until every
// executor goroutine registered on h.executors has called Done.
//
// The flag is raised with an atomic add so that readers going through
// IsShutingDown can observe it without additional locking.
//
// NOTE(review): cassandraSetReal only re-checks the flag between two receives
// on setbuffer, so an executor parked on an empty channel looks like it would
// keep Wait blocked until another item arrives — confirm.
func (h *Handler) Shutdown() {
atomic.AddUint32(&h.isShutingDown, 1)
h.executors.Wait()
}

func bufferSizeCheckLoop() {
ticker := time.NewTicker(5 * time.Millisecond)
for {
select {
case <-ticker.C:
if len(singleton.setbuffer) >= viper.GetInt("CassandraBatchMinItemSize") {
go FlushBuffer()
}
}
}
// IsShutingDown reports whether the atomically loaded isShutingDown counter is
// non-zero. In the visible code that counter is only incremented by Shutdown.
func (h *Handler) IsShutingDown() bool {
return atomic.LoadUint32(&h.isShutingDown) > 0
}

// FlushBuffer triggers a batched write operation in Cassandra target
func FlushBuffer() {
chanLen := len(singleton.setbuffer)
metrics.SetIntGauge(MetricSetBufferSize, uint64(chanLen))
// We read from the channel from N executor/goroutine in order to propagate the writes
func cassandraSetReal(cassandraHandler *Handler) {

if chanLen > 0 {
metrics.IncCounter(MetricCmdSetBatch)
for !cassandraHandler.IsShutingDown() {
item := <-cassandraHandler.setbuffer
query := cassandraHandler.session.Query(cassandraHandler.setStmt, item.Key, item.Data, item.Exptime)

// fmt.Println(chanLen)
if chanLen >= viper.GetInt("CassandraBatchMaxItemSize") {
chanLen = viper.GetInt("CassandraBatchMaxItemSize")
}
b := singleton.session.NewBatch(gocql.UnloggedBatch)
for i := 1; i <= chanLen; i++ {
item := (<-singleton.setbuffer)
b.Query(
fmt.Sprintf(
"INSERT INTO %s.%s (keycol,valuecol) VALUES (?, ?) USING TTL ?",
viper.GetString("CassandraKeyspace"),
viper.GetString("CassandraBucket"),
),
item.Key,
item.Data,
item.Exptime,
)
if err := query.Exec(); err != nil {
log.Println("[ERROR] Cassandra SET returned an error. ", err)
}

// exec CQL batch
start := timer.Now()
err := singleton.session.ExecuteBatch(b)
if err != nil {
metrics.IncCounter(MetricCmdSetBatchErrors)
log.Println("[ERROR] Batched Cassandra SET returned an error. ", err)
} else {
metrics.IncCounter(MetricCmdSetBatchSuccess)
metrics.ObserveHist(HistSetBatch, timer.Since(start))
}
}
//batch := cassandraHandler.session.NewBatch(gocql.UnloggedBatch)
//for i := 1; i <= 1000; i++ {
// item := <-cassandraHandler.setbuffer
// batch.Query(queryStr, item.Key, item.Data, item.Exptime)
//}
//if err := cassandraHandler.session.ExecuteBatch(batch); err != nil {
// log.Println("[ERROR] Cassandra BATCHED SET returned an error. ", err)
//}

// TODO: we need to protect this timer reset, and make it thread safe !!
singleton.buffertimer.Reset(200 * time.Millisecond)
}
cassandraHandler.executors.Done()
}

// InitCassandraConn initialize Cassandra global connection, call it once before starting ListenAndServe()
func InitCassandraConn() error {
// Only spawn a unique cassandra session,
// store this session in a global singleton.
if singleton == nil {
clust := gocql.NewCluster(viper.GetString("CassandraHostname"))
clust.Keyspace = viper.GetString("CassandraKeyspace")
clust.Consistency = gocql.LocalOne
clust.Timeout = viper.GetDuration("CassandraTimeoutMs")
clust.ConnectTimeout = viper.GetDuration("CassandraConnectTimeoutMs")
sess, err := clust.CreateSession()
if err != nil {
return err
}
// SimpleConvictionPolicy is the conviction policy installed on the gocql
// cluster configuration by New. It carries no state: Reset does nothing and
// AddFailure always returns false.
//
// It is a deliberately crude workaround for
// https://github.com/gocql/gocql/issues/915
type SimpleConvictionPolicy struct{}

singleton = &Handler{
session: sess,
setbuffer: make(chan CassandraSet, viper.GetInt("CassandraBatchBufferItemSize")),
buffertimer: time.AfterFunc(viper.GetDuration("CassandraBatchBufferMaxAgeMs"), FlushBuffer),
readonlymode: false,
}
// Reset is intentionally a no-op: the policy keeps no per-host state to clear.
func (e *SimpleConvictionPolicy) Reset(host *gocql.HostInfo) {}
// AddFailure ignores both the error and the host and always returns false.
// NOTE(review): under gocql's ConvictionPolicy contract false presumably means
// "do not convict this host" — confirm against the driver documentation.
func (e *SimpleConvictionPolicy) AddFailure(error error, host *gocql.HostInfo) bool {
return false
}

go bufferSizeCheckLoop()
func New() (*Handler, error) {
cluster := gocql.NewCluster(viper.GetString("CassandraHostname"))
cluster.Keyspace = viper.GetString("CassandraKeyspace")
cluster.Consistency = gocql.LocalOne
cluster.Timeout = viper.GetDuration("CassandraTimeoutMs")
cluster.ConnectTimeout = viper.GetDuration("CassandraConnectTimeoutMs")
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
cluster.ConvictionPolicy = &SimpleConvictionPolicy{}
cluster.NumConns = 10

sess, err := cluster.CreateSession()
if err != nil {
return nil, err
}

// TODO : prepare Cassandra statements for common queries
// Currently using session.bind() seems to registers statements if they don't exists in C*
cassandraHandler := &Handler{
session: sess,
keyspace: viper.GetString("CassandraKeyspace"),
bucket: viper.GetString("CassandraBucket"),

return nil
}
// Statements
setbuffer: make(chan CassandraSet, viper.GetInt("MemcachedMaxBufferedSetRequests")),
setStmt: fmt.Sprintf("INSERT INTO %s (key,value) VALUES (?, ?) USING TTL ?", viper.GetString("CassandraBucket")),
getStmt: fmt.Sprintf("SELECT key,value FROM %s WHERE key=? LIMIT 1", viper.GetString("CassandraBucket")),
replaceStmt: fmt.Sprintf("SELECT writetime(value) FROM %s WHERE key=? LIMIT 1", viper.GetString("CassandraBucket")),

func New() (handlers.Handler, error) {
// Synchronization
isShutingDown: 0,
executors: sync.WaitGroup{},
}

// Spawn go-routines that will fan out memcached requests to Cassandra
for i := 1; i <= viper.GetInt("CassandraNbConcurrentRequests"); i++ {
cassandraHandler.executors.Add(1)
go cassandraSetReal(cassandraHandler)
}

return singleton, nil
return cassandraHandler, nil
}

// Close is a no-op: it always returns nil and leaves the Cassandra session and
// the executor goroutines untouched.
func (h *Handler) Close() error {

return nil
}

func computeExpTime(Exptime uint32) uint32 {
// Maximum allowed relative TTL in memcached protocol
max_ttl := uint32(60 * 60 * 24 * 30) // number of seconds in 30 days
if Exptime > max_ttl {
maxTtl := uint32(60 * 60 * 24 * 30) // number of seconds in 30 days
if Exptime > maxTtl {
return Exptime - uint32(time.Now().Unix())
}
return Exptime
}

func (h *Handler) Set(cmd common.SetRequest) error {
if h.readonlymode {
if h.IsShutingDown() {
return common.ErrItemNotStored
}
realExptime := computeExpTime(cmd.Exptime)

start := timer.Now()
h.setbuffer <- CassandraSet{
Key: cmd.Key,
Data: cmd.Data,
Flags: cmd.Flags,
Exptime: realExptime,
Exptime: computeExpTime(cmd.Exptime),
}
metrics.ObserveHist(HistSetBufferWait, timer.Since(start))
// TODO : maybe add a set timeout that return "not_stored" in case of buffer error ?

//// TODO : maybe add a set timeout that return "not_stored" in case of buffer error ?
return nil
}

// Add is a stub: it ignores cmd, stores nothing and always returns nil.
// NOTE(review): callers therefore see success for an ADD that was never
// written — confirm this is intended.
func (h *Handler) Add(cmd common.SetRequest) error {

return nil
}

func (h *Handler) Replace(cmd common.SetRequest) error {
if h.readonlymode {
if h.IsShutingDown() {
return common.ErrItemNotStored
}

Expand Down Expand Up @@ -234,28 +223,21 @@ func (h *Handler) Get(cmd common.GetRequest) (<-chan common.GetResponse, <-chan

var val []byte

if err := h.session.Bind(
fmt.Sprintf(
"SELECT keycol,valuecol FROM %s.%s where keycol=?",
viper.GetString("CassandraKeyspace"),
viper.GetString("CassandraBucket"),
),
key_qi,
).Scan(&key, &val); err == nil {
if err := h.session.Bind(h.getStmt, key_qi).Scan(&key, &val); err == nil {
dataOut <- common.GetResponse{
Miss: false,
Quiet: cmd.Quiet[idx],
Opaque: cmd.Opaques[idx],
Flags: 0,
Key: []byte(key),
Key: key,
Data: val,
}
} else {
dataOut <- common.GetResponse{
Miss: true,
Quiet: cmd.Quiet[idx],
Opaque: cmd.Opaques[idx],
Key: []byte(key),
Key: key,
Data: nil,
}
}
Expand All @@ -282,7 +264,7 @@ func (h *Handler) GetE(cmd common.GetRequest) (<-chan common.GetEResponse, <-cha

if err := h.session.Bind(
fmt.Sprintf(
"SELECT keycol,valuecol,TTL(valuecol) FROM %s.%s where keycol=?",
"SELECT key,value,TTL(value) FROM %s.%s where key=?",
viper.GetString("CassandraKeyspace"),
viper.GetString("CassandraBucket"),
),
Expand All @@ -293,7 +275,7 @@ func (h *Handler) GetE(cmd common.GetRequest) (<-chan common.GetEResponse, <-cha
Quiet: cmd.Quiet[idx],
Opaque: cmd.Opaques[idx],
Flags: 0,
Key: []byte(key),
Key: key,
Data: val,
Exptime: ttl,
}
Expand All @@ -302,7 +284,7 @@ func (h *Handler) GetE(cmd common.GetRequest) (<-chan common.GetEResponse, <-cha
Miss: true,
Quiet: cmd.Quiet[idx],
Opaque: cmd.Opaques[idx],
Key: []byte(key),
Key: key,
Data: nil,
}
}
Expand Down Expand Up @@ -342,6 +324,5 @@ func (h *Handler) Delete(cmd common.DeleteRequest) error {
}

// Touch is a stub: it ignores cmd, changes no expiration and always returns
// nil.
// NOTE(review): callers therefore see success for a TOUCH that had no effect —
// confirm this is intended.
func (h *Handler) Touch(cmd common.TouchRequest) error {

return nil
}

0 comments on commit 39eec8a

Please sign in to comment.