Merge remote-tracking branch 'gocql/master'
avelanarius committed Jun 15, 2023
2 parents f60a732 + d4a7bef commit d522c40
Showing 16 changed files with 1,015 additions and 181 deletions.
7 changes: 7 additions & 0 deletions AUTHORS
@@ -133,3 +133,10 @@ Stefan Miklosovic <smiklosovic@apache.org>
Adam Burk <amburk@gmail.com>
Valerii Ponomarov <kiparis.kh@gmail.com>
Neal Turett <neal.turett@datadoghq.com>
Doug Schaapveld <djschaap@gmail.com>
Steven Seidman <steven.seidman@datadoghq.com>
Wojciech Przytuła <wojciech.przytula@scylladb.com>
João Reis <joao.reis@datastax.com>
Lauro Ramos Venancio <lauro.venancio@incognia.com>
Dmitry Kropachev <dmitry.kropachev@gmail.com>
Oliver Boyle <pleasedontspamme4321+gocql@gmail.com>
5 changes: 5 additions & 0 deletions README.md
@@ -15,6 +15,11 @@ There are open pull requests to merge the functionality to the upstream project:

It also provides support for shard-aware ports, a faster way to connect to all shards; details are available in this [blog post](https://www.scylladb.com/2021/04/27/connect-faster-to-scylla-with-a-shard-aware-port/).
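
A minimal sketch of connecting with this fork, assuming (per the blog post above) that the shard-aware port is detected and used automatically when the cluster exposes it; host addresses and keyspace are placeholders:

```go
package main

import (
	"log"

	"github.com/gocql/gocql" // resolved to this fork, e.g. via a go.mod replace
)

func main() {
	// Standard gocql setup; no extra configuration is assumed to be needed
	// for the shard-aware port when the cluster exposes it.
	cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2")
	cluster.Keyspace = "example"
	cluster.Consistency = gocql.Quorum

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
}
```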

Sunsetting Model
----------------

In general, the gocql team will focus on supporting the current and previous versions of Go. gocql may still work with older versions of Go, but official support for these versions has been sunset.

Installation
------------

46 changes: 46 additions & 0 deletions cassandra_test.go
@@ -1959,6 +1959,52 @@ func TestEmptyTimestamp(t *testing.T) {
}
}

// Integration test of just querying for data from the system.schema_keyspace table where the keyspace DOES exist.
func TestGetKeyspaceMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()

keyspaceMetadata, err := getKeyspaceMetadata(session, "gocql_test")
if err != nil {
t.Fatalf("failed to query the keyspace metadata with err: %v", err)
}
if keyspaceMetadata == nil {
t.Fatal("failed to query the keyspace metadata, nil returned")
}
if keyspaceMetadata.Name != "gocql_test" {
t.Errorf("Expected keyspace name to be 'gocql' but was '%s'", keyspaceMetadata.Name)
}
if keyspaceMetadata.StrategyClass != "org.apache.cassandra.locator.SimpleStrategy" {
t.Errorf("Expected replication strategy class to be 'org.apache.cassandra.locator.SimpleStrategy' but was '%s'", keyspaceMetadata.StrategyClass)
}
if keyspaceMetadata.StrategyOptions == nil {
t.Error("Expected replication strategy options map but was nil")
}
rfStr, ok := keyspaceMetadata.StrategyOptions["replication_factor"]
if !ok {
t.Fatalf("Expected strategy option 'replication_factor' but was not found in %v", keyspaceMetadata.StrategyOptions)
}
rfInt, err := strconv.Atoi(rfStr.(string))
if err != nil {
t.Fatalf("Error converting string to int with err: %v", err)
}
if rfInt != *flagRF {
t.Errorf("Expected replication factor to be %d but was %d", *flagRF, rfInt)
}
}

// Integration test of just querying for data from the system.schema_keyspace table where the keyspace DOES NOT exist.
func TestGetKeyspaceMetadataFails(t *testing.T) {
session := createSession(t)
defer session.Close()

_, err := getKeyspaceMetadata(session, "gocql_keyspace_does_not_exist")

if err != ErrKeyspaceDoesNotExist {
t.Fatalf("Expected error of type ErrKeyspaceDoesNotExist. Instead, error was %v", err)
}
}

// Integration test of the routing key calculation
func TestRoutingKey(t *testing.T) {
session := createSession(t)
20 changes: 4 additions & 16 deletions conn.go
@@ -1723,6 +1723,10 @@ func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter
}
}

func (c *Conn) querySystemLocal(ctx context.Context) *Iter {
return c.query(ctx, "SELECT * FROM system.local WHERE key='local'")
}

func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"

@@ -1793,22 +1797,6 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas)
}

func (c *Conn) localHostInfo(ctx context.Context) (*HostInfo, error) {
row, err := c.query(ctx, "SELECT * FROM system.local WHERE key='local'").rowMap()
if err != nil {
return nil, err
}

port := c.conn.RemoteAddr().(*net.TCPAddr).Port
// TODO(zariel): avoid doing this here
host, err := c.session.hostInfoFromMap(row, &HostInfo{hostname: c.host.connectAddress.String(), connectAddress: c.host.connectAddress, port: port})
if err != nil {
return nil, err
}

return c.session.ring.addOrUpdate(host), nil
}

var (
ErrQueryArgLength = errors.New("gocql: query argument length mismatch")
ErrTimeoutNoResponse = errors.New("gocql: no response received from cassandra within timeout period")
18 changes: 11 additions & 7 deletions control.go
@@ -98,7 +98,7 @@ func (c *controlConn) heartBeat() {
reconn:
// try to connect a bit faster
sleepTime = 1 * time.Second
c.reconnect(true)
c.reconnect()
continue
}
}
@@ -269,11 +269,14 @@ type connHost struct {

func (c *controlConn) setupConn(conn *Conn) error {
// we need up-to-date host info for the filterHost call below
host, err := conn.localHostInfo(context.TODO())
iter := conn.querySystemLocal(context.TODO())
host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.conn.RemoteAddr().(*net.TCPAddr).Port)
if err != nil {
return err
}

host = c.session.ring.addOrUpdate(host)

if c.session.cfg.filterHost(host) {
return fmt.Errorf("host was filtered: %v", host.ConnectAddress())
}
@@ -335,7 +338,7 @@ func (c *controlConn) registerEvents(conn *Conn) error {
return nil
}

func (c *controlConn) reconnect(refreshring bool) {
func (c *controlConn) reconnect() {
if atomic.LoadInt32(&c.state) == controlConnClosing {
return
}
@@ -381,8 +384,9 @@
return
}

if refreshring {
c.session.hostSource.refreshRing()
err = c.session.refreshRing()
if err != nil {
c.session.logger.Printf("gocql: unable to refresh ring: %v\n", err)
}
}

@@ -399,7 +403,7 @@ func (c *controlConn) HandleError(conn *Conn, err error, closed bool) {
return
}

c.reconnect(false)
c.reconnect()
}

func (c *controlConn) getConn() *connHost {
@@ -433,7 +437,7 @@ func (c *controlConn) withConnHost(fn func(*connHost) *Iter) *Iter {

connectAttempts++

c.reconnect(false)
c.reconnect()
continue
}

179 changes: 179 additions & 0 deletions control_ccm_test.go
@@ -0,0 +1,179 @@
//go:build ccm
// +build ccm

package gocql

/* TODO: Fix this test
import (
"fmt"
"sync"
"testing"
"time"
"github.com/gocql/gocql/internal/ccm"
)
type TestHostFilter struct {
mu sync.Mutex
allowedHosts map[string]ccm.Host
}
func (f *TestHostFilter) Accept(h *HostInfo) bool {
f.mu.Lock()
defer f.mu.Unlock()
_, ok := f.allowedHosts[h.ConnectAddress().String()]
return ok
}
func (f *TestHostFilter) SetAllowedHosts(hosts map[string]ccm.Host) {
f.mu.Lock()
defer f.mu.Unlock()
f.allowedHosts = hosts
}
func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
if err := ccm.AllUp(); err != nil {
t.Fatal(err)
}
allCcmHosts, err := ccm.Status()
if err != nil {
t.Fatal(err)
}
if len(allCcmHosts) < 2 {
t.Skip("this test requires at least 2 nodes")
}
allAllowedHosts := map[string]ccm.Host{}
var firstNode *ccm.Host
for _, node := range allCcmHosts {
node := node // copy the range variable so taking its address below doesn't alias later iterations
if firstNode == nil {
firstNode = &node
}
allAllowedHosts[node.Addr] = node
}
allowedHosts := map[string]ccm.Host{
firstNode.Addr: *firstNode,
}
testFilter := &TestHostFilter{allowedHosts: allowedHosts}
session := createSession(t, func(config *ClusterConfig) {
config.Hosts = []string{firstNode.Addr}
config.Events.DisableTopologyEvents = true
config.Events.DisableNodeStatusEvents = true
config.HostFilter = testFilter
})
defer session.Close()
if session.control == nil || session.control.conn.Load() == nil {
t.Fatal("control conn is nil")
}
controlConnection := session.control.getConn()
ccHost := controlConnection.host
var ccHostName string
for _, node := range allCcmHosts {
if node.Addr == ccHost.ConnectAddress().String() {
ccHostName = node.Name
break
}
}
if ccHostName == "" {
t.Fatal("could not find name of control host")
}
if err := ccm.NodeDown(ccHostName); err != nil {
t.Fatal(err)
}
defer func() {
ccmStatus, err := ccm.Status()
if err != nil {
t.Logf("could not bring nodes back up after test: %v", err)
return
}
for _, node := range ccmStatus {
if node.State == ccm.NodeStateDown {
err = ccm.NodeUp(node.Name)
if err != nil {
t.Logf("could not bring node %v back up after test: %v", node.Name, err)
}
}
}
}()
assertNodeDown := func() error {
hosts := session.ring.currentHosts()
if len(hosts) != 1 {
return fmt.Errorf("expected 1 host in ring but there were %v", len(hosts))
}
for _, host := range hosts {
if host.IsUp() {
return fmt.Errorf("expected host to be DOWN but %v isn't", host.String())
}
}
session.pool.mu.RLock()
poolsLen := len(session.pool.hostConnPools)
session.pool.mu.RUnlock()
if poolsLen != 0 {
return fmt.Errorf("expected 0 connection pool but there were %v", poolsLen)
}
return nil
}
maxAttempts := 5
delayPerAttempt := 1 * time.Second
assertErr := assertNodeDown()
for i := 0; i < maxAttempts && assertErr != nil; i++ {
time.Sleep(delayPerAttempt)
assertErr = assertNodeDown()
}
if assertErr != nil {
t.Fatal(assertErr)
}
testFilter.SetAllowedHosts(allAllowedHosts)
if err = ccm.NodeUp(ccHostName); err != nil {
t.Fatal(err)
}
assertNodeUp := func() error {
hosts := session.ring.currentHosts()
if len(hosts) != len(allCcmHosts) {
return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts))
}
for _, host := range hosts {
if !host.IsUp() {
return fmt.Errorf("expected all hosts to be UP but %v isn't", host.String())
}
}
session.pool.mu.RLock()
poolsLen := len(session.pool.hostConnPools)
session.pool.mu.RUnlock()
if poolsLen != len(allCcmHosts) {
return fmt.Errorf("expected %v connection pool but there were %v", len(allCcmHosts), poolsLen)
}
return nil
}
maxAttempts = 30
delayPerAttempt = 1 * time.Second
assertErr = assertNodeUp()
for i := 0; i < maxAttempts && assertErr != nil; i++ {
time.Sleep(delayPerAttempt)
assertErr = assertNodeUp()
}
if assertErr != nil {
t.Fatal(assertErr)
}
}
*/
5 changes: 3 additions & 2 deletions dial.go
@@ -45,11 +45,12 @@ func (hd *defaultHostDialer) DialHost(ctx context.Context, host *HostInfo) (*Dia
return nil, fmt.Errorf("host missing port: %v", port)
}

addr := host.HostnameAndPort()
conn, err := hd.dialer.DialContext(ctx, "tcp", addr)
connAddr := host.ConnectAddressAndPort()
conn, err := hd.dialer.DialContext(ctx, "tcp", connAddr)
if err != nil {
return nil, err
}
addr := host.HostnameAndPort()
return WrapTLS(ctx, conn, addr, hd.tlsConfig)
}

3 changes: 3 additions & 0 deletions doc.go
@@ -30,6 +30,9 @@
// protocol version explicitly, as it's not defined which version will be used in certain situations (for example
// during upgrade of the cluster when some of the nodes support different set of protocol versions than other nodes).
//
// The driver advertises the module name and version in the STARTUP message, so servers are able to detect the version.
// If you use a replace directive in go.mod, the driver will send information about the replacement module instead.
//
// When ready, create a session from the configuration. Don't forget to Close the session once you are done with it:
//
// session, err := cluster.CreateSession()
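As a rough, hypothetical illustration of the replace-directive note in the doc.go change above, an application that wants this fork to be advertised instead of the upstream module would carry something like the following in its go.mod (the application module path and all version numbers below are placeholders):

```
module example.com/myapp

go 1.20

require github.com/gocql/gocql v1.5.0 // upstream import path kept in source code

replace github.com/gocql/gocql => github.com/scylladb/gocql v1.8.0 // hypothetical fork version
```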
