Skip to content

Commit

Permalink
Various changes
Browse files Browse the repository at this point in the history
  + Remove global singleton, by passing around context when possible and by using a clojure for Rend New Cassandra handler creation. Purpose it not recreate a new cnx toward cassandra each time this handler is invoked.

  + Move out from the flaky unlogged batch for SETs. Problem is that managing size of the batch is tricky as it depends not on the number of elements but of the bytes size of the batch

  + Instead of relying a single buffer for the batched SETs. Use a fan out approach where goroutines are responsible for sending single SET command toward cassandra. If perf decrease we can re-use the unlogged batch but we a way lot smaller SET buffer, thus avoiding problem from above.

  + Made shutdown of Memendra thread safe

  + Add a custom ConvictionPolicy to avoid apache/cassandra-gocql-driver#915

  + Prepare statement in the cassandra context to avoid allocating a new string every request with fmt.format()

  + Replace Bind(...) calls by Query in new goCQL version (simplify the code)
  • Loading branch information
Romain GERARD committed Apr 29, 2020
1 parent ef49988 commit d884d0c
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 223 deletions.
38 changes: 22 additions & 16 deletions app/memandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ func initDefaultConfig() {
viper.SetDefault("CassandraHostname", "127.0.0.1")
viper.SetDefault("CassandraKeyspace", "kvstore")
viper.SetDefault("CassandraBucket", "bucket")
viper.SetDefault("CassandraBatchBufferItemSize", 80000)
viper.SetDefault("CassandraBatchBufferMaxAgeMs", 200*time.Millisecond)
viper.SetDefault("CassandraBatchMinItemSize", 1000)
viper.SetDefault("CassandraBatchMaxItemSize", 5000)
viper.SetDefault("CassandraTimeoutMs", 1000*time.Millisecond)
viper.SetDefault("CassandraTimeoutMs", 10000*time.Millisecond)
viper.SetDefault("CassandraConnectTimeoutMs", 1000*time.Millisecond)
viper.SetDefault("CassandraNbConcurrentRequests", 500)
viper.SetDefault("MemcachedMaxBufferedSetRequests", 80*1000)
viper.SetDefault("GracefulShutdownWaitTimeInSec", 10*60)
}

func main() {
Expand Down Expand Up @@ -60,15 +59,18 @@ func main() {
var h1 handlers.HandlerConst
var h2 handlers.HandlerConst

// L1Only MODE
h1 = cassandra.New
h2 = handlers.NilHandler

// Init Cassandra connection in handler
if err := cassandra.InitCassandraConn(); err != nil {
// Init Cassandra connection
cassandraHandler, err := cassandra.New()
if err != nil {
log.Fatal(err)
}

// L1Only MODE
h1 = func() (handlers.Handler, error) {
return cassandraHandler, nil
}
h2 = handlers.NilHandler

l := server.TCPListener(viper.GetInt("ListenPort"))
ps := []protocol.Components{binprot.Components, textprot.Components}

Expand All @@ -80,11 +82,15 @@ func main() {
go func() {
_ = <-gracefulStop
log.Println("[INFO] Gracefully stopping Memandra server")
log.Println("[INFO] Setting Cassandra handler to readonly mode")
cassandra.SetReadonlyMode()
time.Sleep(500 * time.Millisecond)
log.Println("[INFO] Forcing write buffer to be flushed before exiting")
cassandra.FlushBuffer()
go func() {
waitTime := viper.GetDuration("GracefulShutdownWaitTimeInSec") * time.Second
log.Println("[INFO] Waiting", waitTime, "for Cassandra executors to shutdown")
time.Sleep(waitTime)
log.Println("[INFO] Forcing exit as", waitTime, "passed")
os.Exit(0)
}()

cassandraHandler.Shutdown()
os.Exit(0)
}()

Expand Down
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ InternalMetricsListenAddress: 11299
CassandraHostname: "cassandra-cstars999.service.consul.preprod.crto.in"
CassandraKeyspace: "kvstore"
CassandraBucket: "bucket1"
CassandraNbConcurrentRequests: 1
MemcachedMaxBufferedSetRequests: 80000
Loading

0 comments on commit d884d0c

Please sign in to comment.