diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h index cf94364c1cf82..75e280bd8b04b 100644 --- a/include/mysql/service_wsrep.h +++ b/include/mysql/service_wsrep.h @@ -54,6 +54,7 @@ extern struct wsrep_service_st { int (*wsrep_is_wsrep_xid_func)(const void *xid); long long (*wsrep_xid_seqno_func)(const struct xid_t *xid); const unsigned char* (*wsrep_xid_uuid_func)(const struct xid_t *xid); + const struct xid_t* (*wsrep_commit_xid_func)(const MYSQL_THD thd); my_bool (*wsrep_on_func)(const MYSQL_THD thd); bool (*wsrep_prepare_key_for_innodb_func)(MYSQL_THD thd, const unsigned char*, size_t, const unsigned char*, size_t, struct wsrep_buf*, size_t*); void (*wsrep_thd_LOCK_func)(const MYSQL_THD thd); @@ -63,6 +64,7 @@ extern struct wsrep_service_st { int (*wsrep_thd_retry_counter_func)(const MYSQL_THD thd); bool (*wsrep_thd_ignore_table_func)(MYSQL_THD thd); long long (*wsrep_thd_trx_seqno_func)(const MYSQL_THD thd); + const unsigned char* (*wsrep_thd_trx_uuid_func)(const MYSQL_THD thd); my_bool (*wsrep_thd_is_aborting_func)(const MYSQL_THD thd); void (*wsrep_set_data_home_dir_func)(const char *data_dir); my_bool (*wsrep_thd_is_BF_func)(const MYSQL_THD thd, my_bool sync); @@ -109,6 +111,7 @@ extern struct wsrep_service_st { #define wsrep_is_wsrep_xid(X) wsrep_service->wsrep_is_wsrep_xid_func(X) #define wsrep_xid_seqno(X) wsrep_service->wsrep_xid_seqno_func(X) #define wsrep_xid_uuid(X) wsrep_service->wsrep_xid_uuid_func(X) +#define wsrep_commit_xid(thd) wsrep_service->wsrep_commit_xid_func(thd) #define wsrep_on(thd) (thd) && WSREP_ON && wsrep_service->wsrep_on_func(thd) #define wsrep_prepare_key_for_innodb(A,B,C,D,E,F,G) wsrep_service->wsrep_prepare_key_for_innodb_func(A,B,C,D,E,F,G) #define wsrep_thd_LOCK(T) wsrep_service->wsrep_thd_LOCK_func(T) @@ -120,6 +123,7 @@ extern struct wsrep_service_st { #define wsrep_thd_retry_counter(T) wsrep_service->wsrep_thd_retry_counter_func(T) #define wsrep_thd_ignore_table(T) wsrep_service->wsrep_thd_ignore_table_func(T) #define wsrep_thd_trx_seqno(T) wsrep_service->wsrep_thd_trx_seqno_func(T) +#define wsrep_thd_trx_uuid(T) wsrep_service->wsrep_thd_trx_uuid_func(T) #define wsrep_set_data_home_dir(A) wsrep_service->wsrep_set_data_home_dir_func(A) #define wsrep_thd_is_BF(T,S) wsrep_service->wsrep_thd_is_BF_func(T,S) #define wsrep_thd_is_aborting(T) wsrep_service->wsrep_thd_is_aborting_func(T) @@ -163,7 +167,9 @@ extern "C" const char *wsrep_thd_query(const MYSQL_THD thd); extern "C" int wsrep_is_wsrep_xid(const void* xid); extern "C" long long wsrep_xid_seqno(const struct xid_t* xid); const unsigned char* wsrep_xid_uuid(const struct xid_t* xid); +extern "C" const struct xid_t* wsrep_commit_xid(const MYSQL_THD thd); extern "C" long long wsrep_thd_trx_seqno(const MYSQL_THD thd); +extern "C" const unsigned char* wsrep_thd_trx_uuid(const MYSQL_THD thd); my_bool get_wsrep_recovery(); bool wsrep_thd_ignore_table(MYSQL_THD thd); void wsrep_set_data_home_dir(const char *data_dir); diff --git a/sql/handler.cc b/sql/handler.cc index 9ca2fee591c5d..b6977733bacca 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1999,15 +1999,6 @@ int ha_commit_trans(THD *thd, bool all) xid= thd->transaction->implicit_xid.quick_get_my_xid(); #ifdef WITH_WSREP - if (run_wsrep_hooks && !error) - { - wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid); - if (!s.is_undefined()) - { - // xid was rewritten by wsrep - xid= s.get(); - } - } if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all))) goto wsrep_err; #endif /* WITH_WSREP */ @@ -2692,7 +2683,26 @@ static void xarecover_do_commit_or_rollback(transaction_participant *hton, x= *member->full_xid; if (xarecover_decide_to_commit(member, ptr_commit_max)) + { +#ifdef WITH_WSREP + XID wsrep_commit_xid; + if (!member->wsrep_seqno.is_undefined()) + { + wsrep::gtid wsrep_gtid{ + member->wsrep_uuid, + member->wsrep_seqno}; + wsrep_server_gtid_t server_gtid{member->wsrep_gtid_domain_id, + member->wsrep_gtid_server_id, + member->wsrep_gtid_seq_no}; + wsrep_xid_init(&wsrep_commit_xid, wsrep_gtid, server_gtid); + wsrep_recovery_commit_xid= &wsrep_commit_xid; + } +#endif /* WITH_WSREP */ rc= hton->commit_by_xid(&x); +#ifdef WITH_WSREP + wsrep_recovery_commit_xid = nullptr; +#endif /* WITH_WSREP */ + } else if (hton->recover_rollback_by_xid && IF_WSREP(!(WSREP_ON || wsrep_recovery), true)) rc= hton->recover_rollback_by_xid(&x); diff --git a/sql/handler.h b/sql/handler.h index a4da228756598..3036150cfe1b0 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -44,6 +44,10 @@ #include "mem_root_array.h" #include // pair #include /* __attribute__ */ +#ifdef WITH_WSREP +#include "wsrep/id.hpp" +#include "wsrep/seqno.hpp" +#endif /* WITH_WSREP */ class Alter_info; class Virtual_column_info; @@ -1028,12 +1032,29 @@ struct xid_recovery_member XID *full_xid; // needed by wsrep or past it recovery decltype(::server_id) server_id; // server id of orginal server +#ifdef WITH_WSREP + /* wsrep specific fields to reconstruct wsrep_xid for commit. + If wsrep_seqno is undefined, the transaction is not a wsrep transaction. */ + wsrep::seqno wsrep_seqno; + wsrep::id wsrep_uuid; + uint32 wsrep_gtid_domain_id; + uint32 wsrep_gtid_server_id; + uint64 wsrep_gtid_seq_no; +#endif /* WITH_WSREP */ + xid_recovery_member(my_xid xid_arg, uint prepare_arg, bool decided_arg, XID *full_xid_arg, decltype(::server_id) server_id_arg) : xid(xid_arg), in_engine_prepare(prepare_arg), decided_to_commit(decided_arg), binlog_coord(Binlog_offset(MAX_binlog_id, MAX_off_t)), - full_xid(full_xid_arg), server_id(server_id_arg) {}; + full_xid(full_xid_arg), server_id(server_id_arg) +#ifdef WITH_WSREP + , + wsrep_seqno(wsrep::seqno::undefined()), + wsrep_uuid(wsrep::id::undefined()), wsrep_gtid_domain_id(0), + wsrep_gtid_server_id(0), wsrep_gtid_seq_no(0) +#endif /* WITH_WSREP */ + {} }; /* for recover() handlerton call */ diff --git a/sql/log.cc b/sql/log.cc index 2fc87ade6ae1c..bd6418e3bb52c 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -2071,6 +2071,14 @@ binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, DBUG_ASSERT(xid); // replaced former treatment of ONE-PHASE XA Xid_log_event end_evt(thd, xid, TRUE); +#ifdef WITH_WSREP + if (WSREP(thd)) + { + end_evt.wsrep_seqno= wsrep_thd_trx_seqno(thd); + memcpy(end_evt.wsrep_uuid, wsrep_thd_trx_uuid(thd), + sizeof(end_evt.wsrep_uuid)); + } +#endif if (!thd->rgi_slave && !thd->user_time.val) { /* @@ -11791,6 +11799,16 @@ class Recovery_context bool last_gtid_no2pc; // true when the group does not end with Xid event uint last_gtid_engines; Binlog_offset last_gtid_coord; // +#ifdef WITH_WSREP + /* + Wsrep gtid specific fields are initialized from SE checkpoint and + updated when Gtid_log_event with domain id matching last_wsrep_gtid_domain_id + is processed. + */ + uint32 last_wsrep_gtid_domain_id; + uint32 last_wsrep_gtid_server_id; + uint64 last_wsrep_gtid_seq_no; +#endif /* WITH_WSREP */ /* When true, it's semisync slave recovery mode rolls back transactions in doubt and wipes them off from binlog. @@ -11884,6 +11902,10 @@ class Recovery_context */ void process_gtid(int round, Gtid_log_event *gev, LOG_INFO *linfo); +#ifdef WITH_WSREP + void process_wsrep(xid_recovery_member *member, const Xid_log_event *xid_ev); +#endif /* WITH_WSREP */ + /* Compute next action at the end of processing of the current binlog file. It may increment the round. @@ -12027,6 +12049,13 @@ Recovery_context::Recovery_context() : binlog_unsafe_gtid= truncate_gtid= truncate_gtid_1st_round= rpl_gtid(); if (do_truncate) gtid_maybe_to_truncate= new Dynamic_array(16, 16); +#ifdef WITH_WSREP + wsrep_server_gtid_t wsrep_server_gtid= + wsrep_get_SE_checkpoint(); + last_wsrep_gtid_domain_id= wsrep_server_gtid.domain_id; + last_wsrep_gtid_server_id= wsrep_server_gtid.server_id; + last_wsrep_gtid_seq_no= wsrep_server_gtid.seqno; +#endif /* WITH_WSREP */ } bool Recovery_context::reset_truncate_coord(my_off_t pos) @@ -12232,8 +12261,30 @@ void Recovery_context::process_gtid(int round, Gtid_log_event *gev, /* Update the binlog state with any 'valid' GTID logged after Gtid_list. */ last_gtid_valid= true; // may flip at Xid when falls to truncate } +#ifdef WITH_WSREP + if (last_gtid.domain_id == last_wsrep_gtid_domain_id) + { + last_wsrep_gtid_server_id= last_gtid.server_id; + last_wsrep_gtid_seq_no= last_gtid.seq_no; + } +#endif /* WITH_WSREP */ } +#ifdef WITH_WSREP +void Recovery_context::process_wsrep(xid_recovery_member *member, + const Xid_log_event *xid_ev) +{ + if (member && xid_ev->wsrep_seqno != Xid_log_event::wsrep_seqno_undefined) + { + member->wsrep_seqno= wsrep::seqno{xid_ev->wsrep_seqno}; + member->wsrep_uuid= wsrep::id{xid_ev->wsrep_uuid, sizeof(member->wsrep_uuid)}; + member->wsrep_gtid_domain_id= last_wsrep_gtid_domain_id; + member->wsrep_gtid_server_id= last_wsrep_gtid_server_id; + member->wsrep_gtid_seq_no= last_wsrep_gtid_seq_no; + } +} +#endif /* WITH_WSREP */ + int Recovery_context::next_binlog_or_round(int& round, const char *last_log_name, const char *binlog_checkpoint_name, @@ -12381,9 +12432,10 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, case XID_EVENT: if (do_xa) { + const Xid_log_event *xid_ev= static_cast(ev); xid_recovery_member *member= (xid_recovery_member*) - my_hash_search(&xids, (uchar*) &static_cast(ev)->xid, + my_hash_search(&xids, (uchar*) &xid_ev->xid, sizeof(my_xid)); #ifndef HAVE_REPLICATION { @@ -12393,6 +12445,9 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, #else if (ctx.decide_or_assess(member, round, fdle, linfo, end_pos)) goto err2; +#ifdef WITH_WSREP + ctx.process_wsrep(member, xid_ev); +#endif /* WITH_WSREP */ #endif } break; diff --git a/sql/log_event.cc b/sql/log_event.cc index 424fa80fb008b..8f67a7d9e87bd 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1139,7 +1139,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len, ev= new Intvar_log_event(buf, fdle); break; case XID_EVENT: - ev= new Xid_log_event(buf, fdle); + ev= new Xid_log_event(buf, event_len, fdle); break; case XA_PREPARE_LOG_EVENT: ev= new XA_prepare_log_event(buf, fdle); @@ -2743,6 +2743,7 @@ Rand_log_event::Rand_log_event(const uchar *buf, Xid_log_event:: Xid_log_event(const uchar *buf, + uint event_len, const Format_description_log_event *description_event) :Xid_apply_log_event(buf, description_event) { @@ -2750,6 +2751,21 @@ Xid_log_event(const uchar *buf, buf+= description_event->common_header_len + description_event->post_header_len[XID_EVENT-1]; memcpy((char*) &xid, buf, sizeof(xid)); + const uint len_with_wsrep= description_event->common_header_len + + description_event->post_header_len[XID_EVENT-1] + + sizeof(xid) + sizeof(wsrep_seqno) + sizeof(wsrep_uuid); + if (event_len == len_with_wsrep) + { + buf += sizeof(xid); + wsrep_seqno= sint8korr(buf); + buf += sizeof(wsrep_seqno); + memcpy(wsrep_uuid, buf, sizeof(wsrep_uuid)); + } + else + { + wsrep_seqno= wsrep_seqno_undefined; + memset(wsrep_uuid, 0, sizeof(wsrep_uuid)); + } } /************************************************************************** diff --git a/sql/log_event.h b/sql/log_event.h index 68c8f05d8c537..ad59c6b6e6dcb 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -2823,6 +2823,9 @@ class Xid_apply_log_event: public Log_event Logs xid of the transaction-to-be-committed in the 2pc protocol. Has no meaning in replication, slaves ignore it. + Stores wsrep_seqno and wsrep_uuid in the event body if the event + was produced by server with wsrep enabled. + @section Xid_log_event_binary_format Binary Format */ #ifdef MYSQL_CLIENT @@ -2833,10 +2836,16 @@ class Xid_log_event: public Xid_apply_log_event { public: my_xid xid; + /* This matches the definition of WSREP_SEQNO_UNDEFINED and + wsrep::seqno::undefined() */ + constexpr static int64 wsrep_seqno_undefined= -1; + int64 wsrep_seqno; + uchar wsrep_uuid[16]; #ifdef MYSQL_SERVER Xid_log_event(THD* thd_arg, my_xid x, bool direct): - Xid_apply_log_event(thd_arg), xid(x) + Xid_apply_log_event(thd_arg), xid(x), + wsrep_seqno(wsrep_seqno_undefined), wsrep_uuid() { if (direct) cache_type= Log_event::EVENT_NO_CACHE; @@ -2853,10 +2862,16 @@ class Xid_log_event: public Xid_apply_log_event #endif Xid_log_event(const uchar *buf, + uint event_len, const Format_description_log_event *description_event); ~Xid_log_event() = default; Log_event_type get_type_code() override { return XID_EVENT;} - int get_data_size() override { return sizeof(xid); } + int get_data_size() override + { + return sizeof(xid) + + (wsrep_seqno != wsrep_seqno_undefined ? + sizeof(wsrep_seqno) + sizeof(wsrep_uuid) : 0); + } #ifdef MYSQL_SERVER bool write(Log_event_writer *writer) override; #endif diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc index 659d745fe4904..5729e7ef8d29a 100644 --- a/sql/log_event_client.cc +++ b/sql/log_event_client.cc @@ -2464,9 +2464,24 @@ bool Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) char buf[64]; longlong10_to_str(xid, buf, 10); - if (print_header(&cache, print_event_info, FALSE) || - my_b_printf(&cache, "\tXid = %s\n", buf)) - goto err; + if (wsrep_seqno == wsrep_seqno_undefined) + { + if (print_header(&cache, print_event_info, FALSE) || + my_b_printf(&cache, "\tXid = %s\n", buf)) + goto err; + } + else + { + char wsrep_seqno_buf[22]; + longlong10_to_str(wsrep_seqno, wsrep_seqno_buf, 10); + char wsrep_uuid_buf[MY_UUID_STRING_LENGTH + 1]; + my_uuid2str(wsrep_uuid, wsrep_uuid_buf, 1); + wsrep_uuid_buf[MY_UUID_STRING_LENGTH]= '\0'; + if (print_header(&cache, print_event_info, FALSE) || + my_b_printf(&cache, "\tXid = %s, wsrep_seqno = %s, wsrep_uuid = %s\n", + buf, wsrep_seqno_buf, wsrep_uuid_buf)) + goto err; + } } if (my_b_printf(&cache, is_flashback ? "START TRANSACTION%s\n" : "COMMIT%s\n", print_event_info->delimiter)) diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index c0a58ecaf9dec..f48ed14358665 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -3846,9 +3846,21 @@ int Xid_log_event::do_commit() bool Xid_log_event::write(Log_event_writer *writer) { DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return write_header(writer, sizeof(xid)) || - write_data(writer, (uchar*)&xid, sizeof(xid)) || - write_footer(writer); + if (wsrep_seqno == wsrep_seqno_undefined) + return write_header(writer, sizeof(xid)) || + write_data(writer, (uchar*)&xid, sizeof(xid)) || + write_footer(writer); + else + { + uchar data[sizeof(xid) + sizeof(wsrep_seqno) + sizeof(wsrep_uuid)]; + memcpy(data, &xid, sizeof(xid)); + int8store(data + sizeof(xid), wsrep_seqno); + memcpy(data + sizeof(xid) + sizeof(wsrep_seqno), wsrep_uuid, + sizeof(wsrep_uuid)); + return write_header(writer, sizeof(xid) + sizeof(wsrep_seqno) + + sizeof(wsrep_uuid)) || + write_data(writer, data, sizeof(data)) || write_footer(writer); + } } /************************************************************************** diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc index 9d04849aadbeb..6d9f89dc1b9bc 100644 --- a/sql/service_wsrep.cc +++ b/sql/service_wsrep.cc @@ -27,6 +27,15 @@ extern "C" my_bool wsrep_on(const THD *thd) return my_bool(WSREP(thd)); } +extern "C" const struct xid_t* wsrep_commit_xid(const MYSQL_THD thd) +{ + if (wsrep_recovery_commit_xid) + { + return wsrep_recovery_commit_xid; + } + return thd && wsrep_is_wsrep_xid(&thd->wsrep_xid) ? &thd->wsrep_xid : nullptr; +} + extern "C" void wsrep_thd_LOCK(const THD *thd) { mysql_mutex_lock(&thd->LOCK_thd_data); @@ -110,6 +119,21 @@ extern "C" long long wsrep_thd_trx_seqno(const THD *thd) } } +extern "C" const unsigned char *wsrep_thd_trx_uuid(const MYSQL_THD thd) +{ + const wsrep::client_state &cs= thd->wsrep_cs(); + if (cs.mode() == wsrep::client_state::m_toi) + { + return static_cast( + cs.toi_meta().gtid().id().data()); + } + else + { + return static_cast( + cs.transaction().ws_meta().gtid().id().data()); + } +} + extern "C" void wsrep_thd_self_abort(THD *thd) { thd->wsrep_cs().bf_abort(wsrep::seqno(0)); diff --git a/sql/sql_class.h b/sql/sql_class.h index 6c73bc5863883..297006d8b39ea 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5649,10 +5649,6 @@ class THD: public THD_count, /* this must be first */ const XID *get_xid() const { -#ifdef WITH_WSREP - if (!wsrep_xid.is_null()) - return &wsrep_xid; -#endif /* WITH_WSREP */ return (transaction->xid_state.is_explicit_XA() ? transaction->xid_state.get_xid() : &transaction->implicit_xid); diff --git a/sql/sql_plugin_services.inl b/sql/sql_plugin_services.inl index 2114f3560a7ec..4ff6968fac201 100644 --- a/sql/sql_plugin_services.inl +++ b/sql/sql_plugin_services.inl @@ -149,6 +149,7 @@ static struct wsrep_service_st wsrep_handler = { wsrep_is_wsrep_xid, wsrep_xid_seqno, wsrep_xid_uuid, + wsrep_commit_xid, wsrep_on, wsrep_prepare_key_for_innodb, wsrep_thd_LOCK, @@ -158,6 +159,7 @@ static struct wsrep_service_st wsrep_handler = { wsrep_thd_retry_counter, wsrep_thd_ignore_table, wsrep_thd_trx_seqno, + wsrep_thd_trx_uuid, wsrep_thd_is_aborting, wsrep_set_data_home_dir, wsrep_thd_is_BF, diff --git a/sql/wsrep_xid.cc b/sql/wsrep_xid.cc index 34eafe9c46cce..c6391a33fb28e 100644 --- a/sql/wsrep_xid.cc +++ b/sql/wsrep_xid.cc @@ -24,6 +24,9 @@ #include #include /* std::sort() */ + +XID* wsrep_recovery_commit_xid = nullptr; + /* * WSREPXid */ diff --git a/sql/wsrep_xid.h b/sql/wsrep_xid.h index 45ba6ffee6b9e..cc6c5d80d58de 100644 --- a/sql/wsrep_xid.h +++ b/sql/wsrep_xid.h @@ -35,5 +35,11 @@ bool wsrep_set_SE_checkpoint(const wsrep::gtid& gtid, const wsrep_server_gtid_t& void wsrep_sort_xid_array(XID *array, int len); +/** + Pointer to the xid that is used to commit the transaction + during recovery. +*/ +extern XID* wsrep_recovery_commit_xid; + #endif /* WITH_WSREP */ #endif /* WSREP_UTILS_H */ diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index e27e87b67da97..ee3b359ad753e 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -249,8 +249,8 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) srv_force_recovery >= SRV_FORCE_NO_BACKGROUND))); #ifdef WITH_WSREP - if (wsrep_is_wsrep_xid(&trx->xid)) - trx_rseg_update_wsrep_checkpoint(rseg_header, &trx->xid, mtr); + if (const XID *xid = wsrep_commit_xid(trx->mysql_thd)) + trx_rseg_update_wsrep_checkpoint(rseg_header, xid, mtr); #endif if (trx->mysql_log_file_name && *trx->mysql_log_file_name)