Commit
Add integration and unit tests
tablet_integration_test.go: Integration tests checking that
TokenAwareHostPolicy and shard awareness work correctly when
using tablets, and that adding a new table changes the tablets table.

tablet_test.go: Unit tests checking that searching the tablets
list works correctly.
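For context, the integration tests below wire a token-aware host policy over a round-robin fallback against the multi-node tablet cluster. The following is a minimal standalone sketch of that configuration, not part of this commit (the import path is assumed, since this fork keeps the upstream package name):

package main

import (
	"log"

	gocql "github.com/gocql/gocql" // assumed import path for this fork
)

func main() {
	// Hosts match the tablet cluster defined in docker-compose.yml below.
	cluster := gocql.NewCluster("192.168.100.12", "192.168.100.13", "192.168.100.14")
	// Token-aware routing with a round-robin fallback, as exercised by TestTablets.
	cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
}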
sylwiaszunejko committed Jan 8, 2024
1 parent 39500a7 commit a98a393
Showing 6 changed files with 659 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
@@ -34,6 +34,8 @@ jobs:
DOCKER_COMPOSE_VERSION: 2.20.0
run: sudo curl -L "https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

- run: sudo sh -c "echo 2097152 >> /proc/sys/fs/aio-max-nr"
- run: ./integration.sh cassandra scylla
- run: ./integration.sh integration scylla
- run: ./integration.sh ccm
- run: ./integration.sh tablet
87 changes: 76 additions & 11 deletions common_test.go
@@ -13,17 +13,18 @@ import (
)

var (
flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
flagProto = flag.Int("proto", 0, "protcol version")
flagCQL = flag.String("cql", "3.0.0", "CQL version")
flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
flagRetry = flag.Int("retries", 5, "number of times to retry queries")
flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
flagRunAuthTest = flag.Bool("runauth", false, "Set to true to run authentication test")
flagCompressTest = flag.String("compressor", "", "compressor to use")
flagTimeout = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
flagMultiNodeCluster = flag.String("multiCluster", "127.0.0.2", "a comma-separated list of host:port tuples")
flagProto = flag.Int("proto", 0, "protcol version")
flagCQL = flag.String("cql", "3.0.0", "CQL version")
flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
flagRetry = flag.Int("retries", 5, "number of times to retry queries")
flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
flagRunAuthTest = flag.Bool("runauth", false, "Set to true to run authentication test")
flagCompressTest = flag.String("compressor", "", "compressor to use")
flagTimeout = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")

flagCassVersion cassVersion
)
@@ -38,6 +39,10 @@ func getClusterHosts() []string {
return strings.Split(*flagCluster, ",")
}

func getMultiNodeClusterHosts() []string {
return strings.Split(*flagMultiNodeCluster, ",")
}

func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
if *flagRunSslTest {
cluster.Port = 9142
@@ -102,6 +107,35 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
return cluster
}

func createMultiNodeCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
clusterHosts := getMultiNodeClusterHosts()
cluster := NewCluster(clusterHosts...)
cluster.ProtoVersion = *flagProto
cluster.CQLVersion = *flagCQL
cluster.Timeout = *flagTimeout
cluster.Consistency = Quorum
cluster.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow
if *flagRetry > 0 {
cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
}

switch *flagCompressTest {
case "snappy":
cluster.Compressor = &SnappyCompressor{}
case "":
default:
panic("invalid compressor: " + *flagCompressTest)
}

cluster = addSslOptions(cluster)

for _, opt := range opts {
opt(cluster)
}

return cluster
}

func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
// TODO: tb.Helper()
c := *cluster
@@ -149,6 +183,37 @@ func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
return session
}

func createSessionFromMultiNodeCluster(cluster *ClusterConfig, tb testing.TB) *Session {
keyspace := "test1"

session, err := cluster.CreateSession()
if err != nil {
tb.Fatal("createSession:", err)
}

initOnce.Do(func() {
if err = createTable(session, `DROP KEYSPACE IF EXISTS `+keyspace); err != nil {
panic(fmt.Sprintf("unable to drop keyspace: %v", err))
}

if err = createTable(session, fmt.Sprintf(`CREATE KEYSPACE %s
WITH replication = {
'class': 'NetworkTopologyStrategy',
'replication_factor': 1,
'initial_tablets': 8
};`, keyspace)); err != nil {
panic(fmt.Sprintf("unable to create keyspace: %v", err))
}

})

if err := session.control.awaitSchemaAgreement(); err != nil {
tb.Fatal(err)
}

return session
}

func createSession(tb testing.TB, opts ...func(config *ClusterConfig)) *Session {
cluster := createCluster(opts...)
return createSessionFromCluster(cluster, tb)
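Taken together, the new helpers hand the tablet tests a ready session against the multi-node cluster. A hypothetical wrapper sketching how they combine (the tests added below call them directly; this function is not part of the commit):

// newTabletTestSession is a hypothetical convenience wrapper around the
// helpers added above.
func newTabletTestSession(tb testing.TB) *Session {
	cluster := createMultiNodeCluster()
	cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
	return createSessionFromMultiNodeCluster(cluster, tb)
}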
54 changes: 54 additions & 0 deletions docker-compose.yml
@@ -32,6 +32,60 @@ services:
interval: 5s
timeout: 5s
retries: 18
node_2:
image: scylladb/scylla-nightly
command: |
--experimental-features consistent-topology-changes
--experimental-features tablets
--smp 2
--memory 1G
--seeds 192.168.100.12
networks:
public:
ipv4_address: 192.168.100.12
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.12", "-e", "select * from system.local" ]
interval: 5s
timeout: 5s
retries: 18
node_3:
image: scylladb/scylla-nightly
command: |
--experimental-features consistent-topology-changes
--experimental-features tablets
--smp 2
--memory 1G
--seeds 192.168.100.12
networks:
public:
ipv4_address: 192.168.100.13
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.13", "-e", "select * from system.local" ]
interval: 5s
timeout: 5s
retries: 18
depends_on:
node_2:
condition: service_healthy
node_4:
image: scylladb/scylla-nightly
command: |
--experimental-features consistent-topology-changes
--experimental-features tablets
--smp 2
--memory 1G
--seeds 192.168.100.12
networks:
public:
ipv4_address: 192.168.100.14
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.14", "-e", "select * from system.local" ]
interval: 5s
timeout: 5s
retries: 18
depends_on:
node_3:
condition: service_healthy
networks:
public:
driver: bridge
21 changes: 18 additions & 3 deletions integration.sh
@@ -28,10 +28,25 @@ function scylla_restart() {
scylla_restart

readonly clusterSize=1
readonly multiNodeClusterSize=3
readonly scylla_liveset="192.168.100.11"
readonly scylla_tablet_liveset="192.168.100.12"
readonly cversion="3.11.4"
readonly proto=4
readonly args="-gocql.timeout=60s -proto=${proto} -rf=${clusterSize} -clusterSize=${clusterSize} -autowait=2000ms -compressor=snappy -gocql.cversion=${cversion} -cluster=${scylla_liveset}"

echo "==> Running $* tests with args: ${args}"
go test -timeout=5m -race -tags="$*" ${args} ./...
readonly tabletArgs="-gocql.timeout=60s -proto=${proto} -rf=1 -clusterSize=${multiNodeClusterSize} -autowait=2000ms -compressor=snappy -gocql.cversion=${cversion} -multiCluster=${scylla_tablet_liveset}"

if [[ "$*" == *"tablet"* ]];
then
echo "==> Running tablet tests with args: ${tabletArgs}"
go test -timeout=5m -race -tags="tablet" ${tabletArgs} ./...
fi

TAGS=$*
TAGS=${TAGS//"tablet"/}

if [ ! -z "$TAGS" ];
then
echo "==> Running ${TAGS} tests with args: ${args}"
go test -timeout=5m -race -tags="$TAGS" ${args} ./...
fi
134 changes: 134 additions & 0 deletions tablet_integration_test.go
@@ -0,0 +1,134 @@
//go:build tablet
// +build tablet

package gocql

import (
"bytes"
"context"
"fmt"
"regexp"
"strings"
"testing"
)

// Check if TokenAwareHostPolicy works correctly when using tablets
func TestTablets(t *testing.T) {
cluster := createMultiNodeCluster()

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

session := createSessionFromMultiNodeCluster(cluster, t)
defer session.Close()

if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test1", "table1")); err != nil {
panic(fmt.Sprintf("unable to create table: %v", err))
}

hosts, _, err := session.hostSource.GetHosts()
assertTrue(t, "err == nil", err == nil)

hostAddresses := []string{}
for _, host := range hosts {
hostAddresses = append(hostAddresses, host.connectAddress.String())
}

ctx := context.Background()

i := 0
for i < 50 {
i = i + 1
err = session.Query(`INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
if err != nil {
t.Fatal(err)
}
}

i = 0
for i < 50 {
i = i + 1

var pk int
var ck int
var v int

buf := &bytes.Buffer{}
trace := NewTraceWriter(session, buf)

err = session.Query(`SELECT pk, ck, v FROM test1.table1 WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
if err != nil {
t.Fatal(err)
}

queriedHosts := 0
for _, hostAddress := range hostAddresses {
if strings.Contains(buf.String(), hostAddress) {
queriedHosts = queriedHosts + 1
}
}

assertEqual(t, "queriedHosts", 1, queriedHosts)
}
}

// Check if shard awareness works correctly when using tablets
func TestTabletsShardAwareness(t *testing.T) {
cluster := createMultiNodeCluster()

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

session := createSessionFromMultiNodeCluster(cluster, t)
defer session.Close()

if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test1", "table_shard")); err != nil {
panic(fmt.Sprintf("unable to create table: %v", err))
}

ctx := context.Background()

i := 0
for i < 50 {
i = i + 1
err := session.Query(`INSERT INTO test1.table_shard (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
if err != nil {
t.Fatal(err)
}
}

i = 0
for i < 50 {
i = i + 1

var pk int
var ck int
var v int

buf := &bytes.Buffer{}
trace := NewTraceWriter(session, buf)

err := session.Query(`SELECT pk, ck, v FROM test1.table_shard WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
if err != nil {
t.Fatal(err)
}

re := regexp.MustCompile(`\[shard .*\]`)

shards := re.FindAllString(buf.String(), -1)

// find duplicates to check how many shards are used
allShards := make(map[string]bool)
shardList := []string{}
for _, item := range shards {
if _, value := allShards[item]; !value {
allShards[item] = true
shardList = append(shardList, item)
}
}

assertEqual(t, "shardList", 1, len(shardList))
}
}
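The duplicate-filtering loop above amounts to counting distinct shard tags in the trace output; an equivalent minimal sketch (hypothetical helper, not part of this commit):

// countDistinct reports how many unique strings appear in items, mirroring
// the shard-deduplication loop in TestTabletsShardAwareness.
func countDistinct(items []string) int {
	seen := make(map[string]struct{}, len(items))
	for _, item := range items {
		seen[item] = struct{}{}
	}
	return len(seen)
}

// The assertion would then reduce to:
//	assertEqual(t, "shardList", 1, countDistinct(shards))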
375 changes: 375 additions & 0 deletions tablet_test.go
(diff not loaded)