diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index ce31ff0c..15691efb 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -88,6 +88,10 @@ def _test_rolling_upgrade(self, path, nodes): } cluster = self._new_cluster(path.from_version, nodes, settings=settings) cluster.start() + replica_cluster = None + if path.from_version.startswith("5"): + replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) + replica_cluster.start() with connect(cluster.node().http_url, error_trace=True) as conn: c = conn.cursor() c.execute("create user arthur with (password = 'secret')") @@ -133,6 +137,21 @@ def _test_rolling_upgrade(self, path, nodes): ) partitioned by (a) clustered into 1 shards with (number_of_replicas = 0) ''') + # FDW: two CrateDB clusters setting up foreign data wrappers bidirectionally + if int(path.from_version.split('.')[1]) >= 7: + c.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") + expected_active_shards += 1 + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: + rc = replica_conn.cursor() + psql_port = cluster.node().addresses.psql.port + replica_psql_port = replica_cluster.node().addresses.psql.port + assert 5430 <= psql_port <= 5440 and 5430 <= replica_psql_port <= 5440 + rc.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") + rc.execute(f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{psql_port}/')") + rc.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')") + c.execute(f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{replica_psql_port}/')") + c.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')") + c.execute(''' CREATE FUNCTION foo(INT) RETURNS INT @@ -282,6 +301,26 @@ def _test_rolling_upgrade(self, path, nodes): c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx]) self.assertEqual(c.fetchall(), [[partition_version]]) + # Ensure FDWs are functional + if int(path.from_version.split('.')[1]) >= 7: + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: + rc = replica_conn.cursor() + wait_for_active_shards(c) + wait_for_active_shards(rc) + rc.execute("select count(a) from doc.remote_y") + count = rc.fetchall()[0][0] + c.execute("insert into doc.y values (1)") + c.execute("refresh table doc.y") + rc.execute("select count(a) from doc.remote_y") + self.assertEqual(rc.fetchall()[0][0], count + 1) + + c.execute("select count(a) from doc.remote_y") + count = c.fetchall()[0][0] + rc.execute("insert into doc.y values (1)") + rc.execute("refresh table doc.y") + c.execute("select count(a) from doc.remote_y") + self.assertEqual(c.fetchall()[0][0], count + 1) + # Finally validate that all shards (primaries and replicas) of all partitions are started # and writes into the partitioned table while upgrading were successful with connect(cluster.node().http_url, error_trace=True) as conn: