diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 9c38ae9cebc8..67e7e200e551 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -3186,41 +3186,40 @@ static struct SYS_VAR *rocksdb_system_variables[] = { class Rdb_explicit_snapshot : public explicit_snapshot { public: - static std::shared_ptr create( - THD *thd, snapshot_info_st *ssinfo, rocksdb::DB *db, + [[nodiscard]] static std::shared_ptr create( + THD *thd, snapshot_info_st &ssinfo, rocksdb::DB *db, const rocksdb::Snapshot *snapshot) { - std::lock_guard lock(explicit_snapshot_mutex); auto s = std::unique_ptr( new rocksdb::ManagedSnapshot(db, snapshot)); if (!s) { return nullptr; } - ssinfo->snapshot_id = ++explicit_snapshot_counter; const uint64_t client_provided_read_filtering_ts = rdb_is_binlog_ttl_enabled() ? THDVAR(thd, consistent_snapshot_ttl_read_filtering_ts_nsec) / 1000000000UL : 0; + std::lock_guard lock(explicit_snapshot_mutex); + ssinfo.snapshot_id = ++explicit_snapshot_counter; // NOTE: We need to set read filtering ts to min of all active trxs and not // the current hlc because some active trxs can commit to binlog later with // a lower read filtering ts and that can cause failures when we try to use // the snapshot to perform dump & load and then replay binlogs because some // trxs in the binlog may access rows that have been expired according to // current hlc. - ssinfo->read_filtering_ts = - client_provided_read_filtering_ts - ? client_provided_read_filtering_ts - : rdb_get_min_binlog_ttl_read_filtering_ts(); + ssinfo.read_filtering_ts = client_provided_read_filtering_ts + ? client_provided_read_filtering_ts + : rdb_get_min_binlog_ttl_read_filtering_ts(); - auto ret = std::make_shared(*ssinfo, std::move(s)); + auto ret = std::make_shared(ssinfo, std::move(s)); if (!ret) { return nullptr; } - explicit_snapshots[ssinfo->snapshot_id] = ret; + explicit_snapshots[ssinfo.snapshot_id] = ret; return ret; } - static std::string dump_snapshots() { + [[nodiscard]] static std::string dump_snapshots() { std::string str; std::lock_guard lock(explicit_snapshot_mutex); for (const auto &elem : explicit_snapshots) { @@ -3236,8 +3235,8 @@ class Rdb_explicit_snapshot : public explicit_snapshot { return str; } - static std::shared_ptr get( - const ulonglong snapshot_id) { + [[nodiscard]] static std::shared_ptr get( + ulonglong snapshot_id) { std::lock_guard lock(explicit_snapshot_mutex); auto elem = explicit_snapshots.find(snapshot_id); if (elem == explicit_snapshots.end()) { @@ -3246,7 +3245,9 @@ class Rdb_explicit_snapshot : public explicit_snapshot { return elem->second.lock(); } - rocksdb::ManagedSnapshot *get_snapshot() { return snapshot.get(); } + [[nodiscard]] rocksdb::ManagedSnapshot *get_snapshot() noexcept { + return snapshot.get(); + } Rdb_explicit_snapshot(snapshot_info_st ssinfo, std::unique_ptr &&snapshot) @@ -3648,6 +3649,8 @@ class Rdb_transaction { rocksdb::ReadOptions m_read_opts[2]; + std::shared_ptr m_explicit_snapshot; + // This should be used only when updating binlog information. [[nodiscard]] virtual rocksdb::WriteBatchBase &get_write_batch() = 0; virtual bool commit_no_binlog( @@ -3695,7 +3698,6 @@ class Rdb_transaction { my_off_t m_mysql_log_offset; const char *m_mysql_max_gtid; String m_detailed_error; - std::shared_ptr m_explicit_snapshot; bool should_refresh_iterator_after_first_write = false; /* @@ -4028,11 +4030,26 @@ class Rdb_transaction { return m_read_opts[table_type].snapshot != nullptr; } - void create_explicit_snapshot(snapshot_info_st *ss_info) { + void share_explicit_snapshot( + std::shared_ptr snapshot) noexcept { + assert(m_explicit_snapshot == nullptr); + m_explicit_snapshot = std::move(snapshot); + } + + void create_explicit_snapshot(snapshot_info_st &ss_info) { + assert(m_explicit_snapshot == nullptr); m_explicit_snapshot = Rdb_explicit_snapshot::create( m_thd, ss_info, rdb, m_read_opts[TABLE_TYPE::USER_TABLE].snapshot); } + [[nodiscard]] bool has_explicit_snapshot() const noexcept { + return m_explicit_snapshot != nullptr; + } + + [[nodiscard]] snapshot_info_st clone_explicit_snapshot_info() const noexcept { + return m_explicit_snapshot->ss_info; + } + private: Rdb_bulk_load_context *m_bulk_load_ctx = nullptr; @@ -4878,7 +4895,7 @@ class Rdb_transaction { if (m_thd->binlog_ttl_read_filtering_ts) { assert(thd_is_executing_binlog_events(m_thd)); m_binlog_ttl_read_filtering_ts = m_thd->binlog_ttl_read_filtering_ts; - } else if (m_explicit_snapshot) { + } else if (has_explicit_snapshot()) { assert(m_explicit_snapshot->ss_info.read_filtering_ts); set_ttl_read_filtering_ts(m_explicit_snapshot->ss_info.read_filtering_ts); } else { @@ -5206,12 +5223,12 @@ class Rdb_transaction_impl : public Rdb_transaction { } if (!has_snapshot(table_type)) { - const auto thd_ss = std::static_pointer_cast( + auto thd_ss = std::static_pointer_cast( m_thd->get_explicit_snapshot()); if (thd_ss) { - m_explicit_snapshot = thd_ss; + share_explicit_snapshot(std::move(thd_ss)); } - if (m_explicit_snapshot) { + if (has_explicit_snapshot()) { auto snapshot = m_explicit_snapshot->get_snapshot()->snapshot(); snapshot_created(snapshot); } else if (is_tx_read_only()) { @@ -5234,7 +5251,7 @@ class Rdb_transaction_impl : public Rdb_transaction { bool need_clear = m_is_delayed_snapshot; if (has_snapshot(table_type)) { - if (m_explicit_snapshot) { + if (has_explicit_snapshot()) { m_explicit_snapshot.reset(); need_clear = false; } else if (is_tx_read_only()) { @@ -7647,7 +7664,7 @@ static int rocksdb_explicit_snapshot( mysql_bin_log_lock_commits(ss_info); } auto s = - Rdb_explicit_snapshot::create(thd, ss_info, rdb, rdb->GetSnapshot()); + Rdb_explicit_snapshot::create(thd, *ss_info, rdb, rdb->GetSnapshot()); if (mysql_bin_log_is_open()) { mysql_bin_log_unlock_commits(ss_info); } @@ -7792,7 +7809,7 @@ static int rocksdb_start_tx_with_shared_read_view( Rdb_perf_context_guard guard(tx, thd); if (explicit_snapshot) { - tx->m_explicit_snapshot = explicit_snapshot; + tx->share_explicit_snapshot(std::move(explicit_snapshot)); } assert(!tx->has_snapshot(TABLE_TYPE::USER_TABLE)); @@ -7801,9 +7818,9 @@ static int rocksdb_start_tx_with_shared_read_view( tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE); // case: an explicit snapshot was not assigned to this transaction - if (!tx->m_explicit_snapshot) { - tx->create_explicit_snapshot(ss_info); - if (!tx->m_explicit_snapshot) { + if (!tx->has_explicit_snapshot()) { + tx->create_explicit_snapshot(*ss_info); + if (unlikely(!tx->has_explicit_snapshot())) { my_printf_error(ER_UNKNOWN_ERROR, "Could not create snapshot", MYF(0)); error = HA_EXIT_FAILURE; } @@ -7815,11 +7832,11 @@ static int rocksdb_start_tx_with_shared_read_view( mysql_bin_log_unlock_commits(ss_info); } - assert(error == HA_EXIT_FAILURE || tx->m_explicit_snapshot); + assert(error == HA_EXIT_FAILURE || tx->has_explicit_snapshot()); // copy over the snapshot details to pass to the upper layers - if (error == HA_EXIT_SUCCESS && tx && tx->m_explicit_snapshot) { - *ss_info = tx->m_explicit_snapshot->ss_info; + if (likely(error == HA_EXIT_SUCCESS)) { + *ss_info = tx->clone_explicit_snapshot_info(); ss_info->op = op; }