Skip to content

WIP: Store wsrep transaction seqno and UUID into Xid_log_event #485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/mysql/service_wsrep.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ extern struct wsrep_service_st {
int (*wsrep_is_wsrep_xid_func)(const void *xid);
long long (*wsrep_xid_seqno_func)(const struct xid_t *xid);
const unsigned char* (*wsrep_xid_uuid_func)(const struct xid_t *xid);
const struct xid_t* (*wsrep_commit_xid_func)(const MYSQL_THD thd);
my_bool (*wsrep_on_func)(const MYSQL_THD thd);
bool (*wsrep_prepare_key_for_innodb_func)(MYSQL_THD thd, const unsigned char*, size_t, const unsigned char*, size_t, struct wsrep_buf*, size_t*);
void (*wsrep_thd_LOCK_func)(const MYSQL_THD thd);
Expand All @@ -63,6 +64,7 @@ extern struct wsrep_service_st {
int (*wsrep_thd_retry_counter_func)(const MYSQL_THD thd);
bool (*wsrep_thd_ignore_table_func)(MYSQL_THD thd);
long long (*wsrep_thd_trx_seqno_func)(const MYSQL_THD thd);
const unsigned char* (*wsrep_thd_trx_uuid_func)(const MYSQL_THD thd);
my_bool (*wsrep_thd_is_aborting_func)(const MYSQL_THD thd);
void (*wsrep_set_data_home_dir_func)(const char *data_dir);
my_bool (*wsrep_thd_is_BF_func)(const MYSQL_THD thd, my_bool sync);
Expand Down Expand Up @@ -109,6 +111,7 @@ extern struct wsrep_service_st {
#define wsrep_is_wsrep_xid(X) wsrep_service->wsrep_is_wsrep_xid_func(X)
#define wsrep_xid_seqno(X) wsrep_service->wsrep_xid_seqno_func(X)
#define wsrep_xid_uuid(X) wsrep_service->wsrep_xid_uuid_func(X)
#define wsrep_commit_xid(thd) wsrep_service->wsrep_commit_xid_func(thd)
#define wsrep_on(thd) (thd) && WSREP_ON && wsrep_service->wsrep_on_func(thd)
#define wsrep_prepare_key_for_innodb(A,B,C,D,E,F,G) wsrep_service->wsrep_prepare_key_for_innodb_func(A,B,C,D,E,F,G)
#define wsrep_thd_LOCK(T) wsrep_service->wsrep_thd_LOCK_func(T)
Expand All @@ -120,6 +123,7 @@ extern struct wsrep_service_st {
#define wsrep_thd_retry_counter(T) wsrep_service->wsrep_thd_retry_counter_func(T)
#define wsrep_thd_ignore_table(T) wsrep_service->wsrep_thd_ignore_table_func(T)
#define wsrep_thd_trx_seqno(T) wsrep_service->wsrep_thd_trx_seqno_func(T)
#define wsrep_thd_trx_uuid(T) wsrep_service->wsrep_thd_trx_uuid_func(T)
#define wsrep_set_data_home_dir(A) wsrep_service->wsrep_set_data_home_dir_func(A)
#define wsrep_thd_is_BF(T,S) wsrep_service->wsrep_thd_is_BF_func(T,S)
#define wsrep_thd_is_aborting(T) wsrep_service->wsrep_thd_is_aborting_func(T)
Expand Down Expand Up @@ -163,7 +167,9 @@ extern "C" const char *wsrep_thd_query(const MYSQL_THD thd);
extern "C" int wsrep_is_wsrep_xid(const void* xid);
extern "C" long long wsrep_xid_seqno(const struct xid_t* xid);
const unsigned char* wsrep_xid_uuid(const struct xid_t* xid);
extern "C" const struct xid_t* wsrep_commit_xid(const MYSQL_THD thd);
extern "C" long long wsrep_thd_trx_seqno(const MYSQL_THD thd);
extern "C" const unsigned char* wsrep_thd_trx_uuid(const MYSQL_THD thd);
my_bool get_wsrep_recovery();
bool wsrep_thd_ignore_table(MYSQL_THD thd);
void wsrep_set_data_home_dir(const char *data_dir);
Expand Down
28 changes: 19 additions & 9 deletions sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1999,15 +1999,6 @@ int ha_commit_trans(THD *thd, bool all)
xid= thd->transaction->implicit_xid.quick_get_my_xid();

#ifdef WITH_WSREP
if (run_wsrep_hooks && !error)
{
wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid);
if (!s.is_undefined())
{
// xid was rewritten by wsrep
xid= s.get();
}
}
if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all)))
goto wsrep_err;
#endif /* WITH_WSREP */
Expand Down Expand Up @@ -2692,7 +2683,26 @@ static void xarecover_do_commit_or_rollback(transaction_participant *hton,
x= *member->full_xid;

if (xarecover_decide_to_commit(member, ptr_commit_max))
{
#ifdef WITH_WSREP
XID wsrep_commit_xid;
if (!member->wsrep_seqno.is_undefined())
{
wsrep::gtid wsrep_gtid{
member->wsrep_uuid,
member->wsrep_seqno};
wsrep_server_gtid_t server_gtid{member->wsrep_gtid_domain_id,
member->wsrep_gtid_server_id,
member->wsrep_gtid_seq_no};
wsrep_xid_init(&wsrep_commit_xid, wsrep_gtid, server_gtid);
wsrep_recovery_commit_xid= &wsrep_commit_xid;
}
#endif /* WITH_WSREP */
rc= hton->commit_by_xid(&x);
#ifdef WITH_WSREP
wsrep_recovery_commit_xid = nullptr;
#endif /* WITH_WSREP */
}
else if (hton->recover_rollback_by_xid &&
IF_WSREP(!(WSREP_ON || wsrep_recovery), true))
rc= hton->recover_rollback_by_xid(&x);
Expand Down
23 changes: 22 additions & 1 deletion sql/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
#include "mem_root_array.h"
#include <utility> // pair
#include <my_attribute.h> /* __attribute__ */
#ifdef WITH_WSREP
#include "wsrep/id.hpp"
#include "wsrep/seqno.hpp"
#endif /* WITH_WSREP */

class Alter_info;
class Virtual_column_info;
Expand Down Expand Up @@ -1028,12 +1032,29 @@ struct xid_recovery_member
XID *full_xid; // needed by wsrep or past it recovery
decltype(::server_id) server_id; // server id of orginal server

#ifdef WITH_WSREP
/* wsrep specific fields to reconstruct wsrep_xid for commit.
If wsrep_seqno is undefined, the transaction is not a wsrep transaction. */
wsrep::seqno wsrep_seqno;
wsrep::id wsrep_uuid;
uint32 wsrep_gtid_domain_id;
uint32 wsrep_gtid_server_id;
uint64 wsrep_gtid_seq_no;
#endif /* WITH_WSREP */

xid_recovery_member(my_xid xid_arg, uint prepare_arg, bool decided_arg,
XID *full_xid_arg, decltype(::server_id) server_id_arg)
: xid(xid_arg), in_engine_prepare(prepare_arg),
decided_to_commit(decided_arg),
binlog_coord(Binlog_offset(MAX_binlog_id, MAX_off_t)),
full_xid(full_xid_arg), server_id(server_id_arg) {};
full_xid(full_xid_arg), server_id(server_id_arg)
#ifdef WITH_WSREP
,
wsrep_seqno(wsrep::seqno::undefined()),
wsrep_uuid(wsrep::id::undefined()), wsrep_gtid_domain_id(0),
wsrep_gtid_server_id(0), wsrep_gtid_seq_no(0)
#endif /* WITH_WSREP */
{}
};

/* for recover() handlerton call */
Expand Down
57 changes: 56 additions & 1 deletion sql/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2071,6 +2071,14 @@ binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr,
DBUG_ASSERT(xid); // replaced former treatment of ONE-PHASE XA

Xid_log_event end_evt(thd, xid, TRUE);
#ifdef WITH_WSREP
if (WSREP(thd))
{
end_evt.wsrep_seqno= wsrep_thd_trx_seqno(thd);
memcpy(end_evt.wsrep_uuid, wsrep_thd_trx_uuid(thd),
sizeof(end_evt.wsrep_uuid));
}
#endif
if (!thd->rgi_slave && !thd->user_time.val)
{
/*
Expand Down Expand Up @@ -11791,6 +11799,16 @@ class Recovery_context
bool last_gtid_no2pc; // true when the group does not end with Xid event
uint last_gtid_engines;
Binlog_offset last_gtid_coord; // <binlog id, binlog offset>
#ifdef WITH_WSREP
/*
Wsrep gtid specific fields are initialized from SE checkpoint and
updated when Gtid_log_event with domain id matching last_wsrep_gtid_domain_id
is processed.
*/
uint32 last_wsrep_gtid_domain_id;
uint32 last_wsrep_gtid_server_id;
uint64 last_wsrep_gtid_seq_no;
#endif /* WITH_WSREP */
/*
When true, it's semisync slave recovery mode
rolls back transactions in doubt and wipes them off from binlog.
Expand Down Expand Up @@ -11884,6 +11902,10 @@ class Recovery_context
*/
void process_gtid(int round, Gtid_log_event *gev, LOG_INFO *linfo);

#ifdef WITH_WSREP
void process_wsrep(xid_recovery_member *member, const Xid_log_event *xid_ev);
#endif /* WITH_WSREP */

/*
Compute next action at the end of processing of the current binlog file.
It may increment the round.
Expand Down Expand Up @@ -12027,6 +12049,13 @@ Recovery_context::Recovery_context() :
binlog_unsafe_gtid= truncate_gtid= truncate_gtid_1st_round= rpl_gtid();
if (do_truncate)
gtid_maybe_to_truncate= new Dynamic_array<rpl_gtid>(16, 16);
#ifdef WITH_WSREP
wsrep_server_gtid_t wsrep_server_gtid=
wsrep_get_SE_checkpoint<wsrep_server_gtid_t>();
last_wsrep_gtid_domain_id= wsrep_server_gtid.domain_id;
last_wsrep_gtid_server_id= wsrep_server_gtid.server_id;
last_wsrep_gtid_seq_no= wsrep_server_gtid.seqno;
#endif /* WITH_WSREP */
}

bool Recovery_context::reset_truncate_coord(my_off_t pos)
Expand Down Expand Up @@ -12232,8 +12261,30 @@ void Recovery_context::process_gtid(int round, Gtid_log_event *gev,
/* Update the binlog state with any 'valid' GTID logged after Gtid_list. */
last_gtid_valid= true; // may flip at Xid when falls to truncate
}
#ifdef WITH_WSREP
if (last_gtid.domain_id == last_wsrep_gtid_domain_id)
{
last_wsrep_gtid_server_id= last_gtid.server_id;
last_wsrep_gtid_seq_no= last_gtid.seq_no;
}
#endif /* WITH_WSREP */
}

#ifdef WITH_WSREP
void Recovery_context::process_wsrep(xid_recovery_member *member,
const Xid_log_event *xid_ev)
{
if (member && xid_ev->wsrep_seqno != Xid_log_event::wsrep_seqno_undefined)
{
member->wsrep_seqno= wsrep::seqno{xid_ev->wsrep_seqno};
member->wsrep_uuid= wsrep::id{xid_ev->wsrep_uuid, sizeof(member->wsrep_uuid)};
member->wsrep_gtid_domain_id= last_wsrep_gtid_domain_id;
member->wsrep_gtid_server_id= last_wsrep_gtid_server_id;
member->wsrep_gtid_seq_no= last_wsrep_gtid_seq_no;
}
}
#endif /* WITH_WSREP */

int Recovery_context::next_binlog_or_round(int& round,
const char *last_log_name,
const char *binlog_checkpoint_name,
Expand Down Expand Up @@ -12381,9 +12432,10 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
case XID_EVENT:
if (do_xa)
{
const Xid_log_event *xid_ev= static_cast<const Xid_log_event*>(ev);
xid_recovery_member *member=
(xid_recovery_member*)
my_hash_search(&xids, (uchar*) &static_cast<Xid_log_event*>(ev)->xid,
my_hash_search(&xids, (uchar*) &xid_ev->xid,
sizeof(my_xid));
#ifndef HAVE_REPLICATION
{
Expand All @@ -12393,6 +12445,9 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
#else
if (ctx.decide_or_assess(member, round, fdle, linfo, end_pos))
goto err2;
#ifdef WITH_WSREP
ctx.process_wsrep(member, xid_ev);
#endif /* WITH_WSREP */
#endif
}
break;
Expand Down
18 changes: 17 additions & 1 deletion sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
ev= new Intvar_log_event(buf, fdle);
break;
case XID_EVENT:
ev= new Xid_log_event(buf, fdle);
ev= new Xid_log_event(buf, event_len, fdle);
break;
case XA_PREPARE_LOG_EVENT:
ev= new XA_prepare_log_event(buf, fdle);
Expand Down Expand Up @@ -2743,13 +2743,29 @@ Rand_log_event::Rand_log_event(const uchar *buf,

Xid_log_event::
Xid_log_event(const uchar *buf,
uint event_len,
const Format_description_log_event *description_event)
:Xid_apply_log_event(buf, description_event)
{
/* The Post-Header is empty. The Variable Data part begins immediately. */
buf+= description_event->common_header_len +
description_event->post_header_len[XID_EVENT-1];
memcpy((char*) &xid, buf, sizeof(xid));
const uint len_with_wsrep= description_event->common_header_len +
description_event->post_header_len[XID_EVENT-1] +
sizeof(xid) + sizeof(wsrep_seqno) + sizeof(wsrep_uuid);
if (event_len == len_with_wsrep)
{
buf += sizeof(xid);
wsrep_seqno= sint8korr(buf);
buf += sizeof(wsrep_seqno);
memcpy(wsrep_uuid, buf, sizeof(wsrep_uuid));
}
else
{
wsrep_seqno= wsrep_seqno_undefined;
memset(wsrep_uuid, 0, sizeof(wsrep_uuid));
}
}

/**************************************************************************
Expand Down
19 changes: 17 additions & 2 deletions sql/log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -2823,6 +2823,9 @@ class Xid_apply_log_event: public Log_event
Logs xid of the transaction-to-be-committed in the 2pc protocol.
Has no meaning in replication, slaves ignore it.

Stores wsrep_seqno and wsrep_uuid in the event body if the event
was produced by server with wsrep enabled.

@section Xid_log_event_binary_format Binary Format
*/
#ifdef MYSQL_CLIENT
Expand All @@ -2833,10 +2836,16 @@ class Xid_log_event: public Xid_apply_log_event
{
public:
my_xid xid;
/* This matches the definition of WSREP_SEQNO_UNDEFINED and
wsrep::seqno::undefined() */
constexpr static int64 wsrep_seqno_undefined= -1;
int64 wsrep_seqno;
uchar wsrep_uuid[16];

#ifdef MYSQL_SERVER
Xid_log_event(THD* thd_arg, my_xid x, bool direct):
Xid_apply_log_event(thd_arg), xid(x)
Xid_apply_log_event(thd_arg), xid(x),
wsrep_seqno(wsrep_seqno_undefined), wsrep_uuid()
{
if (direct)
cache_type= Log_event::EVENT_NO_CACHE;
Expand All @@ -2853,10 +2862,16 @@ class Xid_log_event: public Xid_apply_log_event
#endif

Xid_log_event(const uchar *buf,
uint event_len,
const Format_description_log_event *description_event);
~Xid_log_event() = default;
Log_event_type get_type_code() override { return XID_EVENT;}
int get_data_size() override { return sizeof(xid); }
int get_data_size() override
{
return sizeof(xid) +
(wsrep_seqno != wsrep_seqno_undefined ?
sizeof(wsrep_seqno) + sizeof(wsrep_uuid) : 0);
}
#ifdef MYSQL_SERVER
bool write(Log_event_writer *writer) override;
#endif
Expand Down
21 changes: 18 additions & 3 deletions sql/log_event_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2464,9 +2464,24 @@ bool Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
char buf[64];
longlong10_to_str(xid, buf, 10);

if (print_header(&cache, print_event_info, FALSE) ||
my_b_printf(&cache, "\tXid = %s\n", buf))
goto err;
if (wsrep_seqno == wsrep_seqno_undefined)
{
if (print_header(&cache, print_event_info, FALSE) ||
my_b_printf(&cache, "\tXid = %s\n", buf))
goto err;
}
else
{
char wsrep_seqno_buf[22];
longlong10_to_str(wsrep_seqno, wsrep_seqno_buf, 10);
char wsrep_uuid_buf[MY_UUID_STRING_LENGTH + 1];
my_uuid2str(wsrep_uuid, wsrep_uuid_buf, 1);
wsrep_uuid_buf[MY_UUID_STRING_LENGTH]= '\0';
if (print_header(&cache, print_event_info, FALSE) ||
my_b_printf(&cache, "\tXid = %s, wsrep_seqno = %s, wsrep_uuid = %s\n",
buf, wsrep_seqno_buf, wsrep_uuid_buf))
goto err;
}
}
if (my_b_printf(&cache, is_flashback ? "START TRANSACTION%s\n" : "COMMIT%s\n",
print_event_info->delimiter))
Expand Down
18 changes: 15 additions & 3 deletions sql/log_event_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3846,9 +3846,21 @@ int Xid_log_event::do_commit()
bool Xid_log_event::write(Log_event_writer *writer)
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
return write_header(writer, sizeof(xid)) ||
write_data(writer, (uchar*)&xid, sizeof(xid)) ||
write_footer(writer);
if (wsrep_seqno == wsrep_seqno_undefined)
return write_header(writer, sizeof(xid)) ||
write_data(writer, (uchar*)&xid, sizeof(xid)) ||
write_footer(writer);
else
{
uchar data[sizeof(xid) + sizeof(wsrep_seqno) + sizeof(wsrep_uuid)];
memcpy(data, &xid, sizeof(xid));
int8store(data + sizeof(xid), wsrep_seqno);
memcpy(data + sizeof(xid) + sizeof(wsrep_seqno), wsrep_uuid,
sizeof(wsrep_uuid));
return write_header(writer, sizeof(xid) + sizeof(wsrep_seqno) +
sizeof(wsrep_uuid)) ||
write_data(writer, data, sizeof(data)) || write_footer(writer);
}
}

/**************************************************************************
Expand Down
Loading