Skip to content

Commit

Permalink
Mitigate "gocql: no hosts available in the pool" cannot recover witho…
Browse files Browse the repository at this point in the history
  • Loading branch information
mfamador committed Feb 9, 2023
1 parent 0421da5 commit 05b7e8e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/generikvault/gvalstrings v0.0.0-20180926130504-471f38f0112a
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.6.0
github.com/gocql/gocql v1.2.1
github.com/gocql/gocql v1.3.1
github.com/gofrs/uuid v4.2.0+incompatible
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang/protobuf v1.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/gocql/gocql v1.2.1 h1:G/STxUzD6pGvRHzG0Fi7S04SXejMKBbRZb7pwre1edU=
github.com/gocql/gocql v1.2.1/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
github.com/gocql/gocql v1.3.1 h1:BTwM4rux+ah5G3oH6/MQa+tur/TDd/XAAOXDxBBs7rg=
github.com/gocql/gocql v1.3.1/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
34 changes: 34 additions & 0 deletions internal/impl/cassandra/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type cassandraWriter struct {
backoffMax time.Duration

session *gocql.Session
conn *gocql.ClusterConfig
connLock sync.RWMutex

argsMapping *mapping.Executor
Expand Down Expand Up @@ -192,6 +193,21 @@ func (c *cassandraWriter) parseArgs(mgr bundle.NewManagement) error {
return nil
}

var (
sessLock sync.Mutex
reopenSession bool
)

type observer struct{}

func (observer) ObserveConnect(info gocql.ObservedConnect) {
if info.Err != nil {
sessLock.Lock()
reopenSession = true
sessLock.Unlock()
}
}

func (c *cassandraWriter) Connect(ctx context.Context) error {
c.connLock.Lock()
defer c.connLock.Unlock()
Expand All @@ -214,10 +230,12 @@ func (c *cassandraWriter) Connect(ctx context.Context) error {
Password: c.conf.PasswordAuthenticator.Password,
}
}
conn.ConnectObserver = observer{}
conn.DisableInitialHostLookup = c.conf.DisableInitialHostLookup
if conn.Consistency, err = gocql.ParseConsistencyWrapper(c.conf.Consistency); err != nil {
return fmt.Errorf("parsing consistency: %w", err)
}
c.conn = conn

conn.RetryPolicy = &decorator{
NumRetries: int(c.conf.Config.MaxRetries),
Expand Down Expand Up @@ -249,6 +267,22 @@ func (c *cassandraWriter) WriteBatch(ctx context.Context, msg message.Batch) err
return component.ErrNotConnected
}

sessLock.Lock()
needReOpen := reopenSession
sessLock.Unlock()
if needReOpen {
c.log.Debugln("reopen the cassandra session")
var err error
c.session.Close()
c.session, err = c.conn.CreateSession()
if err != nil {
c.log.Debugf("error reopening session: %w", err)
}
sessLock.Lock()
reopenSession = false
sessLock.Unlock()
}

if msg.Len() == 1 {
return c.writeRow(session, msg)
}
Expand Down

0 comments on commit 05b7e8e

Please sign in to comment.