Skip to content

Commit 27cf106

Browse files
committed
add catchup() method to PostgresNode
1 parent 76729ec commit 27cf106

File tree

2 files changed

+68
-21
lines changed

2 files changed

+68
-21
lines changed

testgres/testgres.py

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ class BackupException(Exception):
109109
pass
110110

111111

112+
class CatchUpException(Exception):
113+
pass
114+
115+
112116
class TestgresLogger(threading.Thread):
113117
"""
114118
Helper class to implement reading from postgresql.log
@@ -318,12 +322,14 @@ def spawn_primary(self, name, destroy=True):
318322
# Copy backup to new data dir
319323
shutil.copytree(data1, data2)
320324
except Exception as e:
321-
raise BackupException(e.message)
325+
raise BackupException(str(e))
322326
else:
323327
base_dir = self.base_dir
324328

325329
# build a new PostgresNode
326-
node = get_new_node(name, base_dir)
330+
node = PostgresNode(name=name,
331+
base_dir=base_dir,
332+
master=self.original_node)
327333
node.append_conf("postgresql.conf", "port = {}".format(node.port))
328334

329335
# record new status
@@ -803,13 +809,21 @@ def restore(self, dbname, filename, username=None):
803809

804810
self.psql(dbname=dbname, filename=filename, username=username)
805811

806-
def poll_query_until(self, dbname, query, username=None, max_attempts=60, sleep_time=1):
812+
def poll_query_until(self,
813+
dbname,
814+
query,
815+
username=None,
816+
max_attempts=60,
817+
sleep_time=1):
807818
"""
808819
Run a query once a second until it returs True.
809820
810821
Args:
811822
dbname: database name to connect to (str).
812823
query: query to be executed (str).
824+
username: database user name (str).
825+
max_attempts: how many times should we try?
826+
sleep_time: how long should we sleep after a failure?
813827
"""
814828

815829
attemps = 0
@@ -844,7 +858,7 @@ def execute(self, dbname, query, username=None, commit=False):
844858
dbname: database name to connect to (str).
845859
query: query to be executed (str).
846860
username: database user name (str).
847-
commit: should we commit this query?.
861+
commit: should we commit this query?
848862
849863
Returns:
850864
A list of tuples representing rows.
@@ -885,6 +899,32 @@ def replicate(self, name, username=None, xlog_method=DEFAULT_XLOG_METHOD):
885899
return self.backup(username=username,
886900
xlog_method=xlog_method).spawn_replica(name)
887901

902+
def catchup(self):
903+
"""
904+
Wait until async replica catches up with its master.
905+
"""
906+
907+
master = self.master
908+
909+
cur_ver = LooseVersion(get_pg_config()["VERSION_NUM"])
910+
min_ver = LooseVersion('10.0')
911+
912+
if cur_ver >= min_ver:
913+
poll_lsn = "select pg_current_wal_lsn()::text"
914+
wait_lsn = "select pg_last_wal_replay_lsn() >= '{}'::pg_lsn"
915+
else:
916+
poll_lsn = "select pg_current_xlog_location()::text"
917+
wait_lsn = "select pg_last_xlog_replay_location() >= '{}'::pg_lsn"
918+
919+
if not master:
920+
raise CatchUpException("Master node is not specified")
921+
922+
try:
923+
lsn = master.execute('postgres', poll_lsn)[0][0]
924+
self.poll_query_until('postgres', wait_lsn.format(lsn))
925+
except Exception as e:
926+
raise CatchUpException(str(e))
927+
888928
def pgbench_init(self, dbname='postgres', scale=1, options=[]):
889929
"""
890930
Prepare database for pgbench (create tables etc).
@@ -963,7 +1003,7 @@ def call_initdb(_data_dir):
9631003
_params = [_data_dir, "-N"] + initdb_params
9641004
_execute_utility("initdb", _params, initdb_logfile)
9651005
except Exception as e:
966-
raise InitNodeException(e.message)
1006+
raise InitNodeException(str(e))
9671007

9681008
# Call initdb if we have custom params
9691009
if initdb_params:
@@ -981,7 +1021,7 @@ def call_initdb(_data_dir):
9811021
# Copy cached initdb to current data dir
9821022
shutil.copytree(cached_data_dir, data_dir)
9831023
except Exception as e:
984-
raise InitNodeException(e.message)
1024+
raise InitNodeException(str(e))
9851025

9861026

9871027
def _execute_utility(util, args, logfile, write_to_pipe=True):

tests/test_simple.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -224,24 +224,12 @@ def test_backup_and_replication(self):
224224
res = replica.execute('postgres', 'select * from abc')
225225
self.assertListEqual(res, [(1, 2)])
226226

227-
cur_ver = LooseVersion(get_pg_config()['VERSION_NUM'])
228-
min_ver = LooseVersion('10')
229-
230-
# Prepare the query which would check whether record reached replica
231-
# (It is slightly different for Postgres 9.6 and Postgres 10+)
232-
if cur_ver >= min_ver:
233-
wait_lsn = 'SELECT pg_current_wal_lsn() <= replay_lsn ' \
234-
'FROM pg_stat_replication WHERE application_name = \'%s\'' \
235-
% replica.name
236-
else:
237-
wait_lsn = 'SELECT pg_current_xlog_location() <= replay_location '\
238-
'FROM pg_stat_replication WHERE application_name = \'%s\'' \
239-
% replica.name
240-
241227
# Insert into master node
242228
node.psql('postgres', 'insert into abc values (3, 4)')
229+
243230
# Wait until data syncronizes
244-
node.poll_query_until('postgres', wait_lsn)
231+
replica.catchup()
232+
245233
# Check that this record was exported to replica
246234
res = replica.execute('postgres', 'select * from abc')
247235
self.assertListEqual(res, [(1, 2), (3, 4)])
@@ -254,6 +242,25 @@ def test_replicate(self):
254242
res = replica.start().execute('postgres', 'select 1')
255243
self.assertListEqual(res, [(1, )])
256244

245+
node.execute(
246+
'postgres', 'create table test (val int)', commit=True)
247+
248+
replica.catchup()
249+
250+
res = node.execute('postgres', 'select * from test')
251+
self.assertListEqual(res, [])
252+
253+
def test_incorrect_catchup(self):
254+
with get_new_node('node') as node:
255+
node.init(allow_streaming=True).start()
256+
257+
got_exception = False
258+
try:
259+
node.catchup()
260+
except Exception as e:
261+
pass
262+
self.assertTrue(got_exception)
263+
257264
def test_dump(self):
258265
with get_new_node('node1') as node1:
259266
node1.init().start()

0 commit comments

Comments
 (0)