From d2fa3d5052f7ecd5e78c27408e11661c5d1dc1a2 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 21 Nov 2024 14:31:04 +0200 Subject: [PATCH] MDEV-20065 parallel replication for galera slave When replicating transactions from parallel slave replication processing, Galera must respect the commit order of the parallel slave replication. In the current implementation this is done by calling `wait_for_prior_commit()` before the write set is replicated and certified in before-prepare processing. This however establishes a critical section which is held over whole Galera replication step, and the commit rate will be limited by Galera replication latency. In order to allow concurrency in Galera replication step, the critical section must be released at earliest point where Galera can guarantee sequential consistency for replicated write sets. This change passes a callback to release the critical section by calling `wakeup_subsequent_commits()` to Galera library, which will call the callback once the correct replication order can be established. This functionality will be available from Galera 26.4.22 onwards. Note that call to `wakeup_subsequent_commits()` at this stage is safe from group commit point of view as Galera uses separate `wait_for_commit` context to control commit ordering. --- sql/wsrep_mysqld.cc | 10 ++++++++++ sql/wsrep_trans_observer.h | 9 +++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index a20feaaa3f7bc..762342657508e 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -3980,6 +3980,16 @@ bool THD::wsrep_parallel_slave_wait_for_prior_commit() return false; } +void wsrep_parallel_slave_wakeup_subsequent_commits(void *thd_ptr) +{ + THD *thd = (THD*)thd_ptr; + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_commit_ptr) + { + thd->wait_for_commit_ptr->wakeup_subsequent_commits(0); + } +} + /***** callbacks for wsrep service ************/ my_bool get_wsrep_recovery() diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 25e71638efd74..36be65a6d5209 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -27,6 +27,7 @@ class THD; void wsrep_commit_empty(THD* thd, bool all); +void wsrep_parallel_slave_wakeup_subsequent_commits(void *); /* Return true if THD has active wsrep transaction. @@ -265,7 +266,9 @@ static inline int wsrep_before_prepare(THD* thd, bool all) { DBUG_RETURN(ret); } - if ((ret= thd->wsrep_cs().before_prepare()) == 0) + wsrep::provider::seq_cb_t seq_cb{ + thd, wsrep_parallel_slave_wakeup_subsequent_commits}; + if ((ret= thd->wsrep_cs().before_prepare(&seq_cb)) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); wsrep_xid_init(&thd->wsrep_xid, @@ -314,7 +317,9 @@ static inline int wsrep_before_commit(THD* thd, bool all) int ret= 0; DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); - if ((ret= thd->wsrep_cs().before_commit()) == 0) + wsrep::provider::seq_cb_t seq_cb{ + thd, wsrep_parallel_slave_wakeup_subsequent_commits}; + if ((ret= thd->wsrep_cs().before_commit(&seq_cb)) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); if (!thd->variables.gtid_seq_no &&