diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index a20feaaa3f7bc..762342657508e 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -3980,6 +3980,16 @@ bool THD::wsrep_parallel_slave_wait_for_prior_commit() return false; } +void wsrep_parallel_slave_wakeup_subsequent_commits(void *thd_ptr) +{ + THD *thd = (THD*)thd_ptr; + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_commit_ptr) + { + thd->wait_for_commit_ptr->wakeup_subsequent_commits(0); + } +} + /***** callbacks for wsrep service ************/ my_bool get_wsrep_recovery() diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 25e71638efd74..36be65a6d5209 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -27,6 +27,7 @@ class THD; void wsrep_commit_empty(THD* thd, bool all); +void wsrep_parallel_slave_wakeup_subsequent_commits(void *); /* Return true if THD has active wsrep transaction. @@ -265,7 +266,9 @@ static inline int wsrep_before_prepare(THD* thd, bool all) { DBUG_RETURN(ret); } - if ((ret= thd->wsrep_cs().before_prepare()) == 0) + wsrep::provider::seq_cb_t seq_cb{ + thd, wsrep_parallel_slave_wakeup_subsequent_commits}; + if ((ret= thd->wsrep_cs().before_prepare(&seq_cb)) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); wsrep_xid_init(&thd->wsrep_xid, @@ -314,7 +317,9 @@ static inline int wsrep_before_commit(THD* thd, bool all) int ret= 0; DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); - if ((ret= thd->wsrep_cs().before_commit()) == 0) + wsrep::provider::seq_cb_t seq_cb{ + thd, wsrep_parallel_slave_wakeup_subsequent_commits}; + if ((ret= thd->wsrep_cs().before_commit(&seq_cb)) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); if (!thd->variables.gtid_seq_no &&