Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assert that explicit transaction snapshot never overwrites an older one #1494

Open
wants to merge 1 commit into
base: fb-mysql-8.0.32
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3186,41 +3186,40 @@ static struct SYS_VAR *rocksdb_system_variables[] = {

class Rdb_explicit_snapshot : public explicit_snapshot {
public:
static std::shared_ptr<Rdb_explicit_snapshot> create(
THD *thd, snapshot_info_st *ssinfo, rocksdb::DB *db,
[[nodiscard]] static std::shared_ptr<Rdb_explicit_snapshot> create(
THD *thd, snapshot_info_st &ssinfo, rocksdb::DB *db,
const rocksdb::Snapshot *snapshot) {
std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
auto s = std::unique_ptr<rocksdb::ManagedSnapshot>(
new rocksdb::ManagedSnapshot(db, snapshot));
if (!s) {
return nullptr;
}
ssinfo->snapshot_id = ++explicit_snapshot_counter;
const uint64_t client_provided_read_filtering_ts =
rdb_is_binlog_ttl_enabled()
? THDVAR(thd, consistent_snapshot_ttl_read_filtering_ts_nsec) /
1000000000UL
: 0;
std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
ssinfo.snapshot_id = ++explicit_snapshot_counter;
// NOTE: We need to set read filtering ts to min of all active trxs and not
// the current hlc because some active trxs can commit to binlog later with
// a lower read filtering ts and that can cause failures when we try to use
// the snapshot to perform dump & load and then replay binlogs because some
// trxs in the binlog may access rows that have been expired according to
// current hlc.
ssinfo->read_filtering_ts =
client_provided_read_filtering_ts
? client_provided_read_filtering_ts
: rdb_get_min_binlog_ttl_read_filtering_ts();
ssinfo.read_filtering_ts = client_provided_read_filtering_ts
? client_provided_read_filtering_ts
: rdb_get_min_binlog_ttl_read_filtering_ts();

auto ret = std::make_shared<Rdb_explicit_snapshot>(*ssinfo, std::move(s));
auto ret = std::make_shared<Rdb_explicit_snapshot>(ssinfo, std::move(s));
if (!ret) {
return nullptr;
}
explicit_snapshots[ssinfo->snapshot_id] = ret;
explicit_snapshots[ssinfo.snapshot_id] = ret;
return ret;
}

static std::string dump_snapshots() {
[[nodiscard]] static std::string dump_snapshots() {
std::string str;
std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
for (const auto &elem : explicit_snapshots) {
Expand All @@ -3236,8 +3235,8 @@ class Rdb_explicit_snapshot : public explicit_snapshot {
return str;
}

static std::shared_ptr<Rdb_explicit_snapshot> get(
const ulonglong snapshot_id) {
[[nodiscard]] static std::shared_ptr<Rdb_explicit_snapshot> get(
ulonglong snapshot_id) {
std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
auto elem = explicit_snapshots.find(snapshot_id);
if (elem == explicit_snapshots.end()) {
Expand All @@ -3246,7 +3245,9 @@ class Rdb_explicit_snapshot : public explicit_snapshot {
return elem->second.lock();
}

rocksdb::ManagedSnapshot *get_snapshot() { return snapshot.get(); }
// Returns the raw pointer held by the `snapshot` member without giving up
// ownership. NOTE(review): `snapshot` is presumably the
// std::unique_ptr<rocksdb::ManagedSnapshot> handed to the constructor --
// confirm against the member declaration.
[[nodiscard]] rocksdb::ManagedSnapshot *get_snapshot() noexcept {
  rocksdb::ManagedSnapshot *const managed = snapshot.get();
  return managed;
}

Rdb_explicit_snapshot(snapshot_info_st ssinfo,
std::unique_ptr<rocksdb::ManagedSnapshot> &&snapshot)
Expand Down Expand Up @@ -3648,6 +3649,8 @@ class Rdb_transaction {

rocksdb::ReadOptions m_read_opts[2];

std::shared_ptr<Rdb_explicit_snapshot> m_explicit_snapshot;

// This should be used only when updating binlog information.
[[nodiscard]] virtual rocksdb::WriteBatchBase &get_write_batch() = 0;
virtual bool commit_no_binlog(
Expand Down Expand Up @@ -3695,7 +3698,6 @@ class Rdb_transaction {
my_off_t m_mysql_log_offset;
const char *m_mysql_max_gtid;
String m_detailed_error;
std::shared_ptr<Rdb_explicit_snapshot> m_explicit_snapshot;
bool should_refresh_iterator_after_first_write = false;

/*
Expand Down Expand Up @@ -4028,11 +4030,26 @@ class Rdb_transaction {
return m_read_opts[table_type].snapshot != nullptr;
}

void create_explicit_snapshot(snapshot_info_st *ss_info) {
// Attaches an already existing explicit snapshot to this transaction.
//
// The transaction must not hold an explicit snapshot yet: installing a second
// one would silently drop the first, so that precondition is asserted. The
// argument is taken by value and moved into the member.
void share_explicit_snapshot(
    std::shared_ptr<Rdb_explicit_snapshot> snapshot) noexcept {
  // An explicit snapshot must never overwrite an older one.
  assert(!m_explicit_snapshot);
  m_explicit_snapshot = std::move(snapshot);
}

// Creates a new explicit snapshot for this transaction on top of the snapshot
// currently recorded in the USER_TABLE read options, and stores it in
// m_explicit_snapshot.
//
// ss_info is passed through to Rdb_explicit_snapshot::create(). The
// transaction must not already hold an explicit snapshot (asserted). The
// member may still be null afterwards if creation failed, so callers are
// expected to check has_explicit_snapshot().
void create_explicit_snapshot(snapshot_info_st &ss_info) {
  // An explicit snapshot must never overwrite an older one.
  assert(!m_explicit_snapshot);
  const auto *const base_snapshot =
      m_read_opts[TABLE_TYPE::USER_TABLE].snapshot;
  m_explicit_snapshot =
      Rdb_explicit_snapshot::create(m_thd, ss_info, rdb, base_snapshot);
}

// Reports whether this transaction currently holds an explicit snapshot,
// i.e. whether m_explicit_snapshot is non-null.
[[nodiscard]] bool has_explicit_snapshot() const noexcept {
  return static_cast<bool>(m_explicit_snapshot);
}

// Returns a copy of the snapshot info (ss_info) of the explicit snapshot held
// by this transaction.
//
// Precondition: the transaction holds an explicit snapshot, i.e.
// has_explicit_snapshot() is true. Dereferencing a null m_explicit_snapshot
// would be undefined behaviour, so the precondition is asserted in the same
// way the sibling snapshot setters assert theirs.
[[nodiscard]] snapshot_info_st clone_explicit_snapshot_info() const noexcept {
  assert(m_explicit_snapshot != nullptr);
  return m_explicit_snapshot->ss_info;
}

private:
Rdb_bulk_load_context *m_bulk_load_ctx = nullptr;

Expand Down Expand Up @@ -4878,7 +4895,7 @@ class Rdb_transaction {
if (m_thd->binlog_ttl_read_filtering_ts) {
assert(thd_is_executing_binlog_events(m_thd));
m_binlog_ttl_read_filtering_ts = m_thd->binlog_ttl_read_filtering_ts;
} else if (m_explicit_snapshot) {
} else if (has_explicit_snapshot()) {
assert(m_explicit_snapshot->ss_info.read_filtering_ts);
set_ttl_read_filtering_ts(m_explicit_snapshot->ss_info.read_filtering_ts);
} else {
Expand Down Expand Up @@ -5206,12 +5223,12 @@ class Rdb_transaction_impl : public Rdb_transaction {
}

if (!has_snapshot(table_type)) {
const auto thd_ss = std::static_pointer_cast<Rdb_explicit_snapshot>(
auto thd_ss = std::static_pointer_cast<Rdb_explicit_snapshot>(
m_thd->get_explicit_snapshot());
if (thd_ss) {
m_explicit_snapshot = thd_ss;
share_explicit_snapshot(std::move(thd_ss));
}
if (m_explicit_snapshot) {
if (has_explicit_snapshot()) {
auto snapshot = m_explicit_snapshot->get_snapshot()->snapshot();
snapshot_created(snapshot);
} else if (is_tx_read_only()) {
Expand All @@ -5234,7 +5251,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
bool need_clear = m_is_delayed_snapshot;

if (has_snapshot(table_type)) {
if (m_explicit_snapshot) {
if (has_explicit_snapshot()) {
m_explicit_snapshot.reset();
need_clear = false;
} else if (is_tx_read_only()) {
Expand Down Expand Up @@ -7647,7 +7664,7 @@ static int rocksdb_explicit_snapshot(
mysql_bin_log_lock_commits(ss_info);
}
auto s =
Rdb_explicit_snapshot::create(thd, ss_info, rdb, rdb->GetSnapshot());
Rdb_explicit_snapshot::create(thd, *ss_info, rdb, rdb->GetSnapshot());
if (mysql_bin_log_is_open()) {
mysql_bin_log_unlock_commits(ss_info);
}
Expand Down Expand Up @@ -7792,7 +7809,7 @@ static int rocksdb_start_tx_with_shared_read_view(
Rdb_perf_context_guard guard(tx, thd);

if (explicit_snapshot) {
tx->m_explicit_snapshot = explicit_snapshot;
tx->share_explicit_snapshot(std::move(explicit_snapshot));
}

assert(!tx->has_snapshot(TABLE_TYPE::USER_TABLE));
Expand All @@ -7801,9 +7818,9 @@ static int rocksdb_start_tx_with_shared_read_view(
tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE);

// case: an explicit snapshot was not assigned to this transaction
if (!tx->m_explicit_snapshot) {
tx->create_explicit_snapshot(ss_info);
if (!tx->m_explicit_snapshot) {
if (!tx->has_explicit_snapshot()) {
tx->create_explicit_snapshot(*ss_info);
if (unlikely(!tx->has_explicit_snapshot())) {
my_printf_error(ER_UNKNOWN_ERROR, "Could not create snapshot", MYF(0));
error = HA_EXIT_FAILURE;
}
Expand All @@ -7815,11 +7832,11 @@ static int rocksdb_start_tx_with_shared_read_view(
mysql_bin_log_unlock_commits(ss_info);
}

assert(error == HA_EXIT_FAILURE || tx->m_explicit_snapshot);
assert(error == HA_EXIT_FAILURE || tx->has_explicit_snapshot());

// copy over the snapshot details to pass to the upper layers
if (error == HA_EXIT_SUCCESS && tx && tx->m_explicit_snapshot) {
*ss_info = tx->m_explicit_snapshot->ss_info;
if (likely(error == HA_EXIT_SUCCESS)) {
*ss_info = tx->clone_explicit_snapshot_info();
ss_info->op = op;
}

Expand Down