@@ -88,6 +88,10 @@ def _test_rolling_upgrade(self, path, nodes):
88
88
}
89
89
cluster = self ._new_cluster (path .from_version , nodes , settings = settings )
90
90
cluster .start ()
91
+ replica_cluster = None
92
+ if path .from_version .startswith ("5" ):
93
+ replica_cluster = self ._new_cluster (path .from_version , 1 , settings = settings , explicit_discovery = False )
94
+ replica_cluster .start ()
91
95
with connect (cluster .node ().http_url , error_trace = True ) as conn :
92
96
c = conn .cursor ()
93
97
c .execute ("create user arthur with (password = 'secret')" )
@@ -133,6 +137,21 @@ def _test_rolling_upgrade(self, path, nodes):
133
137
) partitioned by (a) clustered into 1 shards with (number_of_replicas = 0)
134
138
''' )
135
139
140
+ # FDW: two CrateDB clusters setting up foreign data wrappers bidirectionally
141
+ if int (path .from_version .split ('.' )[1 ]) >= 7 :
142
+ c .execute ("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)" )
143
+ expected_active_shards += 1
144
+ with connect (replica_cluster .node ().http_url , error_trace = True ) as replica_conn :
145
+ rc = replica_conn .cursor ()
146
+ psql_port = cluster .node ().addresses .psql .port
147
+ replica_psql_port = replica_cluster .node ().addresses .psql .port
148
+ assert 5430 <= psql_port <= 5440 and 5430 <= replica_psql_port <= 5440
149
+ rc .execute ("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)" )
150
+ rc .execute (f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{ psql_port } /')" )
151
+ rc .execute ("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')" )
152
+ c .execute (f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{ replica_psql_port } /')" )
153
+ c .execute ("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')" )
154
+
136
155
c .execute ('''
137
156
CREATE FUNCTION foo(INT)
138
157
RETURNS INT
@@ -282,6 +301,26 @@ def _test_rolling_upgrade(self, path, nodes):
282
301
c .execute ("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?" , [idx ])
283
302
self .assertEqual (c .fetchall (), [[partition_version ]])
284
303
304
+ # Ensure FDWs are functional
305
+ if int (path .from_version .split ('.' )[1 ]) >= 7 :
306
+ with connect (replica_cluster .node ().http_url , error_trace = True ) as replica_conn :
307
+ rc = replica_conn .cursor ()
308
+ wait_for_active_shards (c )
309
+ wait_for_active_shards (rc )
310
+ rc .execute ("select count(a) from doc.remote_y" )
311
+ count = rc .fetchall ()[0 ][0 ]
312
+ c .execute ("insert into doc.y values (1)" )
313
+ c .execute ("refresh table doc.y" )
314
+ rc .execute ("select count(a) from doc.remote_y" )
315
+ self .assertEqual (rc .fetchall ()[0 ][0 ], count + 1 )
316
+
317
+ c .execute ("select count(a) from doc.remote_y" )
318
+ count = c .fetchall ()[0 ][0 ]
319
+ rc .execute ("insert into doc.y values (1)" )
320
+ rc .execute ("refresh table doc.y" )
321
+ c .execute ("select count(a) from doc.remote_y" )
322
+ self .assertEqual (c .fetchall ()[0 ][0 ], count + 1 )
323
+
285
324
# Finally validate that all shards (primaries and replicas) of all partitions are started
286
325
# and writes into the partitioned table while upgrading were successful
287
326
with connect (cluster .node ().http_url , error_trace = True ) as conn :
0 commit comments