diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 4a806df5..91788c9d 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -90,11 +90,6 @@ namespace db void debug_sync(const char*) override { } void debug_crash(const char*) override { } - void *get_binlog_cache() override - { - return (NULL); - } - int fragment_cache_remove_transaction( const wsrep::id&, wsrep::transaction_id) override diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 9520b0d5..321b722b 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -88,8 +88,7 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, } int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool) + const wsrep::ws_meta& ws_meta) { client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false); int ret(client_.client_state_.before_rollback()); @@ -100,6 +99,17 @@ int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, return ret; } +int db::high_priority_service::rollback_sr_on_disconnect() +{ + + auto ret = client_.client_state_.before_rollback(); + assert(ret == 0); + client_.se_trx_.rollback(); + ret = client_.client_state_.after_rollback(); + assert(ret == 0); + return ret; +} + void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err) { client_.client_state_.adopt_apply_error(err); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index be62e8e7..07660368 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -39,17 +39,17 @@ namespace db const wsrep::const_buffer&, wsrep::mutable_buffer&) override; int append_fragment_and_commit( + wsrep::high_priority_service&, const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&, - int, const wsrep::xid&) override { return 0; } int remove_fragments(const wsrep::ws_meta&) override { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; - int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&, - bool skip_rollback = false) override; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; + int rollback_sr_on_disconnect() override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) override; int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&, diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index b4b60571..839253db 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -35,17 +35,12 @@ namespace db wsrep::transaction_id, int, const wsrep::const_buffer&, - int, - size_t, - const wsrep::xid&, - void *) override + const wsrep::xid&) override { throw wsrep::not_implemented_error(); } int update_fragment_meta(const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } int remove_fragments() override { throw wsrep::not_implemented_error(); } - int set_fragments_from_table() override - { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index ca2ed287..f7ba1e13 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -218,12 +218,6 @@ namespace wsrep */ virtual void debug_crash(const char* crash_point) = 0; - /** - * Return the binlog cache for the currently executing - * transaction or a NULL pointer if no such cache exists. - */ - virtual void *get_binlog_cache() = 0; - /** * Remove the given transaction from the fragment cache. */ diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 832636fa..138bf5f0 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -943,15 +943,6 @@ namespace wsrep return 1; } - /** - * Return a reference to the transaction associated - * with the client state. - */ - wsrep::transaction& transaction() - { - return transaction_; - } - const wsrep::ws_meta& toi_meta() const { return toi_meta_; diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 3c741006..b5e21016 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -93,12 +93,18 @@ namespace wsrep * * Note that the call is not done from streaming transaction * context, but from applier context. + * + * @param sr_hps Object that is hosting the streaming transaction. + * @param ws_handle Write set handle corresponding to fragment. + * @param ws_meta Write set meta data corresponding to fragment. + * @param data Fragment data. + * @param xid XID corresponding to streaming transaction. */ virtual int append_fragment_and_commit( + high_priority_service& sr_hps, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - int sr_store, const wsrep::xid& xid) = 0; /** @@ -146,8 +152,16 @@ namespace wsrep * @return Zero in case of success, non-zero in case of failure */ virtual int rollback(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool skip_rollback = false) = 0; + const wsrep::ws_meta& ws_meta) = 0; + + /** + * Roll back a SR transaction when disconnecting from cluster. + * + * The implementation is supposed to roll back the transaction, but + * keep the fragments in fragment store intact to allow recovering + * the ongoing SR transactions on reconnect. + */ + virtual int rollback_sr_on_disconnect() = 0; /** * Apply a TOI operation. diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index 85d630fd..a92aa889 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -65,10 +65,7 @@ namespace wsrep wsrep::transaction_id client_id, int flags, const wsrep::const_buffer& data, - int sr_store, - size_t offset, - const wsrep::xid& xid, - void *binlog_cache) = 0; + const wsrep::xid& xid) = 0; /** * Update fragment meta data after certification process. */ @@ -80,13 +77,6 @@ namespace wsrep */ virtual int remove_fragments() = 0; - /** - * Update the list of fragments in the streaming context by - * adding all fragments in the streaming log table for the given - * transaction. - */ - virtual int set_fragments_from_table() = 0; - /** * Commit the transaction. */ diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 2cddb344..9b205c5b 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -47,7 +47,6 @@ namespace wsrep , fragment_size_() , unit_counter_() , log_position_() - , sr_store_(0) { } /** @@ -190,17 +189,6 @@ namespace wsrep unit_counter_ = 0; log_position_ = 0; } - - void set_sr_store(int store_type) - { - sr_store_ = store_type; - } - - int get_sr_store() const - { - return (sr_store_); - } - private: void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) @@ -216,7 +204,6 @@ namespace wsrep size_t fragment_size_; size_t unit_counter_; size_t log_position_; - int sr_store_; }; } diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index e5bd6976..a70f0e84 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -142,48 +142,6 @@ namespace wsrep void xa_detach(); int xa_replay(wsrep::unique_lock&); - int fragment_cache_remove_transaction( - const wsrep::id& server_id, wsrep::transaction_id transaction_id); - void *get_binlog_cache(); - /* state of Streaming Replication Speedup feature for the - transaction. This describes the relationship of this WSREP - transaction and the underlying InnoDB transaction. - */ - enum sr_state - { - /* this is not an SR Speedup transaction */ - sr_state_none, - /* this is an SR Speedup transaction, but SR XID is not set - for the underlying InnoDB transaction - */ - sr_state_require_xid, - /* this is an SR Speedup transaction, and SR XID is set - for the underlying InnoDB transaction - */ - sr_state_xid_set - }; - static const int n_sr_states = sr_state_xid_set + 1; - enum sr_state sr_state() const - { return sr_state_; } - void require_sr_xid() - { - if (sr_state_ == sr_state_none) { - sr_state_ = sr_state_require_xid; - } - } - void sr_xid_was_set() - { - sr_state_ = sr_state_xid_set; - } - bool sr_xid_is_required() - { - return sr_state_ == sr_state_require_xid; - } - bool sr_xid_is_set() - { - return sr_state_ == sr_state_xid_set; - } - bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); } void pa_unsafe(bool pa_unsafe) { if (pa_unsafe) { @@ -323,7 +281,6 @@ namespace wsrep wsrep::mutable_buffer apply_error_buf_; wsrep::xid xid_; bool streaming_rollback_in_progress_; - enum sr_state sr_state_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/server_state.cpp b/src/server_state.cpp index 1f0419a7..24686f27 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -92,8 +92,6 @@ static int apply_fragment(wsrep::server_state& server_state, int ret(0); int apply_err; wsrep::mutable_buffer err; - int sr_store = streaming_applier->transaction().streaming_context(). - get_sr_store(); { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); @@ -134,7 +132,7 @@ static int apply_fragment(wsrep::server_state& server_state, high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); const wsrep::xid xid(streaming_applier->transaction().xid()); ret = high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data, sr_store, xid); + *streaming_applier, ws_handle, ws_meta, data, xid); high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); ret = ret || (high_priority_service.after_apply(), 0); } @@ -1555,10 +1553,7 @@ void wsrep::server_state::close_transactions_at_disconnect( { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); - int sr_store = streaming_applier->transaction().streaming_context(). - get_sr_store(); - streaming_applier->rollback( - wsrep::ws_handle(), wsrep::ws_meta(), sr_store != 0); + streaming_applier->rollback_sr_on_disconnect(); streaming_applier->after_apply(); } streaming_appliers_.erase(i++); diff --git a/src/transaction.cpp b/src/transaction.cpp index 22891d09..fc11e31c 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -109,7 +109,6 @@ wsrep::transaction::transaction( , apply_error_buf_() , xid_() , streaming_rollback_in_progress_(false) - , sr_state_(sr_state_none) { } @@ -127,7 +126,6 @@ int wsrep::transaction::start_transaction( server_id_ = client_state_.server_state().id(); id_ = id; state_ = s_executing; - sr_state_ = sr_state_none; state_hist_.clear(); ws_handle_ = wsrep::ws_handle(id); flags(wsrep::provider::flag::start_transaction); @@ -1282,26 +1280,6 @@ int wsrep::transaction::xa_replay_commit(wsrep::unique_lock& lock) return ret; } -int wsrep::transaction::fragment_cache_remove_transaction( - const wsrep::id& server_id, wsrep::transaction_id transaction_id) -{ - int rcode = client_service_.fragment_cache_remove_transaction( - server_id, transaction_id); - - return (rcode); -} - -void *wsrep::transaction::get_binlog_cache() -{ - void *cache = client_service_.get_binlog_cache(); - - assert(cache); - - return (cache); -} - - - //////////////////////////////////////////////////////////////////////////////// // Private // //////////////////////////////////////////////////////////////////////////////// @@ -1468,8 +1446,6 @@ int wsrep::transaction::certify_fragment( assert(streaming_context_.rolled_back() == false || state() == s_must_abort); - int sr_store = streaming_context_.get_sr_store(); - client_service_.wait_for_replayers(lock); if (abort_or_interrupt(lock)) { @@ -1571,24 +1547,11 @@ int wsrep::transaction::certify_fragment( error = wsrep::e_append_fragment_error; } - if (ret == 0 && - (storage_service.start_transaction(ws_handle_) || - (sr_store == 0 ? - storage_service.append_fragment( - server_id, - id(), - flags(), - wsrep::const_buffer(data.data(), data.size()), - 0, 0, xid(), nullptr) - : - storage_service.append_fragment( - server_id, - id(), - flags(), - wsrep::const_buffer(data.data(), data.size()), - streaming_context_.get_sr_store(), - log_position - data.size(), - xid(), get_binlog_cache())))) + if (ret == 0 + && (storage_service.start_transaction(ws_handle_) + || storage_service.append_fragment( + server_id, id(), flags(), + wsrep::const_buffer(data.data(), data.size()), xid()))) { ret = 1; error = wsrep::e_append_fragment_error; @@ -1719,9 +1682,6 @@ int wsrep::transaction::certify_fragment( flags(flags() & ~wsrep::provider::flag::start_transaction); flags(flags() & ~wsrep::provider::flag::pa_unsafe); } - if (sr_store != 0) { - require_sr_xid(); - } return ret; } @@ -2082,7 +2042,7 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) void wsrep::transaction::clear_fragments() { streaming_context_.cleanup(); - fragment_cache_remove_transaction(server_id_, id_); + client_service_.fragment_cache_remove_transaction(server_id_, id_); } void wsrep::transaction::cleanup() diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 1babfcd4..c85050c2 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -198,11 +198,6 @@ namespace wsrep // Not going to do this while unit testing } - void *get_binlog_cache() WSREP_OVERRIDE - { - return (NULL); - } - int fragment_cache_remove_transaction( const wsrep::id&, wsrep::transaction_id) WSREP_OVERRIDE diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index e0b340bc..32ae3679 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -106,14 +106,19 @@ int wsrep::mock_high_priority_service::commit( int wsrep::mock_high_priority_service::rollback( const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - bool) + const wsrep::ws_meta& ws_meta) { client_state_->prepare_for_ordering(ws_handle, ws_meta, false); return (client_state_->before_rollback() || client_state_->after_rollback()); } +int wsrep::mock_high_priority_service::rollback_sr_on_disconnect() +{ + return (client_state_->before_rollback() + || client_state_->after_rollback()); +} + int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index eba67143..592ffb98 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -57,18 +57,19 @@ namespace wsrep const wsrep::const_buffer&, wsrep::mutable_buffer&) WSREP_OVERRIDE; int append_fragment_and_commit( + wsrep::high_priority_service&, const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&, - int, const wsrep::xid&) WSREP_OVERRIDE { return 0; } int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; - int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&, - bool skip_rollback = false) WSREP_OVERRIDE; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; + int rollback_sr_on_disconnect() WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) WSREP_OVERRIDE; diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index 112e75ed..b0275b97 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -40,16 +40,12 @@ class mock_server_state; wsrep::transaction_id, int, const wsrep::const_buffer&, - int, - size_t, - const wsrep::xid&, - void *) WSREP_OVERRIDE + const wsrep::xid&) WSREP_OVERRIDE { return 0; } int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } int remove_fragments() WSREP_OVERRIDE { return 0; } - int set_fragments_from_table() WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE;