From f3c8392ea869dd9aa9f4a34ca8d95e21ddc3a10b Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Wed, 28 Sep 2022 17:30:39 +0300 Subject: [PATCH] Removed SR store implementation detail from wsrep-lib The following was moved to application side implementation: - Removed sr_store from streaming context. - Removed sr_state from transaction. - Removed get_binlog_cache() from client_service interface. Other: - Add SR applier reference to append_fragment_and_commit() to make it available for application. - Add separate interface call to rollback SR transactions on disconnect. Rolling back SR transactions due to rollback fragment and rolling back SR transactions due to disconnect have different behaviors. Have separate calls for these different cases for clarity. - Remove non-const transaction accessor, not needed anymore because SR state has been moved to application side. - Remove unneeded set_fragments_from_table(). --- dbsim/db_client_service.hpp | 5 --- dbsim/db_high_priority_service.cpp | 14 ++++++- dbsim/db_high_priority_service.hpp | 6 +-- dbsim/db_storage_service.hpp | 7 +--- include/wsrep/client_service.hpp | 6 --- include/wsrep/client_state.hpp | 9 ----- include/wsrep/high_priority_service.hpp | 20 ++++++++-- include/wsrep/storage_service.hpp | 12 +----- include/wsrep/streaming_context.hpp | 13 ------- include/wsrep/transaction.hpp | 43 -------------------- src/server_state.cpp | 9 +---- src/transaction.cpp | 52 +++---------------------- test/mock_client_state.hpp | 5 --- test/mock_high_priority_service.cpp | 9 ++++- test/mock_high_priority_service.hpp | 7 ++-- test/mock_storage_service.hpp | 6 +-- 16 files changed, 54 insertions(+), 169 deletions(-) diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 4a806df5..91788c9d 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -90,11 +90,6 @@ namespace db void debug_sync(const char*) override { } void debug_crash(const char*) override { } - void *get_binlog_cache() override - { - return (NULL); - } - int fragment_cache_remove_transaction( const wsrep::id&, wsrep::transaction_id) override diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 9520b0d5..321b722b 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -88,8 +88,7 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, } int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool) + const wsrep::ws_meta& ws_meta) { client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false); int ret(client_.client_state_.before_rollback()); @@ -100,6 +99,17 @@ int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, return ret; } +int db::high_priority_service::rollback_sr_on_disconnect() +{ + + auto ret = client_.client_state_.before_rollback(); + assert(ret == 0); + client_.se_trx_.rollback(); + ret = client_.client_state_.after_rollback(); + assert(ret == 0); + return ret; +} + void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err) { client_.client_state_.adopt_apply_error(err); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index be62e8e7..07660368 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -39,17 +39,17 @@ namespace db const wsrep::const_buffer&, wsrep::mutable_buffer&) override; int append_fragment_and_commit( + wsrep::high_priority_service&, const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&, - int, const wsrep::xid&) override { return 0; } int remove_fragments(const wsrep::ws_meta&) override { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; - int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&, - bool skip_rollback = false) override; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; + int rollback_sr_on_disconnect() override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) override; int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&, diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index b4b60571..839253db 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -35,17 +35,12 @@ namespace db wsrep::transaction_id, int, const wsrep::const_buffer&, - int, - size_t, - const wsrep::xid&, - void *) override + const wsrep::xid&) override { throw wsrep::not_implemented_error(); } int update_fragment_meta(const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } int remove_fragments() override { throw wsrep::not_implemented_error(); } - int set_fragments_from_table() override - { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index ca2ed287..f7ba1e13 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -218,12 +218,6 @@ namespace wsrep */ virtual void debug_crash(const char* crash_point) = 0; - /** - * Return the binlog cache for the currently executing - * transaction or a NULL pointer if no such cache exists. - */ - virtual void *get_binlog_cache() = 0; - /** * Remove the given transaction from the fragment cache. */ diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 832636fa..138bf5f0 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -943,15 +943,6 @@ namespace wsrep return 1; } - /** - * Return a reference to the transaction associated - * with the client state. - */ - wsrep::transaction& transaction() - { - return transaction_; - } - const wsrep::ws_meta& toi_meta() const { return toi_meta_; diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 3c741006..b5e21016 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -93,12 +93,18 @@ namespace wsrep * * Note that the call is not done from streaming transaction * context, but from applier context. + * + * @param sr_hps Object that is hosting the streaming transaction. + * @param ws_handle Write set handle corresponding to fragment. + * @param ws_meta Write set meta data corresponding to fragment. + * @param data Fragment data. + * @param xid XID corresponding to streaming transaction. */ virtual int append_fragment_and_commit( + high_priority_service& sr_hps, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - int sr_store, const wsrep::xid& xid) = 0; /** @@ -146,8 +152,16 @@ namespace wsrep * @return Zero in case of success, non-zero in case of failure */ virtual int rollback(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool skip_rollback = false) = 0; + const wsrep::ws_meta& ws_meta) = 0; + + /** + * Roll back a SR transaction when disconnecting from cluster. + * + * The implementation is supposed to roll back the transaction, but + * keep the fragments in fragment store intact to allow recovering + * the ongoing SR transactions on reconnect. + */ + virtual int rollback_sr_on_disconnect() = 0; /** * Apply a TOI operation. diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index 85d630fd..a92aa889 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -65,10 +65,7 @@ namespace wsrep wsrep::transaction_id client_id, int flags, const wsrep::const_buffer& data, - int sr_store, - size_t offset, - const wsrep::xid& xid, - void *binlog_cache) = 0; + const wsrep::xid& xid) = 0; /** * Update fragment meta data after certification process. */ @@ -80,13 +77,6 @@ namespace wsrep */ virtual int remove_fragments() = 0; - /** - * Update the list of fragments in the streaming context by - * adding all fragments in the streaming log table for the given - * transaction. - */ - virtual int set_fragments_from_table() = 0; - /** * Commit the transaction. */ diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 2cddb344..9b205c5b 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -47,7 +47,6 @@ namespace wsrep , fragment_size_() , unit_counter_() , log_position_() - , sr_store_(0) { } /** @@ -190,17 +189,6 @@ namespace wsrep unit_counter_ = 0; log_position_ = 0; } - - void set_sr_store(int store_type) - { - sr_store_ = store_type; - } - - int get_sr_store() const - { - return (sr_store_); - } - private: void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) @@ -216,7 +204,6 @@ namespace wsrep size_t fragment_size_; size_t unit_counter_; size_t log_position_; - int sr_store_; }; } diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index e5bd6976..a70f0e84 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -142,48 +142,6 @@ namespace wsrep void xa_detach(); int xa_replay(wsrep::unique_lock&); - int fragment_cache_remove_transaction( - const wsrep::id& server_id, wsrep::transaction_id transaction_id); - void *get_binlog_cache(); - /* state of Streaming Replication Speedup feature for the - transaction. This describes the relationship of this WSREP - transaction and the underlying InnoDB transaction. - */ - enum sr_state - { - /* this is not an SR Speedup transaction */ - sr_state_none, - /* this is an SR Speedup transaction, but SR XID is not set - for the underlying InnoDB transaction - */ - sr_state_require_xid, - /* this is an SR Speedup transaction, and SR XID is set - for the underlying InnoDB transaction - */ - sr_state_xid_set - }; - static const int n_sr_states = sr_state_xid_set + 1; - enum sr_state sr_state() const - { return sr_state_; } - void require_sr_xid() - { - if (sr_state_ == sr_state_none) { - sr_state_ = sr_state_require_xid; - } - } - void sr_xid_was_set() - { - sr_state_ = sr_state_xid_set; - } - bool sr_xid_is_required() - { - return sr_state_ == sr_state_require_xid; - } - bool sr_xid_is_set() - { - return sr_state_ == sr_state_xid_set; - } - bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); } void pa_unsafe(bool pa_unsafe) { if (pa_unsafe) { @@ -323,7 +281,6 @@ namespace wsrep wsrep::mutable_buffer apply_error_buf_; wsrep::xid xid_; bool streaming_rollback_in_progress_; - enum sr_state sr_state_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/server_state.cpp b/src/server_state.cpp index 1f0419a7..24686f27 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -92,8 +92,6 @@ static int apply_fragment(wsrep::server_state& server_state, int ret(0); int apply_err; wsrep::mutable_buffer err; - int sr_store = streaming_applier->transaction().streaming_context(). - get_sr_store(); { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); @@ -134,7 +132,7 @@ static int apply_fragment(wsrep::server_state& server_state, high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); const wsrep::xid xid(streaming_applier->transaction().xid()); ret = high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data, sr_store, xid); + *streaming_applier, ws_handle, ws_meta, data, xid); high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); ret = ret || (high_priority_service.after_apply(), 0); } @@ -1555,10 +1553,7 @@ void wsrep::server_state::close_transactions_at_disconnect( { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); - int sr_store = streaming_applier->transaction().streaming_context(). - get_sr_store(); - streaming_applier->rollback( - wsrep::ws_handle(), wsrep::ws_meta(), sr_store != 0); + streaming_applier->rollback_sr_on_disconnect(); streaming_applier->after_apply(); } streaming_appliers_.erase(i++); diff --git a/src/transaction.cpp b/src/transaction.cpp index 22891d09..fc11e31c 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -109,7 +109,6 @@ wsrep::transaction::transaction( , apply_error_buf_() , xid_() , streaming_rollback_in_progress_(false) - , sr_state_(sr_state_none) { } @@ -127,7 +126,6 @@ int wsrep::transaction::start_transaction( server_id_ = client_state_.server_state().id(); id_ = id; state_ = s_executing; - sr_state_ = sr_state_none; state_hist_.clear(); ws_handle_ = wsrep::ws_handle(id); flags(wsrep::provider::flag::start_transaction); @@ -1282,26 +1280,6 @@ int wsrep::transaction::xa_replay_commit(wsrep::unique_lock& lock) return ret; } -int wsrep::transaction::fragment_cache_remove_transaction( - const wsrep::id& server_id, wsrep::transaction_id transaction_id) -{ - int rcode = client_service_.fragment_cache_remove_transaction( - server_id, transaction_id); - - return (rcode); -} - -void *wsrep::transaction::get_binlog_cache() -{ - void *cache = client_service_.get_binlog_cache(); - - assert(cache); - - return (cache); -} - - - //////////////////////////////////////////////////////////////////////////////// // Private // //////////////////////////////////////////////////////////////////////////////// @@ -1468,8 +1446,6 @@ int wsrep::transaction::certify_fragment( assert(streaming_context_.rolled_back() == false || state() == s_must_abort); - int sr_store = streaming_context_.get_sr_store(); - client_service_.wait_for_replayers(lock); if (abort_or_interrupt(lock)) { @@ -1571,24 +1547,11 @@ int wsrep::transaction::certify_fragment( error = wsrep::e_append_fragment_error; } - if (ret == 0 && - (storage_service.start_transaction(ws_handle_) || - (sr_store == 0 ? - storage_service.append_fragment( - server_id, - id(), - flags(), - wsrep::const_buffer(data.data(), data.size()), - 0, 0, xid(), nullptr) - : - storage_service.append_fragment( - server_id, - id(), - flags(), - wsrep::const_buffer(data.data(), data.size()), - streaming_context_.get_sr_store(), - log_position - data.size(), - xid(), get_binlog_cache())))) + if (ret == 0 + && (storage_service.start_transaction(ws_handle_) + || storage_service.append_fragment( + server_id, id(), flags(), + wsrep::const_buffer(data.data(), data.size()), xid()))) { ret = 1; error = wsrep::e_append_fragment_error; @@ -1719,9 +1682,6 @@ int wsrep::transaction::certify_fragment( flags(flags() & ~wsrep::provider::flag::start_transaction); flags(flags() & ~wsrep::provider::flag::pa_unsafe); } - if (sr_store != 0) { - require_sr_xid(); - } return ret; } @@ -2082,7 +2042,7 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) void wsrep::transaction::clear_fragments() { streaming_context_.cleanup(); - fragment_cache_remove_transaction(server_id_, id_); + client_service_.fragment_cache_remove_transaction(server_id_, id_); } void wsrep::transaction::cleanup() diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 1babfcd4..c85050c2 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -198,11 +198,6 @@ namespace wsrep // Not going to do this while unit testing } - void *get_binlog_cache() WSREP_OVERRIDE - { - return (NULL); - } - int fragment_cache_remove_transaction( const wsrep::id&, wsrep::transaction_id) WSREP_OVERRIDE diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index e0b340bc..32ae3679 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -106,14 +106,19 @@ int wsrep::mock_high_priority_service::commit( int wsrep::mock_high_priority_service::rollback( const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool) + const wsrep::ws_meta& ws_meta) { client_state_->prepare_for_ordering(ws_handle, ws_meta, false); return (client_state_->before_rollback() || client_state_->after_rollback()); } +int wsrep::mock_high_priority_service::rollback_sr_on_disconnect() +{ + return (client_state_->before_rollback() + || client_state_->after_rollback()); +} + int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index eba67143..592ffb98 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -57,18 +57,19 @@ namespace wsrep const wsrep::const_buffer&, wsrep::mutable_buffer&) WSREP_OVERRIDE; int append_fragment_and_commit( + wsrep::high_priority_service&, const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&, - int, const wsrep::xid&) WSREP_OVERRIDE { return 0; } int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; - int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&, - bool skip_rollback = false) WSREP_OVERRIDE; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; + int rollback_sr_on_disconnect() WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) WSREP_OVERRIDE; diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index 112e75ed..b0275b97 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -40,16 +40,12 @@ class mock_server_state; wsrep::transaction_id, int, const wsrep::const_buffer&, - int, - size_t, - const wsrep::xid&, - void *) WSREP_OVERRIDE + const wsrep::xid&) WSREP_OVERRIDE { return 0; } int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } int remove_fragments() WSREP_OVERRIDE { return 0; } - int set_fragments_from_table() WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE;