diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 9b7ba32c5..cc617c0a8 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -34,6 +34,8 @@ jobs:
           DOCKER_COMPOSE_VERSION: 2.20.0
         run: sudo curl -L "https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
+      - run: sudo sh -c "echo 2097152 >> /proc/sys/fs/aio-max-nr"
      - run: ./integration.sh cassandra scylla
      - run: ./integration.sh integration scylla
      - run: ./integration.sh ccm
+      - run: ./integration.sh tablet
diff --git a/common_test.go b/common_test.go
index 8821be381..02cdce38f 100644
--- a/common_test.go
+++ b/common_test.go
@@ -13,17 +13,18 @@ import (
 )
 
 var (
-    flagCluster      = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
-    flagProto        = flag.Int("proto", 0, "protcol version")
-    flagCQL          = flag.String("cql", "3.0.0", "CQL version")
-    flagRF           = flag.Int("rf", 1, "replication factor for test keyspace")
-    clusterSize      = flag.Int("clusterSize", 1, "the expected size of the cluster")
-    flagRetry        = flag.Int("retries", 5, "number of times to retry queries")
-    flagAutoWait     = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
-    flagRunSslTest   = flag.Bool("runssl", false, "Set to true to run ssl test")
-    flagRunAuthTest  = flag.Bool("runauth", false, "Set to true to run authentication test")
-    flagCompressTest = flag.String("compressor", "", "compressor to use")
-    flagTimeout      = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
+    flagCluster          = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
+    flagMultiNodeCluster = flag.String("multiCluster", "127.0.0.2", "a comma-separated list of host:port tuples")
+    flagProto            = flag.Int("proto", 0, "protocol version")
+    flagCQL              = flag.String("cql", "3.0.0", "CQL version")
+    flagRF               = flag.Int("rf", 1, "replication factor for test keyspace")
+    clusterSize          = flag.Int("clusterSize", 1, "the expected size of the cluster")
+    flagRetry            = flag.Int("retries", 5, "number of times to retry queries")
+    flagAutoWait         = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts pool")
+    flagRunSslTest       = flag.Bool("runssl", false, "Set to true to run ssl test")
+    flagRunAuthTest      = flag.Bool("runauth", false, "Set to true to run authentication test")
+    flagCompressTest     = flag.String("compressor", "", "compressor to use")
+    flagTimeout          = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
 
     flagCassVersion cassVersion
 )
@@ -38,6 +39,10 @@ func getClusterHosts() []string {
     return strings.Split(*flagCluster, ",")
 }
 
+func getMultiNodeClusterHosts() []string {
+    return strings.Split(*flagMultiNodeCluster, ",")
+}
+
 func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
     if *flagRunSslTest {
         cluster.Port = 9142
@@ -102,6 +107,35 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
     return cluster
 }
 
+func createMultiNodeCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
+    clusterHosts := getMultiNodeClusterHosts()
+    cluster := NewCluster(clusterHosts...)
+    cluster.ProtoVersion = *flagProto
+    cluster.CQLVersion = *flagCQL
+    cluster.Timeout = *flagTimeout
+    cluster.Consistency = Quorum
+    cluster.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow
+    if *flagRetry > 0 {
+        cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
+    }
+
+    switch *flagCompressTest {
+    case "snappy":
+        cluster.Compressor = &SnappyCompressor{}
+    case "":
+    default:
+        panic("invalid compressor: " + *flagCompressTest)
+    }
+
+    cluster = addSslOptions(cluster)
+
+    for _, opt := range opts {
+        opt(cluster)
+    }
+
+    return cluster
+}
+
 func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
     // TODO: tb.Helper()
     c := *cluster
@@ -149,6 +183,37 @@ func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
     return session
 }
 
+func createSessionFromMultiNodeCluster(cluster *ClusterConfig, tb testing.TB) *Session {
+    keyspace := "test1"
+
+    session, err := cluster.CreateSession()
+    if err != nil {
+        tb.Fatal("createSession:", err)
+    }
+
+    initOnce.Do(func() {
+        if err = createTable(session, `DROP KEYSPACE IF EXISTS `+keyspace); err != nil {
+            panic(fmt.Sprintf("unable to drop keyspace: %v", err))
+        }
+
+        if err = createTable(session, fmt.Sprintf(`CREATE KEYSPACE %s
+    WITH replication = {
+        'class': 'NetworkTopologyStrategy',
+        'replication_factor': 1,
+        'initial_tablets': 8
+    };`, keyspace)); err != nil {
+            panic(fmt.Sprintf("unable to create keyspace: %v", err))
+        }
+
+    })
+
+    if err := session.control.awaitSchemaAgreement(); err != nil {
+        tb.Fatal(err)
+    }
+
+    return session
+}
+
 func createSession(tb testing.TB, opts ...func(config *ClusterConfig)) *Session {
     cluster := createCluster(opts...)
     return createSessionFromCluster(cluster, tb)
diff --git a/docker-compose.yml b/docker-compose.yml
index 9e7490c7d..8090eb7e2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -32,6 +32,60 @@ services:
       interval: 5s
       timeout: 5s
       retries: 18
+  node_2:
+    image: scylladb/scylla-nightly
+    command: |
+      --experimental-features consistent-topology-changes
+      --experimental-features tablets
+      --smp 2
+      --memory 1G
+      --seeds 192.168.100.12
+    networks:
+      public:
+        ipv4_address: 192.168.100.12
+    healthcheck:
+      test: [ "CMD", "cqlsh", "192.168.100.12", "-e", "select * from system.local" ]
+      interval: 5s
+      timeout: 5s
+      retries: 18
+  node_3:
+    image: scylladb/scylla-nightly
+    command: |
+      --experimental-features consistent-topology-changes
+      --experimental-features tablets
+      --smp 2
+      --memory 1G
+      --seeds 192.168.100.12
+    networks:
+      public:
+        ipv4_address: 192.168.100.13
+    healthcheck:
+      test: [ "CMD", "cqlsh", "192.168.100.13", "-e", "select * from system.local" ]
+      interval: 5s
+      timeout: 5s
+      retries: 18
+    depends_on:
+      node_2:
+        condition: service_healthy
+  node_4:
+    image: scylladb/scylla-nightly
+    command: |
+      --experimental-features consistent-topology-changes
+      --experimental-features tablets
+      --smp 2
+      --memory 1G
+      --seeds 192.168.100.12
+    networks:
+      public:
+        ipv4_address: 192.168.100.14
+    healthcheck:
+      test: [ "CMD", "cqlsh", "192.168.100.14", "-e", "select * from system.local" ]
+      interval: 5s
+      timeout: 5s
+      retries: 18
+    depends_on:
+      node_3:
+        condition: service_healthy
 networks:
   public:
     driver: bridge
diff --git a/integration.sh b/integration.sh
index 5c29615e9..6598599d1 100755
--- a/integration.sh
+++ b/integration.sh
@@ -28,10 +28,25 @@ function scylla_restart() {
 scylla_restart
 
 readonly clusterSize=1
+readonly multiNodeClusterSize=3
 readonly scylla_liveset="192.168.100.11"
+readonly scylla_tablet_liveset="192.168.100.12"
 readonly cversion="3.11.4"
 readonly proto=4
 readonly args="-gocql.timeout=60s -proto=${proto} -rf=${clusterSize} -clusterSize=${clusterSize} -autowait=2000ms -compressor=snappy -gocql.cversion=${cversion} -cluster=${scylla_liveset}"
-
-echo "==> Running $* tests with args: ${args}"
-go test -timeout=5m -race -tags="$*" ${args} ./...
+readonly tabletArgs="-gocql.timeout=60s -proto=${proto} -rf=1 -clusterSize=${multiNodeClusterSize} -autowait=2000ms -compressor=snappy -gocql.cversion=${cversion} -multiCluster=${scylla_tablet_liveset}"
+
+if [[ "$*" == *"tablet"* ]];
+then
+  echo "==> Running tablet tests with args: ${tabletArgs}"
+  go test -timeout=5m -race -tags="tablet" ${tabletArgs} ./...
+fi
+
+TAGS=$*
+TAGS=${TAGS//"tablet"/}
+
+if [[ -n "${TAGS// /}" ]];
+then
+  echo "==> Running ${TAGS} tests with args: ${args}"
+  go test -timeout=5m -race -tags="$TAGS" ${args} ./...
+fi
diff --git a/tablet_integration_test.go b/tablet_integration_test.go
new file mode 100644
index 000000000..aec8d744e
--- /dev/null
+++ b/tablet_integration_test.go
@@ -0,0 +1,134 @@
+//go:build tablet
+// +build tablet
+
+package gocql
+
+import (
+    "bytes"
+    "context"
+    "fmt"
+    "regexp"
+    "strings"
+    "testing"
+)
+
+// Check if TokenAwareHostPolicy works correctly when using tablets
+func TestTablets(t *testing.T) {
+    cluster := createMultiNodeCluster()
+
+    fallback := RoundRobinHostPolicy()
+    cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)
+
+    session := createSessionFromMultiNodeCluster(cluster, t)
+    defer session.Close()
+
+    if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
+    `, "test1", "table1")); err != nil {
+        t.Fatalf("unable to create table: %v", err)
+    }
+
+    hosts, _, err := session.hostSource.GetHosts()
+    assertTrue(t, "err == nil", err == nil)
+
+    hostAddresses := []string{}
+    for _, host := range hosts {
+        hostAddresses = append(hostAddresses, host.connectAddress.String())
+    }
+
+    ctx := context.Background()
+
+    i := 0
+    for i < 50 {
+        i++
+        err = session.Query(`INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
+        if err != nil {
+            t.Fatal(err)
+        }
+    }
+
+    i = 0
+    for i < 50 {
+        i++
+
+        var pk int
+        var ck int
+        var v int
+
+        buf := &bytes.Buffer{}
+        trace := NewTraceWriter(session, buf)
+
+        err = session.Query(`SELECT pk, ck, v FROM test1.table1 WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        queriedHosts := 0 // number of distinct hosts that appear in this query's trace
+        for _, hostAddress := range hostAddresses {
+            if strings.Contains(buf.String(), hostAddress) {
+                queriedHosts++
+            }
+        }
+
+        assertEqual(t, "queriedHosts", 1, queriedHosts)
+    }
+}
+
+// Check if shard awareness works correctly when using tablets
+func TestTabletsShardAwareness(t *testing.T) {
+    cluster := createMultiNodeCluster()
+
+    fallback := RoundRobinHostPolicy()
+    cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)
+
+    session := createSessionFromMultiNodeCluster(cluster, t)
+    defer session.Close()
+
+    if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
+    `, "test1", "table_shard")); err != nil {
+        t.Fatalf("unable to create table: %v", err)
+    }
+
+    ctx := context.Background()
+
+    i := 0
+    for i < 50 {
+        i++
+        err := session.Query(`INSERT INTO test1.table_shard (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
+        if err != nil {
+            t.Fatal(err)
+        }
+    }
+
+    i = 0
+    for i < 50 {
+        i++
+
+        var pk int
+        var ck int
+        var v int
+
+        buf := &bytes.Buffer{}
+        trace := NewTraceWriter(session, buf)
+
+        err := session.Query(`SELECT pk, ck, v FROM test1.table_shard WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        re := regexp.MustCompile(`\[shard .*\]`)
+
+        shards := re.FindAllString(buf.String(), -1)
+
+        // deduplicate the shard markers in the trace to count how many distinct shards served the read
+        allShards := make(map[string]bool)
+        shardList := []string{}
+        for _, item := range shards {
+            if _, seen := allShards[item]; !seen {
+                allShards[item] = true
+                shardList = append(shardList, item)
+            }
+        }
+
+        assertEqual(t, "shardList", 1, len(shardList))
+    }
+}
diff --git a/tablet_test.go b/tablet_test.go
new file mode 100644
index 000000000..f6afda8e2
--- /dev/null
+++ b/tablet_test.go
@@ -0,0 +1,375 @@
+//go:build all || unit
+// +build all unit
+
+package gocql
+
+import (
+    "sync"
+    "testing"
+)
+
+var tablets = []*TabletInfo{ // fixture: eight contiguous tablets per keyspace, sorted by token range
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        -7917529027641081857,
+        -6917529027641081857,
+        []ReplicaInfo{{TimeUUID(), 9}},
+    },
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{{TimeUUID(), 8}},
+    },
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        -4611686018427387905,
+        -2305843009213693953,
+        []ReplicaInfo{{TimeUUID(), 9}},
+    },
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        -2305843009213693953,
+        -1,
+        []ReplicaInfo{{TimeUUID(), 8}},
+    },
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        -1,
+        2305843009213693951,
+        []ReplicaInfo{{TimeUUID(), 3}},
+    },
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        2305843009213693951,
+        4611686018427387903,
+        []ReplicaInfo{{TimeUUID(), 3}},
+    },
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        4611686018427387903,
+        6917529027641081855,
+        []ReplicaInfo{{TimeUUID(), 7}},
+    },
+    {
+        sync.RWMutex{},
+        "test1",
+        "table1",
+        6917529027641081855,
+        9223372036854775807,
+        []ReplicaInfo{{TimeUUID(), 7}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        -7917529027641081857,
+        -6917529027641081857,
+        []ReplicaInfo{{TimeUUID(), 9}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{{TimeUUID(), 8}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        -4611686018427387905,
+        -2305843009213693953,
+        []ReplicaInfo{{TimeUUID(), 9}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        -2305843009213693953,
+        -1,
+        []ReplicaInfo{{TimeUUID(), 8}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        -1,
+        2305843009213693951,
+        []ReplicaInfo{{TimeUUID(), 3}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        2305843009213693951,
+        4611686018427387903,
+        []ReplicaInfo{{TimeUUID(), 3}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        4611686018427387903,
+        6917529027641081855,
+        []ReplicaInfo{{TimeUUID(), 7}},
+    },
+    {
+        sync.RWMutex{},
+        "test2",
+        "table1",
+        6917529027641081855,
+        9223372036854775807,
+        []ReplicaInfo{{TimeUUID(), 7}},
+    },
+}
+
+func TestFindTablets(t *testing.T) {
+    id, id2 := findTablets(tablets, "test1", "table1")
+    assertEqual(t, "id", 0, id)
+    assertEqual(t, "id2", 7, id2)
+
+    id, id2 = findTablets(tablets, "test2", "table1")
+    assertEqual(t, "id", 8, id)
+    assertEqual(t, "id2", 15, id2)
+
+    id, id2 = findTablets(tablets, "test3", "table1")
+    assertEqual(t, "id", -1, id)
+    assertEqual(t, "id2", -1, id2)
+}
+
+func TestFindTabletForToken(t *testing.T) {
+    tablet := findTabletForToken(tablets, parseInt64Token("0"), 0, 7)
+    assertTrue(t, "tablet.lastToken == 2305843009213693951", tablet.lastToken == 2305843009213693951)
+
+    tablet = findTabletForToken(tablets, parseInt64Token("9223372036854775807"), 0, 7)
+    assertTrue(t, "tablet.lastToken == 9223372036854775807", tablet.lastToken == 9223372036854775807)
+
+    tablet = findTabletForToken(tablets, parseInt64Token("-4611686018427387904"), 0, 7)
+    assertTrue(t, "tablet.lastToken == -2305843009213693953", tablet.lastToken == -2305843009213693953)
+}
+
+func CompareRanges(tablets []*TabletInfo, ranges [][]int64) bool { // helper: each tablet must match the expected {first, last} pair, in order
+    if len(tablets) != len(ranges) {
+        return false
+    }
+
+    for idx, tablet := range tablets {
+        if tablet.FirstToken() != ranges[idx][0] || tablet.LastToken() != ranges[idx][1] {
+            return false
+        }
+    }
+    return true
+}
+func TestAddTabletToEmptyTablets(t *testing.T) {
+    tablets := []*TabletInfo{}
+
+    tablets = addTabletToTabletsList(tablets, &TabletInfo{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{},
+    })
+
+    assertTrue(t, "Token ranges in the tablet list are not correct", CompareRanges(tablets, [][]int64{{-6917529027641081857, -4611686018427387905}}))
+}
+
+func TestAddTabletAtTheBeginning(t *testing.T) {
+    tablets := []*TabletInfo{{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{},
+    }}
+
+    tablets = addTabletToTabletsList(tablets, &TabletInfo{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -8611686018427387905,
+        -7917529027641081857,
+        []ReplicaInfo{},
+    })
+
+    assertTrue(t, "Token ranges in the tablet list are not correct",
+        CompareRanges(tablets, [][]int64{{-8611686018427387905, -7917529027641081857}, {-6917529027641081857, -4611686018427387905}}))
+}
+
+func TestAddTabletAtTheEnd(t *testing.T) {
+    tablets := []*TabletInfo{{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{},
+    }}
+
+    tablets = addTabletToTabletsList(tablets, &TabletInfo{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -1,
+        2305843009213693951,
+        []ReplicaInfo{},
+    })
+
+    assertTrue(t, "Token ranges in the tablet list are not correct", CompareRanges(tablets, [][]int64{{-6917529027641081857, -4611686018427387905},
+        {-1, 2305843009213693951}}))
+}
+
+func TestAddTabletInTheMiddle(t *testing.T) {
+    tablets := []*TabletInfo{{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{},
+    }, {
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -1,
+        2305843009213693951,
+        []ReplicaInfo{},
+    }}
+
+    tablets = addTabletToTabletsList(tablets, &TabletInfo{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -4611686018427387905,
+        -2305843009213693953,
+        []ReplicaInfo{},
+    })
+
+    assertTrue(t, "Token ranges in the tablet list are not correct", CompareRanges(tablets, [][]int64{{-6917529027641081857, -4611686018427387905},
+        {-4611686018427387905, -2305843009213693953},
+        {-1, 2305843009213693951}}))
+}
+
+func TestAddTabletIntersecting(t *testing.T) {
+    tablets := []*TabletInfo{{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{},
+    }, {
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -4611686018427387905,
+        -2305843009213693953,
+        []ReplicaInfo{},
+    }, {
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -2305843009213693953,
+        -1,
+        []ReplicaInfo{},
+    }, {
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -1,
+        2305843009213693951,
+        []ReplicaInfo{},
+    }}
+
+    tablets = addTabletToTabletsList(tablets, &TabletInfo{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -3611686018427387905,
+        -6,
+        []ReplicaInfo{},
+    })
+
+    assertTrue(t, "Token ranges in the tablet list are not correct",
+        CompareRanges(tablets, [][]int64{{-6917529027641081857, -4611686018427387905},
+            {-3611686018427387905, -6},
+            {-1, 2305843009213693951}}))
+}
+
+func TestAddTabletIntersectingWithFirst(t *testing.T) {
+    tablets := []*TabletInfo{{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -8611686018427387905,
+        -7917529027641081857,
+        []ReplicaInfo{},
+    }, {
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{},
+    }}
+
+    tablets = addTabletToTabletsList(tablets, &TabletInfo{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -8011686018427387905,
+        -7987529027641081857,
+        []ReplicaInfo{},
+    })
+
+    assertTrue(t, "Token ranges in the tablet list are not correct", CompareRanges(tablets, [][]int64{{-8011686018427387905, -7987529027641081857},
+        {-6917529027641081857, -4611686018427387905}}))
+}
+
+func TestAddTabletIntersectingWithLast(t *testing.T) {
+    tablets := []*TabletInfo{{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -8611686018427387905,
+        -7917529027641081857,
+        []ReplicaInfo{},
+    }, {
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -6917529027641081857,
+        -4611686018427387905,
+        []ReplicaInfo{},
+    }}
+
+    tablets = addTabletToTabletsList(tablets, &TabletInfo{
+        sync.RWMutex{},
+        "test_ks",
+        "test_tb",
+        -5011686018427387905,
+        -2987529027641081857,
+        []ReplicaInfo{},
+    })
+
+    assertTrue(t, "Token ranges in the tablet list are not correct", CompareRanges(tablets, [][]int64{{-8611686018427387905, -7917529027641081857},
+        {-5011686018427387905, -2987529027641081857}}))
+}
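
Reviewer note: TestFindTabletForToken pins down the lookup semantics the tablet-aware routing relies on. The tablet list is kept sorted, and a token t belongs to the tablet whose half-open interval (firstToken, lastToken] contains t, which is why -4611686018427387904 is expected to resolve to the tablet ending at -2305843009213693953 rather than the one ending at -4611686018427387905. The following self-contained sketch re-implements that binary-search lookup for illustration only; the type and function names (tablet, findForToken) are simplified stand-ins, not this PR's actual API:

package main

import "fmt"

// tablet is a simplified stand-in for the PR's TabletInfo; it owns the
// half-open token range (firstToken, lastToken].
type tablet struct {
    firstToken int64
    lastToken  int64
}

// findForToken binary-searches a slice sorted by lastToken for the tablet
// whose range contains token, returning nil when no tablet owns it.
func findForToken(tablets []tablet, token int64) *tablet {
    lo, hi := 0, len(tablets)-1
    for lo < hi {
        mid := lo + (hi-lo)/2
        if tablets[mid].lastToken < token {
            lo = mid + 1
        } else {
            hi = mid
        }
    }
    if lo >= 0 && lo < len(tablets) && tablets[lo].firstToken < token && token <= tablets[lo].lastToken {
        return &tablets[lo]
    }
    return nil
}

func main() {
    // Same token ranges as the "test1" fixture in tablet_test.go above.
    tablets := []tablet{
        {-7917529027641081857, -6917529027641081857},
        {-6917529027641081857, -4611686018427387905},
        {-4611686018427387905, -2305843009213693953},
        {-2305843009213693953, -1},
        {-1, 2305843009213693951},
        {2305843009213693951, 4611686018427387903},
        {4611686018427387903, 6917529027641081855},
        {6917529027641081855, 9223372036854775807},
    }
    fmt.Println(findForToken(tablets, 0).lastToken)                    // 2305843009213693951
    fmt.Println(findForToken(tablets, -4611686018427387904).lastToken) // -2305843009213693953
}

The exclusive lower bound also explains the intersection tests: a new tablet whose range overlaps existing entries replaces them rather than being merged, so TestAddTabletIntersecting expects the two overlapped middle tablets to be dropped in favor of the incoming one.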