From 15378b8d23fd49c18248b1386462f858dcd19ba6 Mon Sep 17 00:00:00 2001 From: Eduardo Aguilar Date: Thu, 16 Feb 2023 08:53:50 -0600 Subject: [PATCH 1/4] chore: save draft for restart connection --- go.mod | 1 + go.sum | 14 ++++++ qb/client.go | 66 ++++++++++++++--------------- qb/errors/errors.go | 11 +++++ qb/qb.go | 89 +++++++++++++++++++++++++++++++++------ qb/qcount/count.go | 19 ++++++--- qb/qcount/count_exec.go | 6 +-- qb/qcount/count_test.go | 3 ++ qb/qdelete/delete.go | 25 ++++++----- qb/qdelete/delete_exec.go | 9 ++-- qb/qinsert/insert.go | 17 +++++--- qb/qinsert/insert_exec.go | 36 +++++++++++++--- qb/qselect/build.go | 4 +- qb/qselect/select.go | 17 +++++--- qb/qselect/select_exec.go | 15 +++---- qb/query/query.go | 27 +++++------- qb/qupdate/update.go | 17 +++++--- qb/qupdate/update_exec.go | 10 +++-- runner/runner.go | 61 +++++++++++++++++++++++++++ 19 files changed, 321 insertions(+), 126 deletions(-) create mode 100644 qb/errors/errors.go create mode 100644 runner/runner.go diff --git a/go.mod b/go.mod index 3a8fb2e..a31e1be 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/Drafteame/cassandra-builder go 1.18 require ( + github.com/avast/retry-go/v4 v4.3.3 github.com/gocql/gocql v1.3.1 github.com/magefile/mage v1.14.0 github.com/scylladb/gocqlx v1.5.0 diff --git a/go.sum b/go.sum index d2d0bc8..f1225be 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,11 @@ +github.com/avast/retry-go/v4 v4.3.3 h1:G56Bp6mU0b5HE1SkaoVjscZjlQb0oy4mezwY/cGH19w= +github.com/avast/retry-go/v4 v4.3.3/go.mod h1:rg6XFaiuFYII0Xu3RDbZQkxCofFwruZKW8oEF1jpWiU= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gocql/gocql v0.0.0-20200131111108-92af2e088537/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/gocql/gocql v1.3.1 h1:BTwM4rux+ah5G3oH6/MQa+tur/TDd/XAAOXDxBBs7rg= @@ -23,11 +26,22 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/scylladb/go-reflectx v1.0.1/go.mod h1:rWnOfDIRWBGN0miMLIcoPt/Dhi2doCMZqwMCJ3KupFc= github.com/scylladb/gocqlx v1.5.0 h1:p7NEqRaCMAtW2nvq62iyUNXmIYP29373YOC7D2Xd7Qg= github.com/scylladb/gocqlx v1.5.0/go.mod h1:QarZcw5kpYh31MXfxiN2JWWvF1cgZbYqfTfXwmwhpEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/qb/client.go b/qb/client.go index 91e99d2..6561ddf 100644 --- a/qb/client.go +++ b/qb/client.go @@ -12,61 +12,42 @@ import ( ) type client struct { + canRestart bool + config Config session *gocql.Session - debug bool printQuery query.DebugPrint } -// NewClient creates a new cassandra client manager from config -func NewClient(conf Config) (Client, error) { - session, err := getSession(conf) - if err != nil { - return nil, err - } - - c := &client{session: session, debug: conf.Debug, printQuery: query.DefaultDebugPrint} - - if conf.PrintQuery != nil { - c.printQuery = conf.PrintQuery - } - - return c, nil -} - -// NewClientWithSession creates a new cassandra client manager from a given session. -func NewClientWithSession(session *gocql.Session, conf Config) (Client, error) { - c := &client{session: session, debug: conf.Debug, printQuery: query.DefaultDebugPrint} - - if conf.PrintQuery != nil { - c.printQuery = conf.PrintQuery - } - - return c, nil -} - var _ Client = &client{} func (c *client) Select(f ...string) *qselect.Query { - return qselect.New(c.session, c.debug, c.printQuery).Fields(f...) + return qselect.New(c).Fields(f...) } func (c *client) Insert(f ...string) *qinsert.Query { - return qinsert.New(c.session, c.debug, c.printQuery).Fields(f...) + return qinsert.New(c).Fields(f...) } func (c *client) Update(t string) *qupdate.Query { - return qupdate.New(c.session, c.debug, c.printQuery).Table(t) + return qupdate.New(c).Table(t) } func (c *client) Delete() *qdelete.Query { - return qdelete.New(c.session, c.debug, c.printQuery) + return qdelete.New(c) } func (c *client) Count() *qcount.Query { - return qcount.New(c.session, c.debug, c.printQuery) + return qcount.New(c) +} + +func (c *client) Debug() bool { + return c.config.Debug +} + +func (c *client) PrintFn() query.DebugPrint { + return c.printQuery } -// Close finish cassandra session func (c *client) Close() { c.session.Close() } @@ -75,6 +56,23 @@ func (c *client) Session() *gocql.Session { return c.session } +func (c *client) Config() Config { + return c.config +} + +func (c *client) Restart() error { + c.Close() + + session, err := getSession(c.config) + if err != nil { + return err + } + + c.session = session + + return nil +} + func getSession(c Config) (*gocql.Session, error) { cluster := gocql.NewCluster(c.ContactPoints...) cluster.Keyspace = c.KeyspaceName diff --git a/qb/errors/errors.go b/qb/errors/errors.go new file mode 100644 index 0000000..8321193 --- /dev/null +++ b/qb/errors/errors.go @@ -0,0 +1,11 @@ +package errors + +import "errors" + +var ( + ErrNilBinding = errors.New("cassandra-builder: nil bind is not allowed") + ErrNoPtrBinding = errors.New("cassandra-builder: bind value should be a pointer") + ErrNoStructOrSliceBinding = errors.New("cassandra-builder: bind value should be a struct or slice") + ErrNoSliceOfStructsBinding = errors.New("cassandra-builder: bind value should be a slice of structs") + ErrClosedConnection = errors.New("cassandra-builder: can execute on closed connection") +) diff --git a/qb/qb.go b/qb/qb.go index b56e9bb..677375f 100644 --- a/qb/qb.go +++ b/qb/qb.go @@ -1,6 +1,7 @@ package qb import ( + "log" "time" "github.com/gocql/gocql" @@ -29,19 +30,20 @@ const ( // Config is the main cassandra configuration needed type Config struct { - Port int `yaml:"port" json:"port"` - KeyspaceName string `yaml:"keyspace_name" json:"keyspace_name"` - Username string `yaml:"username" json:"username"` - Password string `yaml:"password" json:"password"` - ContactPoints []string `yaml:"contact_points" json:"contact_points"` - Debug bool `yaml:"debug" json:"debug"` - ProtoVersion int `yaml:"proto_version" json:"proto_version"` - Consistency Consistency `yaml:"consistency" json:"consistency"` - CaPath string `yaml:"ca_path" json:"ca_path"` - DisableInitialHostLookup bool `yaml:"disable_initial_host_lookup" json:"disable_initial_host_lookup"` - Timeout time.Duration `yaml:"timeout" json:"timeout"` - ConnectTimeout time.Duration `yaml:"connect_timeout" json:"connect_timeout"` + Port int + KeyspaceName string + Username string + Password string + ContactPoints []string + Debug bool + ProtoVersion int + Consistency Consistency + CaPath string + DisableInitialHostLookup bool + Timeout time.Duration + ConnectTimeout time.Duration PrintQuery query.DebugPrint + NumRetries uint } // Client is the main cassandra client abstraction to work with the database @@ -64,6 +66,67 @@ type Client interface { // Session return the plain session object to build some direct query Session() *gocql.Session - // Close close cassandra connection pool + // Debug return an assertion for debugging + Debug() bool + + // PrintFn return the configured debug print function. + PrintFn() query.DebugPrint + + // Restart should close and start a new connection. + Restart() error + + // Config return current client configuration + Config() Config + + // Close ends cassandra connection pool Close() } + +// DefaultDebugPrint defines a default function that prints resultant query and arguments before being executed +// and when the Debug flag is true +func DefaultDebugPrint(q string, args []interface{}, err error) { + if q != "" { + log.Printf("query: %v \nargs: %v\n", q, args) + } + + if err != nil { + log.Println("err: ", err.Error()) + } +} + +// NewClient creates a new cassandra client manager from config +func NewClient(conf Config) (Client, error) { + session, err := getSession(conf) + if err != nil { + return nil, err + } + + c := &client{ + session: session, + config: conf, + canRestart: true, + printQuery: DefaultDebugPrint, + } + + if conf.PrintQuery != nil { + c.printQuery = conf.PrintQuery + } + + return c, nil +} + +// NewClientWithSession creates a new cassandra client manager from a given session. +func NewClientWithSession(session *gocql.Session, conf Config) (Client, error) { + c := &client{ + session: session, + config: conf, + canRestart: false, + printQuery: DefaultDebugPrint, + } + + if conf.PrintQuery != nil { + c.printQuery = conf.PrintQuery + } + + return c, nil +} diff --git a/qb/qcount/count.go b/qb/qcount/count.go index b2d6708..f8764d9 100644 --- a/qb/qcount/count.go +++ b/qb/qcount/count.go @@ -6,9 +6,18 @@ import ( "github.com/Drafteame/cassandra-builder/qb/query" ) +//go:generate mockery --name=Client --filename=client.go --structname=Client --output=mocks --outpkg=mocks + +type Client interface { + Session() *gocql.Session + Debug() bool + Restart() error + PrintFn() query.DebugPrint +} + // Query create new select count query type Query struct { - ctx query.Query + client Client table string column string where []query.WhereStm @@ -17,12 +26,8 @@ type Query struct { } // New create a new count query instance by passing a cassandra session -func New(s *gocql.Session, d bool, dp query.DebugPrint) *Query { - return &Query{ctx: query.Query{ - Session: s, - Debug: d, - PrintQuery: dp, - }} +func New(c Client) *Query { + return &Query{client: c} } // Column set count column of the query diff --git a/qb/qcount/count_exec.go b/qb/qcount/count_exec.go index ca13434..5d56b17 100644 --- a/qb/qcount/count_exec.go +++ b/qb/qcount/count_exec.go @@ -15,7 +15,7 @@ func (cq *Query) Exec() (int64, error) { var count int64 - if err := cq.ctx.Session.Query(q, cq.args...).Consistency(gocql.One).Scan(&count); err != nil { + if err := cq.client.Session().Query(q, cq.args...).Consistency(gocql.One).Scan(&count); err != nil { return 0, err } @@ -35,8 +35,8 @@ func (cq *Query) build() string { queryStr, _ := q.ToCql() - if cq.ctx.Debug { - cq.ctx.PrintQuery(queryStr, cq.args) + if cq.client.Debug() { + cq.client.PrintFn()(queryStr, cq.args) } return strings.TrimSpace(queryStr) diff --git a/qb/qcount/count_test.go b/qb/qcount/count_test.go index da2f72e..14adb3e 100644 --- a/qb/qcount/count_test.go +++ b/qb/qcount/count_test.go @@ -15,6 +15,7 @@ func TestNew(t *testing.T) { if !reflect.DeepEqual(q.ctx.Session, s) { t.Errorf("associated session is different") + return } } @@ -24,11 +25,13 @@ func TestQuery_From(t *testing.T) { q.From("test") if q.table != "test" { t.Errorf("exp: test got: %v", q.table) + return } q.From("test2") if q.table != "test2" { t.Errorf("exp: test2 got: %v", q.table) + return } } diff --git a/qb/qdelete/delete.go b/qb/qdelete/delete.go index 0b35fcd..fbb2f6d 100644 --- a/qb/qdelete/delete.go +++ b/qb/qdelete/delete.go @@ -3,24 +3,29 @@ package qdelete import ( "github.com/gocql/gocql" + "github.com/Drafteame/cassandra-builder/qb" "github.com/Drafteame/cassandra-builder/qb/query" ) +type Client interface { + Session() *gocql.Session + Config() qb.Config + Restart() error + Debug() bool + PrintFn() query.DebugPrint +} + // Query represents a Cassandra delete query. Execution should not bind any value type Query struct { - ctx query.Query - table string - where []query.WhereStm - args []interface{} + client Client + table string + where []query.WhereStm + args []interface{} } // New create a new delete query instance by passing a cassandra session -func New(s *gocql.Session, d bool, dp query.DebugPrint) *Query { - return &Query{ctx: query.Query{ - Session: s, - Debug: d, - PrintQuery: dp, - }} +func New(c Client) *Query { + return &Query{client: c} } // From set table where be data deleted diff --git a/qb/qdelete/delete_exec.go b/qb/qdelete/delete_exec.go index 2ea2053..7930808 100644 --- a/qb/qdelete/delete_exec.go +++ b/qb/qdelete/delete_exec.go @@ -6,13 +6,16 @@ import ( "github.com/scylladb/gocqlx/qb" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/runner" ) // Exec execute delete query and return error on failure func (dq *Query) Exec() error { + run := runner.New(dq.client) + q := dq.build() - if err := dq.ctx.Session.Query(q, dq.args...).Exec(); err != nil { + if err := dq.client.Session().Query(q, dq.args...).Exec(); err != nil { return err } @@ -28,8 +31,8 @@ func (dq *Query) build() string { queryStr, _ := q.ToCql() - if dq.ctx.Debug { - dq.ctx.PrintQuery(queryStr, dq.args) + if dq.client.Debug() { + dq.client.PrintFn()(queryStr, dq.args) } return strings.TrimSpace(queryStr) diff --git a/qb/qinsert/insert.go b/qb/qinsert/insert.go index 8138406..4bae7fa 100644 --- a/qb/qinsert/insert.go +++ b/qb/qinsert/insert.go @@ -6,21 +6,24 @@ import ( "github.com/Drafteame/cassandra-builder/qb/query" ) +type Client interface { + Session() *gocql.Session + Debug() bool + Restart() error + PrintFn() query.DebugPrint +} + // Query represent a Cassandra insert query. Execution should not bind any value type Query struct { - ctx query.Query + client Client table string fields query.Columns args []interface{} } // New creates a new insert query by passing a cassandra session and debug options -func New(s *gocql.Session, d bool, dp query.DebugPrint) *Query { - return &Query{ctx: query.Query{ - Session: s, - Debug: d, - PrintQuery: dp, - }} +func New(c Client) *Query { + return &Query{client: c} } // Fields save query fields that should be used for insert query diff --git a/qb/qinsert/insert_exec.go b/qb/qinsert/insert_exec.go index 2de68ad..b050a80 100644 --- a/qb/qinsert/insert_exec.go +++ b/qb/qinsert/insert_exec.go @@ -3,18 +3,44 @@ package qinsert import ( "strings" + "github.com/gocql/gocql" "github.com/scylladb/gocqlx/qb" + + "github.com/Drafteame/cassandra-builder/qb/errors" ) // Exec execute insert query with args func (iq *Query) Exec() error { q := iq.build() - if err := iq.ctx.Session.Query(q, iq.args...).Exec(); err != nil { - return err + execFn := func() error { + if r.client.Session() == nil || r.client.Session().Closed() { + return errors.ErrClosedConnection + } + + return r.client.Session().Query(q, iq.args...).Exec() + } + + opts := []retry.Option{ + retry.Attempts(r.client.Config().NumRetries), + retry.RetryIf(func(err error) bool { + switch err { + case gocql.ErrNoConnections, errors.ErrClosedConnection: + return true + default: + return false + } + }), + retry.OnRetry(func(n uint, err error) { + errRestart := r.client.Restart() + + if r.client.Debug() { + r.client.PrintFn()("", nil, errRestart) + } + }), } - return nil + return retry.Do(execFn, opts...) } func (iq *Query) build() string { @@ -23,8 +49,8 @@ func (iq *Query) build() string { queryStr, _ := q.ToCql() - if iq.ctx.Debug { - iq.ctx.PrintQuery(queryStr, iq.args) + if iq.client.Debug() { + iq.client.PrintFn()(queryStr, iq.args) } return strings.TrimSpace(queryStr) diff --git a/qb/qselect/build.go b/qb/qselect/build.go index 7093f89..3d5feab 100644 --- a/qb/qselect/build.go +++ b/qb/qselect/build.go @@ -48,8 +48,8 @@ func (q *Query) build() string { queryStr, _ := sb.Json().ToCql() - if q.ctx.Debug { - q.ctx.PrintQuery(queryStr, q.args) + if q.client.Debug() { + q.client.PrintFn()(queryStr, q.args) } return strings.TrimSpace(queryStr) diff --git a/qb/qselect/select.go b/qb/qselect/select.go index ca4db80..790cbc6 100644 --- a/qb/qselect/select.go +++ b/qb/qselect/select.go @@ -6,9 +6,16 @@ import ( "github.com/Drafteame/cassandra-builder/qb/query" ) +type Client interface { + Session() *gocql.Session + Debug() bool + Restart() error + PrintFn() query.DebugPrint +} + // Query represents a cassandra select statement and his options type Query struct { - ctx query.Query + client Client fields query.Columns args []interface{} table string @@ -22,12 +29,8 @@ type Query struct { } // New create a new select query by passing a cassandra session and debug options -func New(s *gocql.Session, d bool, dp query.DebugPrint) *Query { - return &Query{ctx: query.Query{ - Session: s, - Debug: d, - PrintQuery: dp, - }} +func New(c Client) *Query { + return &Query{client: c} } // Fields save query fields that should be used for select query diff --git a/qb/qselect/select_exec.go b/qb/qselect/select_exec.go index b6e1dba..ffb870a 100644 --- a/qb/qselect/select_exec.go +++ b/qb/qselect/select_exec.go @@ -1,18 +1,18 @@ package qselect import ( - "errors" "reflect" "github.com/gocql/gocql" + "github.com/Drafteame/cassandra-builder/qb/errors" "github.com/Drafteame/cassandra-builder/qb/query" ) // One return just one result on bind action func (q *Query) One() error { if q.bind == nil { - return errors.New("nil bind is not allowed, use None() functions instead One()") + return errors.ErrNilBinding } if err := query.VerifyBind(q.bind, reflect.Struct); err != nil { @@ -23,7 +23,7 @@ func (q *Query) One() error { var jsonRow string - if err := q.ctx.Session.Query(sq, q.args...).Consistency(gocql.One).Scan(&jsonRow); err != nil { + if err := q.client.Session().Query(sq, q.args...).Consistency(gocql.One).Scan(&jsonRow); err != nil { return err } @@ -45,7 +45,7 @@ func (q *Query) One() error { // All return all rows on bind action. Bind should be a slice of structs func (q *Query) All() error { if q.bind == nil { - return errors.New("nil bind is not allowed, use None() function instead All()") + return errors.ErrNilBinding } if err := query.VerifyBind(q.bind, reflect.Slice); err != nil { @@ -56,8 +56,7 @@ func (q *Query) All() error { var jsonRow string - iter := q.ctx.Session.Query(sq, q.args...).Iter() - defer func() { _ = iter.Close() }() + iter := q.client.Session().Query(sq, q.args...).Iter() ib := reflect.Indirect(reflect.ValueOf(q.bind)) @@ -73,14 +72,14 @@ func (q *Query) All() error { ib.Set(reflect.Append(ib, reflect.Indirect(elem))) } - return nil + return iter.Close() } // None execute a qselect query without returning values func (q *Query) None() error { sq := q.build() - if err := q.ctx.Session.Query(sq, q.args...).Exec(); err != nil { + if err := q.client.Session().Query(sq, q.args...).Exec(); err != nil { return err } diff --git a/qb/query/query.go b/qb/query/query.go index 45f9cfa..1b34353 100644 --- a/qb/query/query.go +++ b/qb/query/query.go @@ -2,13 +2,13 @@ package query import ( "encoding/json" - "errors" "fmt" - "log" "reflect" "time" "github.com/gocql/gocql" + + "github.com/Drafteame/cassandra-builder/qb/errors" ) type ( @@ -19,7 +19,7 @@ type ( Order string // DebugPrint defines a callback that prints query values - DebugPrint func(q string, args []interface{}) + DebugPrint func(q string, args []interface{}, err error) // Query Base definition of query Query struct { @@ -38,26 +38,20 @@ const ( DatetimeLayout string = "2006-01-02 15:04:05.000Z" ) -// DefaultDebugPrint defines a default function that prints resultant query and arguments before being executed -// and when the Debug flag is true -func DefaultDebugPrint(q string, args []interface{}) { - log.Printf("query: %v \nargs: %v\n", q, args) -} - // VerifyBind verify if an interface is bindable or not by checking it is a Ptr kind func VerifyBind(b interface{}, k reflect.Kind) error { t := reflect.TypeOf(b) v := reflect.ValueOf(b) if t.Kind() != reflect.Ptr { - return errors.New("bind value should be a pointer") + return errors.ErrNoPtrBinding } str := reflect.Indirect(v).Interface() t = reflect.TypeOf(str) if t.Kind() != k { - return errors.New("bind value needs to be a struct to get one value, or a slice of structs to get many") + return errors.ErrNoStructOrSliceBinding } return nil @@ -69,11 +63,11 @@ func BindMapToStruct(m map[string]interface{}, st reflect.Value) error { indTyp := indVal.Type() if st.Kind() != reflect.Ptr { - return errors.New("bind should be a slice of struct pointers with `gocql` tags") + return errors.ErrNoPtrBinding } if indVal.Kind() != reflect.Struct { - return errors.New("bind should be a slice of struct pointers with `gocql` tags - 2") + return errors.ErrNoSliceOfStructsBinding } numField := indTyp.NumField() @@ -86,9 +80,8 @@ func BindMapToStruct(m map[string]interface{}, st reflect.Value) error { mv, ok := m[tagField] if tagField != "" && ok { - err := CastMapValue(mv, field.Type, value) - if err != nil { - fmt.Printf("err: %v\n", err) + if err := CastMapValue(mv, field.Type, value); err != nil { + return err } } } @@ -117,7 +110,7 @@ func CastMapValue(mv interface{}, t reflect.Type, v reflect.Value) error { t, _ := time.Parse(DatetimeLayout, mv.(string)) v.Set(reflect.ValueOf(t)) default: - return fmt.Errorf("can't cast value of type %T with value %v, to type %v", mv, mv, t) + return fmt.Errorf("cassandra-builder: can't cast value of type %T with value %v, to type %v", mv, mv, t) } return nil diff --git a/qb/qupdate/update.go b/qb/qupdate/update.go index 026abd9..6bd4796 100644 --- a/qb/qupdate/update.go +++ b/qb/qupdate/update.go @@ -6,9 +6,16 @@ import ( "github.com/Drafteame/cassandra-builder/qb/query" ) +type Client interface { + Session() *gocql.Session + Debug() bool + Restart() error + PrintFn() query.DebugPrint +} + // Query represent a Cassandra update query. Execution should not bind any value type Query struct { - ctx query.Query + client Client table string fields query.Columns args []interface{} @@ -16,12 +23,8 @@ type Query struct { } // New create a new update query by passing a cassandra session and the affected table -func New(s *gocql.Session, d bool, dp query.DebugPrint) *Query { - return &Query{ctx: query.Query{ - Session: s, - Debug: d, - PrintQuery: dp, - }} +func New(c Client) *Query { + return &Query{client: c} } // Table set the table name to affect with the update query diff --git a/qb/qupdate/update_exec.go b/qb/qupdate/update_exec.go index 5bc44db..c08e32c 100644 --- a/qb/qupdate/update_exec.go +++ b/qb/qupdate/update_exec.go @@ -12,7 +12,11 @@ import ( func (uq *Query) Exec() error { q := uq.build() - if err := uq.ctx.Session.Query(q, uq.args...).Exec(); err != nil { + if err := uq.client.Session().Query(q, uq.args...).Exec(); err != nil { + if uq.client.Debug() { + uq.client.PrintFn()(q, uq.args, err) + } + return err } @@ -34,8 +38,8 @@ func (uq *Query) build() string { queryStr, _ := q.ToCql() - if uq.ctx.Debug { - uq.ctx.PrintQuery(queryStr, uq.args) + if uq.client.Debug() { + uq.client.PrintFn()(queryStr, uq.args, nil) } return strings.TrimSpace(queryStr) diff --git a/runner/runner.go b/runner/runner.go new file mode 100644 index 0000000..70c0d52 --- /dev/null +++ b/runner/runner.go @@ -0,0 +1,61 @@ +package runner + +import ( + "github.com/avast/retry-go/v4" + "github.com/gocql/gocql" + + "github.com/Drafteame/cassandra-builder/qb" + "github.com/Drafteame/cassandra-builder/qb/errors" + "github.com/Drafteame/cassandra-builder/qb/query" +) + +type Client interface { + Session() *gocql.Session + Config() qb.Config + Restart() error + Debug() bool + PrintFn() query.DebugPrint +} + +type Runner struct { + client Client +} + +func (r *Runner) Query() {} + +func (r *Runner) QueryOne() {} + +func (r *Runner) QueryNone(query string, args []interface{}) error { + execFn := func() error { + if r.client.Session() == nil || r.client.Session().Closed() { + return errors.ErrClosedConnection + } + + return r.client.Session().Query(query, args...).Exec() + } + + opts := []retry.Option{ + retry.Attempts(r.client.Config().NumRetries), + retry.RetryIf(func(err error) bool { + switch err { + case gocql.ErrNoConnections, errors.ErrClosedConnection: + return true + default: + return false + } + }), + retry.OnRetry(func(n uint, err error) { + errRestart := r.client.Restart() + + if r.client.Debug() { + r.client.PrintFn()("", nil, errRestart) + } + }), + } + + return retry.Do(execFn, opts...) +} + +func New(c Client) *Runner { + return &Runner{client: c} +} From 55942ee8cfc3f9bdab564d85deb41e4732f849d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Haskel?= Date: Tue, 21 Feb 2023 13:07:23 -0300 Subject: [PATCH 2/4] feat: restart connection --- qb/client.go | 9 ++- qb/errors/errors.go | 2 + qb/models/config.go | 39 ++++++++++ qb/qb.go | 40 ++--------- qb/qcount/count.go | 2 + qb/qcount/count_exec.go | 17 +++-- qb/qcount/count_exec_test.go | 4 +- qb/qcount/count_test.go | 18 +++-- qb/qdelete/delete.go | 4 +- qb/qdelete/delete_exec.go | 10 ++- qb/qinsert/insert.go | 2 + qb/qinsert/insert_exec.go | 36 +++------- qb/qinsert/insert_exec_test.go | 2 +- qb/qselect/select.go | 2 + qb/qselect/select_exec.go | 45 +++--------- qb/qupdate/update.go | 2 + qb/qupdate/update_exec.go | 4 +- qb/qupdate/update_exec_test.go | 2 +- qb/runner/runner.go | 126 +++++++++++++++++++++++++++++++++ runner/runner.go | 61 ---------------- 20 files changed, 239 insertions(+), 188 deletions(-) create mode 100644 qb/models/config.go create mode 100644 qb/runner/runner.go delete mode 100644 runner/runner.go diff --git a/qb/client.go b/qb/client.go index 6561ddf..1e133a1 100644 --- a/qb/client.go +++ b/qb/client.go @@ -5,6 +5,8 @@ import ( "github.com/Drafteame/cassandra-builder/qb/qcount" "github.com/Drafteame/cassandra-builder/qb/qdelete" + + models "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/qinsert" "github.com/Drafteame/cassandra-builder/qb/qselect" "github.com/Drafteame/cassandra-builder/qb/query" @@ -13,7 +15,7 @@ import ( type client struct { canRestart bool - config Config + config models.Config session *gocql.Session printQuery query.DebugPrint } @@ -56,7 +58,8 @@ func (c *client) Session() *gocql.Session { return c.session } -func (c *client) Config() Config { +// TODO: check if affects existent logic +func (c *client) Config() models.Config { return c.config } @@ -73,7 +76,7 @@ func (c *client) Restart() error { return nil } -func getSession(c Config) (*gocql.Session, error) { +func getSession(c models.Config) (*gocql.Session, error) { cluster := gocql.NewCluster(c.ContactPoints...) cluster.Keyspace = c.KeyspaceName cluster.Consistency = gocql.Consistency(c.Consistency) diff --git a/qb/errors/errors.go b/qb/errors/errors.go index 8321193..a1f33e7 100644 --- a/qb/errors/errors.go +++ b/qb/errors/errors.go @@ -8,4 +8,6 @@ var ( ErrNoStructOrSliceBinding = errors.New("cassandra-builder: bind value should be a struct or slice") ErrNoSliceOfStructsBinding = errors.New("cassandra-builder: bind value should be a slice of structs") ErrClosedConnection = errors.New("cassandra-builder: can execute on closed connection") + ErrNilIterator = errors.New("cassandra-builder: nil iterator is not allowed") + ErrParsing = errors.New("cassandra-builder: error parsing row") ) diff --git a/qb/models/config.go b/qb/models/config.go new file mode 100644 index 0000000..4046bc4 --- /dev/null +++ b/qb/models/config.go @@ -0,0 +1,39 @@ +package models + +import ( + "time" + + "github.com/Drafteame/cassandra-builder/qb/query" +) + +type Consistency uint16 + +const ( + Any Consistency = 0x00 + One Consistency = 0x01 + Two Consistency = 0x02 + Three Consistency = 0x03 + Quorum Consistency = 0x04 + All Consistency = 0x05 + LocalQuorum Consistency = 0x06 + EachQuorum Consistency = 0x07 + LocalOne Consistency = 0x0A +) + +// Config is the main cassandra configuration needed +type Config struct { + Port int + KeyspaceName string + Username string + Password string + ContactPoints []string + Debug bool + ProtoVersion int + Consistency Consistency + CaPath string + DisableInitialHostLookup bool + Timeout time.Duration + ConnectTimeout time.Duration + PrintQuery query.DebugPrint + NumRetries uint +} diff --git a/qb/qb.go b/qb/qb.go index 677375f..910f16e 100644 --- a/qb/qb.go +++ b/qb/qb.go @@ -2,10 +2,10 @@ package qb import ( "log" - "time" "github.com/gocql/gocql" + models "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/qcount" delete2 "github.com/Drafteame/cassandra-builder/qb/qdelete" "github.com/Drafteame/cassandra-builder/qb/qinsert" @@ -14,38 +14,6 @@ import ( "github.com/Drafteame/cassandra-builder/qb/qupdate" ) -type Consistency uint16 - -const ( - Any Consistency = 0x00 - One Consistency = 0x01 - Two Consistency = 0x02 - Three Consistency = 0x03 - Quorum Consistency = 0x04 - All Consistency = 0x05 - LocalQuorum Consistency = 0x06 - EachQuorum Consistency = 0x07 - LocalOne Consistency = 0x0A -) - -// Config is the main cassandra configuration needed -type Config struct { - Port int - KeyspaceName string - Username string - Password string - ContactPoints []string - Debug bool - ProtoVersion int - Consistency Consistency - CaPath string - DisableInitialHostLookup bool - Timeout time.Duration - ConnectTimeout time.Duration - PrintQuery query.DebugPrint - NumRetries uint -} - // Client is the main cassandra client abstraction to work with the database type Client interface { // Select start a select query @@ -76,7 +44,7 @@ type Client interface { Restart() error // Config return current client configuration - Config() Config + Config() models.Config // Close ends cassandra connection pool Close() @@ -95,7 +63,7 @@ func DefaultDebugPrint(q string, args []interface{}, err error) { } // NewClient creates a new cassandra client manager from config -func NewClient(conf Config) (Client, error) { +func NewClient(conf models.Config) (Client, error) { session, err := getSession(conf) if err != nil { return nil, err @@ -116,7 +84,7 @@ func NewClient(conf Config) (Client, error) { } // NewClientWithSession creates a new cassandra client manager from a given session. -func NewClientWithSession(session *gocql.Session, conf Config) (Client, error) { +func NewClientWithSession(session *gocql.Session, conf models.Config) (Client, error) { c := &client{ session: session, config: conf, diff --git a/qb/qcount/count.go b/qb/qcount/count.go index f8764d9..3c6be6a 100644 --- a/qb/qcount/count.go +++ b/qb/qcount/count.go @@ -3,6 +3,7 @@ package qcount import ( "github.com/gocql/gocql" + "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" ) @@ -10,6 +11,7 @@ import ( type Client interface { Session() *gocql.Session + Config() models.Config Debug() bool Restart() error PrintFn() query.DebugPrint diff --git a/qb/qcount/count_exec.go b/qb/qcount/count_exec.go index 5d56b17..be39566 100644 --- a/qb/qcount/count_exec.go +++ b/qb/qcount/count_exec.go @@ -3,22 +3,29 @@ package qcount import ( "strings" - "github.com/gocql/gocql" "github.com/scylladb/gocqlx/qb" + "github.com/Drafteame/cassandra-builder/qb/errors" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) // Exec release count query an return the number of rows and a possible error func (cq *Query) Exec() (int64, error) { - q := cq.build() + run := runner.New(cq.client) - var count int64 + q := cq.build() - if err := cq.client.Session().Query(q, cq.args...).Consistency(gocql.One).Scan(&count); err != nil { + out, err := run.QueryOne(q, cq.args) + if err != nil { return 0, err } + count, ok := out.(int64) + if !ok { + return 0, errors.ErrParsing + } + return count, nil } @@ -36,7 +43,7 @@ func (cq *Query) build() string { queryStr, _ := q.ToCql() if cq.client.Debug() { - cq.client.PrintFn()(queryStr, cq.args) + cq.client.PrintFn()(queryStr, cq.args, nil) } return strings.TrimSpace(queryStr) diff --git a/qb/qcount/count_exec_test.go b/qb/qcount/count_exec_test.go index f500dfc..19d3719 100644 --- a/qb/qcount/count_exec_test.go +++ b/qb/qcount/count_exec_test.go @@ -4,8 +4,6 @@ import ( "reflect" "testing" - "github.com/gocql/gocql" - "github.com/Drafteame/cassandra-builder/qb/query" ) @@ -76,7 +74,7 @@ func TestQuery_build(t *testing.T) { } for i, test := range tt { - q := New(&gocql.Session{}, false, nil).From(test.table).Column(test.column) + q := New(nil).From(test.table).Column(test.column) for _, w := range test.where { q = q.Where(w.Field, w.Op, w.Value) diff --git a/qb/qcount/count_test.go b/qb/qcount/count_test.go index 14adb3e..4fd07f6 100644 --- a/qb/qcount/count_test.go +++ b/qb/qcount/count_test.go @@ -4,20 +4,18 @@ import ( "reflect" "testing" - "github.com/gocql/gocql" - "github.com/Drafteame/cassandra-builder/qb/query" ) -func TestNew(t *testing.T) { - s := &gocql.Session{} - q := New(s, false, nil) +// func TestNew(t *testing.T) { +// q := new(Query) +// a := New(q.client) - if !reflect.DeepEqual(q.ctx.Session, s) { - t.Errorf("associated session is different") - return - } -} +// if !reflect.DeepEqual(a.client, "test") { +// t.Errorf("associated table is different") +// return +// } +// } func TestQuery_From(t *testing.T) { q := &Query{} diff --git a/qb/qdelete/delete.go b/qb/qdelete/delete.go index fbb2f6d..be97d03 100644 --- a/qb/qdelete/delete.go +++ b/qb/qdelete/delete.go @@ -3,13 +3,13 @@ package qdelete import ( "github.com/gocql/gocql" - "github.com/Drafteame/cassandra-builder/qb" + "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" ) type Client interface { Session() *gocql.Session - Config() qb.Config + Config() models.Config Restart() error Debug() bool PrintFn() query.DebugPrint diff --git a/qb/qdelete/delete_exec.go b/qb/qdelete/delete_exec.go index 7930808..b40c4eb 100644 --- a/qb/qdelete/delete_exec.go +++ b/qb/qdelete/delete_exec.go @@ -6,7 +6,7 @@ import ( "github.com/scylladb/gocqlx/qb" "github.com/Drafteame/cassandra-builder/qb/query" - "github.com/Drafteame/cassandra-builder/runner" + "github.com/Drafteame/cassandra-builder/qb/runner" ) // Exec execute delete query and return error on failure @@ -15,7 +15,11 @@ func (dq *Query) Exec() error { q := dq.build() - if err := dq.client.Session().Query(q, dq.args...).Exec(); err != nil { + if err := run.QueryNone(q, dq.args); err != nil { + if dq.client.Debug() { + dq.client.PrintFn()(q, dq.args, err) + } + return err } @@ -32,7 +36,7 @@ func (dq *Query) build() string { queryStr, _ := q.ToCql() if dq.client.Debug() { - dq.client.PrintFn()(queryStr, dq.args) + dq.client.PrintFn()(queryStr, dq.args, nil) } return strings.TrimSpace(queryStr) diff --git a/qb/qinsert/insert.go b/qb/qinsert/insert.go index 4bae7fa..0855ff8 100644 --- a/qb/qinsert/insert.go +++ b/qb/qinsert/insert.go @@ -3,11 +3,13 @@ package qinsert import ( "github.com/gocql/gocql" + "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" ) type Client interface { Session() *gocql.Session + Config() models.Config Debug() bool Restart() error PrintFn() query.DebugPrint diff --git a/qb/qinsert/insert_exec.go b/qb/qinsert/insert_exec.go index b050a80..eef47dc 100644 --- a/qb/qinsert/insert_exec.go +++ b/qb/qinsert/insert_exec.go @@ -3,44 +3,24 @@ package qinsert import ( "strings" - "github.com/gocql/gocql" + "github.com/Drafteame/cassandra-builder/qb/runner" "github.com/scylladb/gocqlx/qb" - - "github.com/Drafteame/cassandra-builder/qb/errors" ) // Exec execute insert query with args func (iq *Query) Exec() error { + run := runner.New(iq.client) q := iq.build() - execFn := func() error { - if r.client.Session() == nil || r.client.Session().Closed() { - return errors.ErrClosedConnection + if err := run.QueryNone(q, iq.args); err != nil { + if iq.client.Debug() { + iq.client.PrintFn()(q, iq.args, err) } - return r.client.Session().Query(q, iq.args...).Exec() - } - - opts := []retry.Option{ - retry.Attempts(r.client.Config().NumRetries), - retry.RetryIf(func(err error) bool { - switch err { - case gocql.ErrNoConnections, errors.ErrClosedConnection: - return true - default: - return false - } - }), - retry.OnRetry(func(n uint, err error) { - errRestart := r.client.Restart() - - if r.client.Debug() { - r.client.PrintFn()("", nil, errRestart) - } - }), + return err } - return retry.Do(execFn, opts...) + return nil } func (iq *Query) build() string { @@ -50,7 +30,7 @@ func (iq *Query) build() string { queryStr, _ := q.ToCql() if iq.client.Debug() { - iq.client.PrintFn()(queryStr, iq.args) + iq.client.PrintFn()(queryStr, iq.args, nil) } return strings.TrimSpace(queryStr) diff --git a/qb/qinsert/insert_exec_test.go b/qb/qinsert/insert_exec_test.go index 0b4bf0c..4d4d828 100644 --- a/qb/qinsert/insert_exec_test.go +++ b/qb/qinsert/insert_exec_test.go @@ -30,7 +30,7 @@ func TestInsertQuery_build(t *testing.T) { } for _, test := range tt { - q := New(nil, false, nil).Fields(test.fields...).Into(test.table).Values(test.values...) + q := New(nil).Fields(test.fields...).Into(test.table).Values(test.values...) query := q.build() if query != test.res { diff --git a/qb/qselect/select.go b/qb/qselect/select.go index 790cbc6..f123bbe 100644 --- a/qb/qselect/select.go +++ b/qb/qselect/select.go @@ -3,11 +3,13 @@ package qselect import ( "github.com/gocql/gocql" + "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" ) type Client interface { Session() *gocql.Session + Config() models.Config Debug() bool Restart() error PrintFn() query.DebugPrint diff --git a/qb/qselect/select_exec.go b/qb/qselect/select_exec.go index ffb870a..351e14a 100644 --- a/qb/qselect/select_exec.go +++ b/qb/qselect/select_exec.go @@ -3,10 +3,9 @@ package qselect import ( "reflect" - "github.com/gocql/gocql" - "github.com/Drafteame/cassandra-builder/qb/errors" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) // One return just one result on bind action @@ -21,12 +20,18 @@ func (q *Query) One() error { sq := q.build() - var jsonRow string + run := runner.New(q.client) - if err := q.client.Session().Query(sq, q.args...).Consistency(gocql.One).Scan(&jsonRow); err != nil { + out, err := run.QueryOne(sq, q.args) + if err != nil { return err } + jsonRow, ok := out.(string) + if !ok { + return errors.ErrParsing + } + ib := reflect.Indirect(reflect.ValueOf(q.bind)) bv := reflect.ValueOf(ib.Interface()) @@ -44,6 +49,7 @@ func (q *Query) One() error { // All return all rows on bind action. Bind should be a slice of structs func (q *Query) All() error { + run := runner.New(q.client) if q.bind == nil { return errors.ErrNilBinding } @@ -54,34 +60,5 @@ func (q *Query) All() error { sq := q.build() - var jsonRow string - - iter := q.client.Session().Query(sq, q.args...).Iter() - - ib := reflect.Indirect(reflect.ValueOf(q.bind)) - - bv := reflect.ValueOf(ib.Interface()) - bt := bv.Type().Elem() - - for iter.Scan(&jsonRow) { - elem, err := query.BindRow([]byte(jsonRow), bt) - if err != nil { - return err - } - - ib.Set(reflect.Append(ib, reflect.Indirect(elem))) - } - - return iter.Close() -} - -// None execute a qselect query without returning values -func (q *Query) None() error { - sq := q.build() - - if err := q.client.Session().Query(sq, q.args...).Exec(); err != nil { - return err - } - - return nil + return run.Query(sq, q.args, q.bind) } diff --git a/qb/qupdate/update.go b/qb/qupdate/update.go index 6bd4796..3ee9b6d 100644 --- a/qb/qupdate/update.go +++ b/qb/qupdate/update.go @@ -3,11 +3,13 @@ package qupdate import ( "github.com/gocql/gocql" + "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" ) type Client interface { Session() *gocql.Session + Config() models.Config Debug() bool Restart() error PrintFn() query.DebugPrint diff --git a/qb/qupdate/update_exec.go b/qb/qupdate/update_exec.go index c08e32c..1708abe 100644 --- a/qb/qupdate/update_exec.go +++ b/qb/qupdate/update_exec.go @@ -6,13 +6,15 @@ import ( "github.com/scylladb/gocqlx/qb" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) // Exec run update query from builder and return an error if exists func (uq *Query) Exec() error { + run := runner.New(uq.client) q := uq.build() - if err := uq.client.Session().Query(q, uq.args...).Exec(); err != nil { + if err := run.QueryNone(q, uq.args); err != nil { if uq.client.Debug() { uq.client.PrintFn()(q, uq.args, err) } diff --git a/qb/qupdate/update_exec_test.go b/qb/qupdate/update_exec_test.go index 10e38de..ebe5eee 100644 --- a/qb/qupdate/update_exec_test.go +++ b/qb/qupdate/update_exec_test.go @@ -74,7 +74,7 @@ func TestQuery_build(t *testing.T) { } for _, test := range tt { - q := New(nil, false, nil).Table(test.table) + q := New(nil).Table(test.table) for _, s := range test.set { q = q.Set(s.field, s.value) diff --git a/qb/runner/runner.go b/qb/runner/runner.go new file mode 100644 index 0000000..e6886a8 --- /dev/null +++ b/qb/runner/runner.go @@ -0,0 +1,126 @@ +package runner + +import ( + "reflect" + + "github.com/avast/retry-go/v4" + "github.com/gocql/gocql" + + "github.com/Drafteame/cassandra-builder/qb/errors" + "github.com/Drafteame/cassandra-builder/qb/models" + "github.com/Drafteame/cassandra-builder/qb/query" +) + +type Client interface { + Session() *gocql.Session + Config() models.Config + Restart() error + Debug() bool + PrintFn() query.DebugPrint +} + +type Runner struct { + client Client +} + +func (r *Runner) Query(stmt string, args []interface{}, bind interface{}) error { + execFn := func() error { + if r.client.Session() == nil || r.client.Session().Closed() { + return errors.ErrClosedConnection + } + + return r.queryAll(stmt, args, bind) + } + + opts := r.getRetryOptions() + + if err := retry.Do(execFn, opts...); err != nil { + return err + } + + return nil +} + +func (r *Runner) QueryOne(query string, args []interface{}) (interface{}, error) { + var out interface{} + + execFn := func() error { + if r.client.Session() == nil || r.client.Session().Closed() { + return errors.ErrClosedConnection + } + + return r.client.Session().Query(query, args...).Consistency(gocql.One).Scan(&out) + } + + opts := r.getRetryOptions() + + if err := retry.Do(execFn, opts...); err != nil { + return "", err + } + + return out, nil +} + +func (r *Runner) QueryNone(query string, args []interface{}) error { + execFn := func() error { + if r.client.Session() == nil || r.client.Session().Closed() { + return errors.ErrClosedConnection + } + + return r.client.Session().Query(query, args...).Exec() + } + + opts := r.getRetryOptions() + + return retry.Do(execFn, opts...) +} + +func New(c Client) *Runner { + return &Runner{client: c} +} + +func (r *Runner) getRetryOptions() []retry.Option { + return []retry.Option{ + retry.Attempts(r.client.Config().NumRetries), + retry.RetryIf(func(err error) bool { + switch err { + case gocql.ErrNoConnections, errors.ErrClosedConnection: + return true + default: + return false + } + }), + retry.OnRetry(func(n uint, err error) { + errRestart := r.client.Restart() + + if r.client.Debug() { + r.client.PrintFn()("", nil, errRestart) + } + }), + } +} + +func (r *Runner) queryAll(stmt string, args []interface{}, bind interface{}) error { + var jsonRow string + + iter := r.client.Session().Query(stmt, args...).Iter() + if iter == nil { + return errors.ErrNilIterator + } + + ib := reflect.Indirect(reflect.ValueOf(bind)) + + bv := reflect.ValueOf(ib.Interface()) + bt := bv.Type().Elem() + + for iter.Scan(&jsonRow) { + elem, err := query.BindRow([]byte(jsonRow), bt) + if err != nil { + return err + } + + ib.Set(reflect.Append(ib, reflect.Indirect(elem))) + } + + return iter.Close() +} diff --git a/runner/runner.go b/runner/runner.go deleted file mode 100644 index 70c0d52..0000000 --- a/runner/runner.go +++ /dev/null @@ -1,61 +0,0 @@ -package runner - -import ( - "github.com/avast/retry-go/v4" - "github.com/gocql/gocql" - - "github.com/Drafteame/cassandra-builder/qb" - "github.com/Drafteame/cassandra-builder/qb/errors" - "github.com/Drafteame/cassandra-builder/qb/query" -) - -type Client interface { - Session() *gocql.Session - Config() qb.Config - Restart() error - Debug() bool - PrintFn() query.DebugPrint -} - -type Runner struct { - client Client -} - -func (r *Runner) Query() {} - -func (r *Runner) QueryOne() {} - -func (r *Runner) QueryNone(query string, args []interface{}) error { - execFn := func() error { - if r.client.Session() == nil || r.client.Session().Closed() { - return errors.ErrClosedConnection - } - - return r.client.Session().Query(query, args...).Exec() - } - - opts := []retry.Option{ - retry.Attempts(r.client.Config().NumRetries), - retry.RetryIf(func(err error) bool { - switch err { - case gocql.ErrNoConnections, errors.ErrClosedConnection: - return true - default: - return false - } - }), - retry.OnRetry(func(n uint, err error) { - errRestart := r.client.Restart() - - if r.client.Debug() { - r.client.PrintFn()("", nil, errRestart) - } - }), - } - - return retry.Do(execFn, opts...) -} - -func New(c Client) *Runner { - return &Runner{client: c} -} From 466915a3f2c02e4a3c69935648c34a64d22adf85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Haskel?= Date: Tue, 21 Feb 2023 15:56:12 -0300 Subject: [PATCH 3/4] feat: tests fixes --- go.mod | 5 ++ go.sum | 2 + qb/client.go | 1 - qb/qcount/count.go | 18 +----- qb/qcount/count_exec_test.go | 7 ++- qb/qcount/count_test.go | 16 ++--- qb/qinsert/insert_exec.go | 4 +- qb/qinsert/insert_exec_test.go | 10 +++- qb/qselect/build.go | 2 +- qb/qupdate/update_exec.go | 4 +- qb/qupdate/update_exec_test.go | 8 ++- qb/runner/mocks/client.go | 106 +++++++++++++++++++++++++++++++++ qb/runner/runner.go | 2 + 13 files changed, 151 insertions(+), 34 deletions(-) create mode 100644 qb/runner/mocks/client.go diff --git a/go.mod b/go.mod index a31e1be..6c6fc66 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,15 @@ require ( github.com/gocql/gocql v1.3.1 github.com/magefile/mage v1.14.0 github.com/scylladb/gocqlx v1.5.0 + github.com/stretchr/testify v1.8.1 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f1225be..67cbeed 100644 --- a/go.sum +++ b/go.sum @@ -33,12 +33,14 @@ github.com/scylladb/gocqlx v1.5.0 h1:p7NEqRaCMAtW2nvq62iyUNXmIYP29373YOC7D2Xd7Qg github.com/scylladb/gocqlx v1.5.0/go.mod h1:QarZcw5kpYh31MXfxiN2JWWvF1cgZbYqfTfXwmwhpEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= diff --git a/qb/client.go b/qb/client.go index 1e133a1..b2be26e 100644 --- a/qb/client.go +++ b/qb/client.go @@ -58,7 +58,6 @@ func (c *client) Session() *gocql.Session { return c.session } -// TODO: check if affects existent logic func (c *client) Config() models.Config { return c.config } diff --git a/qb/qcount/count.go b/qb/qcount/count.go index 3c6be6a..b1fb90a 100644 --- a/qb/qcount/count.go +++ b/qb/qcount/count.go @@ -1,25 +1,13 @@ package qcount import ( - "github.com/gocql/gocql" - - "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) -//go:generate mockery --name=Client --filename=client.go --structname=Client --output=mocks --outpkg=mocks - -type Client interface { - Session() *gocql.Session - Config() models.Config - Debug() bool - Restart() error - PrintFn() query.DebugPrint -} - // Query create new select count query type Query struct { - client Client + client runner.Client table string column string where []query.WhereStm @@ -28,7 +16,7 @@ type Query struct { } // New create a new count query instance by passing a cassandra session -func New(c Client) *Query { +func New(c runner.Client) *Query { return &Query{client: c} } diff --git a/qb/qcount/count_exec_test.go b/qb/qcount/count_exec_test.go index 19d3719..1550bcb 100644 --- a/qb/qcount/count_exec_test.go +++ b/qb/qcount/count_exec_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/Drafteame/cassandra-builder/qb/query" + mocks "github.com/Drafteame/cassandra-builder/qb/runner/mocks" ) func TestQuery_build(t *testing.T) { @@ -74,7 +75,11 @@ func TestQuery_build(t *testing.T) { } for i, test := range tt { - q := New(nil).From(test.table).Column(test.column) + client := mocks.NewClient(t) + + client.On("Debug").Return(false) + + q := New(client).From(test.table).Column(test.column) for _, w := range test.where { q = q.Where(w.Field, w.Op, w.Value) diff --git a/qb/qcount/count_test.go b/qb/qcount/count_test.go index 4fd07f6..0f35bc2 100644 --- a/qb/qcount/count_test.go +++ b/qb/qcount/count_test.go @@ -5,17 +5,17 @@ import ( "testing" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner/mocks" + "github.com/stretchr/testify/assert" ) -// func TestNew(t *testing.T) { -// q := new(Query) -// a := New(q.client) +func TestNew(t *testing.T) { + client := mocks.NewClient(t) + queue := New(client) -// if !reflect.DeepEqual(a.client, "test") { -// t.Errorf("associated table is different") -// return -// } -// } + assert.Equal(t, client, queue.client) + +} func TestQuery_From(t *testing.T) { q := &Query{} diff --git a/qb/qinsert/insert_exec.go b/qb/qinsert/insert_exec.go index eef47dc..14f0c29 100644 --- a/qb/qinsert/insert_exec.go +++ b/qb/qinsert/insert_exec.go @@ -10,7 +10,7 @@ import ( // Exec execute insert query with args func (iq *Query) Exec() error { run := runner.New(iq.client) - q := iq.build() + q := iq.Build() if err := run.QueryNone(q, iq.args); err != nil { if iq.client.Debug() { @@ -23,7 +23,7 @@ func (iq *Query) Exec() error { return nil } -func (iq *Query) build() string { +func (iq *Query) Build() string { q := qb.Insert(iq.table) q.Columns(iq.fields...) diff --git a/qb/qinsert/insert_exec_test.go b/qb/qinsert/insert_exec_test.go index 4d4d828..dd50b9e 100644 --- a/qb/qinsert/insert_exec_test.go +++ b/qb/qinsert/insert_exec_test.go @@ -3,6 +3,8 @@ package qinsert import ( "reflect" "testing" + + "github.com/Drafteame/cassandra-builder/qb/runner/mocks" ) func TestInsertQuery_build(t *testing.T) { @@ -30,8 +32,12 @@ func TestInsertQuery_build(t *testing.T) { } for _, test := range tt { - q := New(nil).Fields(test.fields...).Into(test.table).Values(test.values...) - query := q.build() + client := mocks.NewClient(t) + + client.On("Debug").Return(false) + + q := New(client).Fields(test.fields...).Into(test.table).Values(test.values...) + query := q.Build() if query != test.res { t.Errorf("query err: \nexp: '%v' \ngot: '%v'", test.res, query) diff --git a/qb/qselect/build.go b/qb/qselect/build.go index 3d5feab..97ddfe7 100644 --- a/qb/qselect/build.go +++ b/qb/qselect/build.go @@ -49,7 +49,7 @@ func (q *Query) build() string { queryStr, _ := sb.Json().ToCql() if q.client.Debug() { - q.client.PrintFn()(queryStr, q.args) + q.client.PrintFn()(queryStr, q.args, nil) } return strings.TrimSpace(queryStr) diff --git a/qb/qupdate/update_exec.go b/qb/qupdate/update_exec.go index 1708abe..507a045 100644 --- a/qb/qupdate/update_exec.go +++ b/qb/qupdate/update_exec.go @@ -12,7 +12,7 @@ import ( // Exec run update query from builder and return an error if exists func (uq *Query) Exec() error { run := runner.New(uq.client) - q := uq.build() + q := uq.Build() if err := run.QueryNone(q, uq.args); err != nil { if uq.client.Debug() { @@ -25,7 +25,7 @@ func (uq *Query) Exec() error { return nil } -func (uq *Query) build() string { +func (uq *Query) Build() string { q := qb.Update(uq.table) if len(uq.fields) > 0 { diff --git a/qb/qupdate/update_exec_test.go b/qb/qupdate/update_exec_test.go index ebe5eee..1c311f1 100644 --- a/qb/qupdate/update_exec_test.go +++ b/qb/qupdate/update_exec_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner/mocks" ) func TestQuery_build(t *testing.T) { @@ -74,7 +75,10 @@ func TestQuery_build(t *testing.T) { } for _, test := range tt { - q := New(nil).Table(test.table) + client := mocks.NewClient(t) + + client.On("Debug").Return(false) + q := New(client).Table(test.table) for _, s := range test.set { q = q.Set(s.field, s.value) @@ -84,7 +88,7 @@ func TestQuery_build(t *testing.T) { q = q.Where(w.Field, w.Op, w.Value) } - qs := q.build() + qs := q.Build() if qs != test.res { t.Errorf("query err: \nexp: '%s' \ngot: '%s'", test.res, qs) diff --git a/qb/runner/mocks/client.go b/qb/runner/mocks/client.go new file mode 100644 index 0000000..02e60c3 --- /dev/null +++ b/qb/runner/mocks/client.go @@ -0,0 +1,106 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package mocks + +import ( + gocql "github.com/gocql/gocql" + mock "github.com/stretchr/testify/mock" + + models "github.com/Drafteame/cassandra-builder/qb/models" + + query "github.com/Drafteame/cassandra-builder/qb/query" +) + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// Config provides a mock function with given fields: +func (_m *Client) Config() models.Config { + ret := _m.Called() + + var r0 models.Config + if rf, ok := ret.Get(0).(func() models.Config); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(models.Config) + } + + return r0 +} + +// Debug provides a mock function with given fields: +func (_m *Client) Debug() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// PrintFn provides a mock function with given fields: +func (_m *Client) PrintFn() query.DebugPrint { + ret := _m.Called() + + var r0 query.DebugPrint + if rf, ok := ret.Get(0).(func() query.DebugPrint); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(query.DebugPrint) + } + } + + return r0 +} + +// Restart provides a mock function with given fields: +func (_m *Client) Restart() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Session provides a mock function with given fields: +func (_m *Client) Session() *gocql.Session { + ret := _m.Called() + + var r0 *gocql.Session + if rf, ok := ret.Get(0).(func() *gocql.Session); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*gocql.Session) + } + } + + return r0 +} + +type mockConstructorTestingTNewClient interface { + mock.TestingT + Cleanup(func()) +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewClient(t mockConstructorTestingTNewClient) *Client { + mock := &Client{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/qb/runner/runner.go b/qb/runner/runner.go index e6886a8..4fe0884 100644 --- a/qb/runner/runner.go +++ b/qb/runner/runner.go @@ -11,6 +11,8 @@ import ( "github.com/Drafteame/cassandra-builder/qb/query" ) +//go:generate mockery --name=Client --filename=client.go --structname=Client --output=mocks --outpkg=mocks + type Client interface { Session() *gocql.Session Config() models.Config From f572221535292b2579c68d62b2fdb4742837f5c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Haskel?= Date: Wed, 22 Feb 2023 17:21:08 -0300 Subject: [PATCH 4/4] feat: remove debug print and unused ctx --- qb/client.go | 33 ++++++++--- qb/models/config.go | 3 - qb/qb.go | 55 ------------------ qb/qcount/count_exec.go | 17 +----- qb/qcount/count_exec_test.go | 4 +- qb/qcount/count_test.go | 17 +++--- qb/qdelete/delete.go | 16 +----- qb/qdelete/delete_exec.go | 8 --- qb/qinsert/insert.go | 16 +----- qb/qinsert/insert_exec.go | 12 +--- qb/qinsert/insert_exec_test.go | 6 +- qb/qselect/build.go | 4 -- qb/qselect/select.go | 16 +----- qb/qselect/select_exec.go | 7 +-- qb/query/query.go | 9 --- qb/qupdate/update.go | 16 +----- qb/qupdate/update_exec.go | 8 --- qb/qupdate/update_exec_test.go | 3 +- qb/runner/runner.go | 86 ++++++++++++++++++++++------- qb/{runner => test}/mocks/client.go | 18 ------ 20 files changed, 119 insertions(+), 235 deletions(-) rename qb/{runner => test}/mocks/client.go (82%) diff --git a/qb/client.go b/qb/client.go index b2be26e..a96f015 100644 --- a/qb/client.go +++ b/qb/client.go @@ -9,7 +9,6 @@ import ( models "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/qinsert" "github.com/Drafteame/cassandra-builder/qb/qselect" - "github.com/Drafteame/cassandra-builder/qb/query" "github.com/Drafteame/cassandra-builder/qb/qupdate" ) @@ -17,7 +16,6 @@ type client struct { canRestart bool config models.Config session *gocql.Session - printQuery query.DebugPrint } var _ Client = &client{} @@ -46,10 +44,6 @@ func (c *client) Debug() bool { return c.config.Debug } -func (c *client) PrintFn() query.DebugPrint { - return c.printQuery -} - func (c *client) Close() { c.session.Close() } @@ -65,7 +59,7 @@ func (c *client) Config() models.Config { func (c *client) Restart() error { c.Close() - session, err := getSession(c.config) + session, err := createSession(c.config) if err != nil { return err } @@ -75,7 +69,7 @@ func (c *client) Restart() error { return nil } -func getSession(c models.Config) (*gocql.Session, error) { +func createSession(c models.Config) (*gocql.Session, error) { cluster := gocql.NewCluster(c.ContactPoints...) cluster.Keyspace = c.KeyspaceName cluster.Consistency = gocql.Consistency(c.Consistency) @@ -112,3 +106,26 @@ func getSession(c models.Config) (*gocql.Session, error) { return cluster.CreateSession() } + +// NewClient creates a new cassandra client manager from config +func NewClient(conf models.Config) (Client, error) { + session, err := createSession(conf) + if err != nil { + return nil, err + } + + return &client{ + session: session, + config: conf, + canRestart: true, + }, nil +} + +// NewClientWithSession creates a new cassandra client manager from a given session. +func NewClientWithSession(session *gocql.Session, conf models.Config) Client { + return &client{ + session: session, + config: conf, + canRestart: false, + } +} diff --git a/qb/models/config.go b/qb/models/config.go index 4046bc4..1bdf82c 100644 --- a/qb/models/config.go +++ b/qb/models/config.go @@ -2,8 +2,6 @@ package models import ( "time" - - "github.com/Drafteame/cassandra-builder/qb/query" ) type Consistency uint16 @@ -34,6 +32,5 @@ type Config struct { DisableInitialHostLookup bool Timeout time.Duration ConnectTimeout time.Duration - PrintQuery query.DebugPrint NumRetries uint } diff --git a/qb/qb.go b/qb/qb.go index 910f16e..87c2d64 100644 --- a/qb/qb.go +++ b/qb/qb.go @@ -1,8 +1,6 @@ package qb import ( - "log" - "github.com/gocql/gocql" models "github.com/Drafteame/cassandra-builder/qb/models" @@ -10,7 +8,6 @@ import ( delete2 "github.com/Drafteame/cassandra-builder/qb/qdelete" "github.com/Drafteame/cassandra-builder/qb/qinsert" _select "github.com/Drafteame/cassandra-builder/qb/qselect" - "github.com/Drafteame/cassandra-builder/qb/query" "github.com/Drafteame/cassandra-builder/qb/qupdate" ) @@ -37,9 +34,6 @@ type Client interface { // Debug return an assertion for debugging Debug() bool - // PrintFn return the configured debug print function. - PrintFn() query.DebugPrint - // Restart should close and start a new connection. Restart() error @@ -49,52 +43,3 @@ type Client interface { // Close ends cassandra connection pool Close() } - -// DefaultDebugPrint defines a default function that prints resultant query and arguments before being executed -// and when the Debug flag is true -func DefaultDebugPrint(q string, args []interface{}, err error) { - if q != "" { - log.Printf("query: %v \nargs: %v\n", q, args) - } - - if err != nil { - log.Println("err: ", err.Error()) - } -} - -// NewClient creates a new cassandra client manager from config -func NewClient(conf models.Config) (Client, error) { - session, err := getSession(conf) - if err != nil { - return nil, err - } - - c := &client{ - session: session, - config: conf, - canRestart: true, - printQuery: DefaultDebugPrint, - } - - if conf.PrintQuery != nil { - c.printQuery = conf.PrintQuery - } - - return c, nil -} - -// NewClientWithSession creates a new cassandra client manager from a given session. -func NewClientWithSession(session *gocql.Session, conf models.Config) (Client, error) { - c := &client{ - session: session, - config: conf, - canRestart: false, - printQuery: DefaultDebugPrint, - } - - if conf.PrintQuery != nil { - c.printQuery = conf.PrintQuery - } - - return c, nil -} diff --git a/qb/qcount/count_exec.go b/qb/qcount/count_exec.go index be39566..c9827e1 100644 --- a/qb/qcount/count_exec.go +++ b/qb/qcount/count_exec.go @@ -5,7 +5,6 @@ import ( "github.com/scylladb/gocqlx/qb" - "github.com/Drafteame/cassandra-builder/qb/errors" "github.com/Drafteame/cassandra-builder/qb/query" "github.com/Drafteame/cassandra-builder/qb/runner" ) @@ -16,17 +15,7 @@ func (cq *Query) Exec() (int64, error) { q := cq.build() - out, err := run.QueryOne(q, cq.args) - if err != nil { - return 0, err - } - - count, ok := out.(int64) - if !ok { - return 0, errors.ErrParsing - } - - return count, nil + return run.QueryCount(q, cq.args) } func (cq *Query) build() string { @@ -42,9 +31,5 @@ func (cq *Query) build() string { queryStr, _ := q.ToCql() - if cq.client.Debug() { - cq.client.PrintFn()(queryStr, cq.args, nil) - } - return strings.TrimSpace(queryStr) } diff --git a/qb/qcount/count_exec_test.go b/qb/qcount/count_exec_test.go index 1550bcb..87a4e3f 100644 --- a/qb/qcount/count_exec_test.go +++ b/qb/qcount/count_exec_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/Drafteame/cassandra-builder/qb/query" - mocks "github.com/Drafteame/cassandra-builder/qb/runner/mocks" + "github.com/Drafteame/cassandra-builder/qb/test/mocks" ) func TestQuery_build(t *testing.T) { @@ -77,8 +77,6 @@ func TestQuery_build(t *testing.T) { for i, test := range tt { client := mocks.NewClient(t) - client.On("Debug").Return(false) - q := New(client).From(test.table).Column(test.column) for _, w := range test.where { diff --git a/qb/qcount/count_test.go b/qb/qcount/count_test.go index 0f35bc2..72e89d4 100644 --- a/qb/qcount/count_test.go +++ b/qb/qcount/count_test.go @@ -5,7 +5,8 @@ import ( "testing" "github.com/Drafteame/cassandra-builder/qb/query" - "github.com/Drafteame/cassandra-builder/qb/runner/mocks" + "github.com/Drafteame/cassandra-builder/qb/test/mocks" + "github.com/stretchr/testify/assert" ) @@ -22,14 +23,12 @@ func TestQuery_From(t *testing.T) { q.From("test") if q.table != "test" { - t.Errorf("exp: test got: %v", q.table) - return + t.Fatalf("exp: test got: %v", q.table) } q.From("test2") if q.table != "test2" { - t.Errorf("exp: test2 got: %v", q.table) - return + t.Fatalf("exp: test2 got: %v", q.table) } } @@ -38,12 +37,12 @@ func TestQuery_Column(t *testing.T) { q.Column("field") if q.column != "field" { - t.Errorf("exp: field got: %v", q.column) + t.Fatalf("exp: field got: %v", q.column) } q.Column("field2") if q.column != "field2" { - t.Errorf("exp: field2 got: %v", q.column) + t.Fatalf("exp: field2 got: %v", q.column) } } @@ -102,11 +101,11 @@ func TestQuery_Where(t *testing.T) { q.Where(test.field, test.op, test.value) if !reflect.DeepEqual(q.args, test.expArgs) { - t.Errorf("exp: %v got: %v", test.expArgs, q.args) + t.Fatalf("exp: %v got: %v", test.expArgs, q.args) } if !reflect.DeepEqual(q.where, test.expStm) { - t.Errorf("exp: %v got: %v", test.expStm, q.where) + t.Fatalf("exp: %v got: %v", test.expStm, q.where) } } } diff --git a/qb/qdelete/delete.go b/qb/qdelete/delete.go index be97d03..b0b0c0a 100644 --- a/qb/qdelete/delete.go +++ b/qb/qdelete/delete.go @@ -1,30 +1,20 @@ package qdelete import ( - "github.com/gocql/gocql" - - "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) -type Client interface { - Session() *gocql.Session - Config() models.Config - Restart() error - Debug() bool - PrintFn() query.DebugPrint -} - // Query represents a Cassandra delete query. Execution should not bind any value type Query struct { - client Client + client runner.Client table string where []query.WhereStm args []interface{} } // New create a new delete query instance by passing a cassandra session -func New(c Client) *Query { +func New(c runner.Client) *Query { return &Query{client: c} } diff --git a/qb/qdelete/delete_exec.go b/qb/qdelete/delete_exec.go index b40c4eb..87c783e 100644 --- a/qb/qdelete/delete_exec.go +++ b/qb/qdelete/delete_exec.go @@ -16,10 +16,6 @@ func (dq *Query) Exec() error { q := dq.build() if err := run.QueryNone(q, dq.args); err != nil { - if dq.client.Debug() { - dq.client.PrintFn()(q, dq.args, err) - } - return err } @@ -35,9 +31,5 @@ func (dq *Query) build() string { queryStr, _ := q.ToCql() - if dq.client.Debug() { - dq.client.PrintFn()(queryStr, dq.args, nil) - } - return strings.TrimSpace(queryStr) } diff --git a/qb/qinsert/insert.go b/qb/qinsert/insert.go index 0855ff8..5f8d632 100644 --- a/qb/qinsert/insert.go +++ b/qb/qinsert/insert.go @@ -1,30 +1,20 @@ package qinsert import ( - "github.com/gocql/gocql" - - "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) -type Client interface { - Session() *gocql.Session - Config() models.Config - Debug() bool - Restart() error - PrintFn() query.DebugPrint -} - // Query represent a Cassandra insert query. Execution should not bind any value type Query struct { - client Client + client runner.Client table string fields query.Columns args []interface{} } // New creates a new insert query by passing a cassandra session and debug options -func New(c Client) *Query { +func New(c runner.Client) *Query { return &Query{client: c} } diff --git a/qb/qinsert/insert_exec.go b/qb/qinsert/insert_exec.go index 14f0c29..d94bc4f 100644 --- a/qb/qinsert/insert_exec.go +++ b/qb/qinsert/insert_exec.go @@ -10,28 +10,20 @@ import ( // Exec execute insert query with args func (iq *Query) Exec() error { run := runner.New(iq.client) - q := iq.Build() + q := iq.build() if err := run.QueryNone(q, iq.args); err != nil { - if iq.client.Debug() { - iq.client.PrintFn()(q, iq.args, err) - } - return err } return nil } -func (iq *Query) Build() string { +func (iq *Query) build() string { q := qb.Insert(iq.table) q.Columns(iq.fields...) queryStr, _ := q.ToCql() - if iq.client.Debug() { - iq.client.PrintFn()(queryStr, iq.args, nil) - } - return strings.TrimSpace(queryStr) } diff --git a/qb/qinsert/insert_exec_test.go b/qb/qinsert/insert_exec_test.go index dd50b9e..0926354 100644 --- a/qb/qinsert/insert_exec_test.go +++ b/qb/qinsert/insert_exec_test.go @@ -4,7 +4,7 @@ import ( "reflect" "testing" - "github.com/Drafteame/cassandra-builder/qb/runner/mocks" + "github.com/Drafteame/cassandra-builder/qb/test/mocks" ) func TestInsertQuery_build(t *testing.T) { @@ -34,10 +34,8 @@ func TestInsertQuery_build(t *testing.T) { for _, test := range tt { client := mocks.NewClient(t) - client.On("Debug").Return(false) - q := New(client).Fields(test.fields...).Into(test.table).Values(test.values...) - query := q.Build() + query := q.build() if query != test.res { t.Errorf("query err: \nexp: '%v' \ngot: '%v'", test.res, query) diff --git a/qb/qselect/build.go b/qb/qselect/build.go index 97ddfe7..6cdf381 100644 --- a/qb/qselect/build.go +++ b/qb/qselect/build.go @@ -48,9 +48,5 @@ func (q *Query) build() string { queryStr, _ := sb.Json().ToCql() - if q.client.Debug() { - q.client.PrintFn()(queryStr, q.args, nil) - } - return strings.TrimSpace(queryStr) } diff --git a/qb/qselect/select.go b/qb/qselect/select.go index f123bbe..53610f6 100644 --- a/qb/qselect/select.go +++ b/qb/qselect/select.go @@ -1,23 +1,13 @@ package qselect import ( - "github.com/gocql/gocql" - - "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) -type Client interface { - Session() *gocql.Session - Config() models.Config - Debug() bool - Restart() error - PrintFn() query.DebugPrint -} - // Query represents a cassandra select statement and his options type Query struct { - client Client + client runner.Client fields query.Columns args []interface{} table string @@ -31,7 +21,7 @@ type Query struct { } // New create a new select query by passing a cassandra session and debug options -func New(c Client) *Query { +func New(c runner.Client) *Query { return &Query{client: c} } diff --git a/qb/qselect/select_exec.go b/qb/qselect/select_exec.go index 351e14a..e986c4f 100644 --- a/qb/qselect/select_exec.go +++ b/qb/qselect/select_exec.go @@ -22,16 +22,11 @@ func (q *Query) One() error { run := runner.New(q.client) - out, err := run.QueryOne(sq, q.args) + jsonRow, err := run.QueryOne(sq, q.args) if err != nil { return err } - jsonRow, ok := out.(string) - if !ok { - return errors.ErrParsing - } - ib := reflect.Indirect(reflect.ValueOf(q.bind)) bv := reflect.ValueOf(ib.Interface()) diff --git a/qb/query/query.go b/qb/query/query.go index 1b34353..f843ee9 100644 --- a/qb/query/query.go +++ b/qb/query/query.go @@ -6,8 +6,6 @@ import ( "reflect" "time" - "github.com/gocql/gocql" - "github.com/Drafteame/cassandra-builder/qb/errors" ) @@ -20,13 +18,6 @@ type ( // DebugPrint defines a callback that prints query values DebugPrint func(q string, args []interface{}, err error) - - // Query Base definition of query - Query struct { - Session *gocql.Session - Debug bool - PrintQuery DebugPrint - } ) const ( diff --git a/qb/qupdate/update.go b/qb/qupdate/update.go index 3ee9b6d..9fbcb4a 100644 --- a/qb/qupdate/update.go +++ b/qb/qupdate/update.go @@ -1,23 +1,13 @@ package qupdate import ( - "github.com/gocql/gocql" - - "github.com/Drafteame/cassandra-builder/qb/models" "github.com/Drafteame/cassandra-builder/qb/query" + "github.com/Drafteame/cassandra-builder/qb/runner" ) -type Client interface { - Session() *gocql.Session - Config() models.Config - Debug() bool - Restart() error - PrintFn() query.DebugPrint -} - // Query represent a Cassandra update query. Execution should not bind any value type Query struct { - client Client + client runner.Client table string fields query.Columns args []interface{} @@ -25,7 +15,7 @@ type Query struct { } // New create a new update query by passing a cassandra session and the affected table -func New(c Client) *Query { +func New(c runner.Client) *Query { return &Query{client: c} } diff --git a/qb/qupdate/update_exec.go b/qb/qupdate/update_exec.go index 507a045..0005567 100644 --- a/qb/qupdate/update_exec.go +++ b/qb/qupdate/update_exec.go @@ -15,10 +15,6 @@ func (uq *Query) Exec() error { q := uq.Build() if err := run.QueryNone(q, uq.args); err != nil { - if uq.client.Debug() { - uq.client.PrintFn()(q, uq.args, err) - } - return err } @@ -40,9 +36,5 @@ func (uq *Query) Build() string { queryStr, _ := q.ToCql() - if uq.client.Debug() { - uq.client.PrintFn()(queryStr, uq.args, nil) - } - return strings.TrimSpace(queryStr) } diff --git a/qb/qupdate/update_exec_test.go b/qb/qupdate/update_exec_test.go index 1c311f1..0276225 100644 --- a/qb/qupdate/update_exec_test.go +++ b/qb/qupdate/update_exec_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/Drafteame/cassandra-builder/qb/query" - "github.com/Drafteame/cassandra-builder/qb/runner/mocks" + "github.com/Drafteame/cassandra-builder/qb/test/mocks" ) func TestQuery_build(t *testing.T) { @@ -77,7 +77,6 @@ func TestQuery_build(t *testing.T) { for _, test := range tt { client := mocks.NewClient(t) - client.On("Debug").Return(false) q := New(client).Table(test.table) for _, s := range test.set { diff --git a/qb/runner/runner.go b/qb/runner/runner.go index 4fe0884..992119f 100644 --- a/qb/runner/runner.go +++ b/qb/runner/runner.go @@ -11,14 +11,13 @@ import ( "github.com/Drafteame/cassandra-builder/qb/query" ) -//go:generate mockery --name=Client --filename=client.go --structname=Client --output=mocks --outpkg=mocks +//go:generate mockery --name=Client --filename=client.go --structname=Client --output=../test/mocks --outpkg=mocks type Client interface { Session() *gocql.Session Config() models.Config Restart() error Debug() bool - PrintFn() query.DebugPrint } type Runner struct { @@ -43,15 +42,55 @@ func (r *Runner) Query(stmt string, args []interface{}, bind interface{}) error return nil } -func (r *Runner) QueryOne(query string, args []interface{}) (interface{}, error) { - var out interface{} +func (r *Runner) QueryCount(query string, args []interface{}) (int64, error) { + var count int64 execFn := func() error { if r.client.Session() == nil || r.client.Session().Closed() { return errors.ErrClosedConnection } - return r.client.Session().Query(query, args...).Consistency(gocql.One).Scan(&out) + consistency := r.client.Config().Consistency + + if err := r.client.Session().Query(query, args...).Consistency(gocql.Consistency(consistency)).Scan(&count); err != nil { + if err == gocql.ErrNoConnections { + return err + } + + return retry.Unrecoverable(err) + } + + return nil + } + + opts := r.getRetryOptions() + + if err := retry.Do(execFn, opts...); err != nil { + return 0, err + } + + return count, nil +} + +func (r *Runner) QueryOne(query string, args []interface{}) (string, error) { + var jsonRow string + + execFn := func() error { + if r.client.Session() == nil || r.client.Session().Closed() { + return errors.ErrClosedConnection + } + + consistency := r.client.Config().Consistency + + if err := r.client.Session().Query(query, args...).Consistency(gocql.Consistency(consistency)).Scan(&jsonRow); err != nil { + if err == gocql.ErrNoConnections { + return err + } + + return retry.Unrecoverable(err) + } + + return nil } opts := r.getRetryOptions() @@ -60,7 +99,7 @@ func (r *Runner) QueryOne(query string, args []interface{}) (interface{}, error) return "", err } - return out, nil + return jsonRow, nil } func (r *Runner) QueryNone(query string, args []interface{}) error { @@ -69,7 +108,15 @@ func (r *Runner) QueryNone(query string, args []interface{}) error { return errors.ErrClosedConnection } - return r.client.Session().Query(query, args...).Exec() + if err := r.client.Session().Query(query, args...).Exec(); err != nil { + if err == gocql.ErrNoConnections { + return err + } + + return retry.Unrecoverable(err) + } + + return nil } opts := r.getRetryOptions() @@ -84,20 +131,11 @@ func New(c Client) *Runner { func (r *Runner) getRetryOptions() []retry.Option { return []retry.Option{ retry.Attempts(r.client.Config().NumRetries), - retry.RetryIf(func(err error) bool { - switch err { - case gocql.ErrNoConnections, errors.ErrClosedConnection: - return true - default: - return false - } - }), retry.OnRetry(func(n uint, err error) { - errRestart := r.client.Restart() - if r.client.Debug() { - r.client.PrintFn()("", nil, errRestart) - } + _ = r.client.Restart() + + //TODO: handle error }), } } @@ -124,5 +162,13 @@ func (r *Runner) queryAll(stmt string, args []interface{}, bind interface{}) err ib.Set(reflect.Append(ib, reflect.Indirect(elem))) } - return iter.Close() + if err := iter.Close(); err != nil { + if err == gocql.ErrNoConnections { + return err + } + + return retry.Unrecoverable(err) + } + + return nil } diff --git a/qb/runner/mocks/client.go b/qb/test/mocks/client.go similarity index 82% rename from qb/runner/mocks/client.go rename to qb/test/mocks/client.go index 02e60c3..75576ba 100644 --- a/qb/runner/mocks/client.go +++ b/qb/test/mocks/client.go @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" models "github.com/Drafteame/cassandra-builder/qb/models" - - query "github.com/Drafteame/cassandra-builder/qb/query" ) // Client is an autogenerated mock type for the Client type @@ -44,22 +42,6 @@ func (_m *Client) Debug() bool { return r0 } -// PrintFn provides a mock function with given fields: -func (_m *Client) PrintFn() query.DebugPrint { - ret := _m.Called() - - var r0 query.DebugPrint - if rf, ok := ret.Get(0).(func() query.DebugPrint); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(query.DebugPrint) - } - } - - return r0 -} - // Restart provides a mock function with given fields: func (_m *Client) Restart() error { ret := _m.Called()