diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index be6f9ad8..4a806df5 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -89,6 +89,19 @@ namespace db void debug_sync(const char*) override { } void debug_crash(const char*) override { } + + void *get_binlog_cache() override + { + return (NULL); + } + + int fragment_cache_remove_transaction( + const wsrep::id&, + wsrep::transaction_id) override + { + return (0); + } + private: db::client& client_; wsrep::client_state& client_state_; diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 669fe502..9520b0d5 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -88,7 +88,8 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, } int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta) + const wsrep::ws_meta& ws_meta, + bool) { client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false); int ret(client_.client_state_.before_rollback()); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index d4a80f1b..be62e8e7 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -42,12 +42,14 @@ namespace db const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&, + int, const wsrep::xid&) override { return 0; } int remove_fragments(const wsrep::ws_meta&) override { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; - int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&, + bool skip_rollback = false) override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) override; int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&, diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index 839253db..b4b60571 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -35,12 +35,17 @@ namespace db wsrep::transaction_id, int, const wsrep::const_buffer&, - const wsrep::xid&) override + int, + size_t, + const wsrep::xid&, + void *) override { throw wsrep::not_implemented_error(); } int update_fragment_meta(const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } int remove_fragments() override { throw wsrep::not_implemented_error(); } + int set_fragments_from_table() override + { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index d47396df..ca2ed287 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -217,6 +217,20 @@ namespace wsrep * been enabled. */ virtual void debug_crash(const char* crash_point) = 0; + + /** + * Return the binlog cache for the currently executing + * transaction or a NULL pointer if no such cache exists. + */ + virtual void *get_binlog_cache() = 0; + + /** + * Remove the given transaction from the fragment cache. + */ + virtual int fragment_cache_remove_transaction( + const wsrep::id& server_id, + wsrep::transaction_id transaction_id) = 0; + }; } diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 138bf5f0..832636fa 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -943,6 +943,15 @@ namespace wsrep return 1; } + /** + * Return a reference to the transaction associated + * with the client state. + */ + wsrep::transaction& transaction() + { + return transaction_; + } + const wsrep::ws_meta& toi_meta() const { return toi_meta_; diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index f1d011ec..3c741006 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -98,6 +98,7 @@ namespace wsrep const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, + int sr_store, const wsrep::xid& xid) = 0; /** @@ -145,7 +146,8 @@ namespace wsrep * @return Zero in case of success, non-zero in case of failure */ virtual int rollback(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta) = 0; + const wsrep::ws_meta& ws_meta, + bool skip_rollback = false) = 0; /** * Apply a TOI operation. diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index e68548b7..85d630fd 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -65,8 +65,10 @@ namespace wsrep wsrep::transaction_id client_id, int flags, const wsrep::const_buffer& data, - const wsrep::xid& xid) = 0; - + int sr_store, + size_t offset, + const wsrep::xid& xid, + void *binlog_cache) = 0; /** * Update fragment meta data after certification process. */ @@ -78,6 +80,13 @@ namespace wsrep */ virtual int remove_fragments() = 0; + /** + * Update the list of fragments in the streaming context by + * adding all fragments in the streaming log table for the given + * transaction. + */ + virtual int set_fragments_from_table() = 0; + /** * Commit the transaction. */ diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 9b205c5b..2cddb344 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -47,6 +47,7 @@ namespace wsrep , fragment_size_() , unit_counter_() , log_position_() + , sr_store_(0) { } /** @@ -189,6 +190,17 @@ namespace wsrep unit_counter_ = 0; log_position_ = 0; } + + void set_sr_store(int store_type) + { + sr_store_ = store_type; + } + + int get_sr_store() const + { + return (sr_store_); + } + private: void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) @@ -204,6 +216,7 @@ namespace wsrep size_t fragment_size_; size_t unit_counter_; size_t log_position_; + int sr_store_; }; } diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 76835fd9..e5bd6976 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -142,6 +142,47 @@ namespace wsrep void xa_detach(); int xa_replay(wsrep::unique_lock&); + int fragment_cache_remove_transaction( + const wsrep::id& server_id, wsrep::transaction_id transaction_id); + void *get_binlog_cache(); + /* state of Streaming Replication Speedup feature for the + transaction. This describes the relationship of this WSREP + transaction and the underlying InnoDB transaction. + */ + enum sr_state + { + /* this is not an SR Speedup transaction */ + sr_state_none, + /* this is an SR Speedup transaction, but SR XID is not set + for the underlying InnoDB transaction + */ + sr_state_require_xid, + /* this is an SR Speedup transaction, and SR XID is set + for the underlying InnoDB transaction + */ + sr_state_xid_set + }; + static const int n_sr_states = sr_state_xid_set + 1; + enum sr_state sr_state() const + { return sr_state_; } + void require_sr_xid() + { + if (sr_state_ == sr_state_none) { + sr_state_ = sr_state_require_xid; + } + } + void sr_xid_was_set() + { + sr_state_ = sr_state_xid_set; + } + bool sr_xid_is_required() + { + return sr_state_ == sr_state_require_xid; + } + bool sr_xid_is_set() + { + return sr_state_ == sr_state_xid_set; + } bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); } void pa_unsafe(bool pa_unsafe) { @@ -253,6 +294,7 @@ namespace wsrep int release_commit_order(wsrep::unique_lock&); void streaming_rollback(wsrep::unique_lock&); int replay(wsrep::unique_lock&); + void clear_fragments(); void xa_replay_common(wsrep::unique_lock&); int xa_replay_commit(wsrep::unique_lock&); void cleanup(); @@ -281,6 +323,7 @@ namespace wsrep wsrep::mutable_buffer apply_error_buf_; wsrep::xid xid_; bool streaming_rollback_in_progress_; + enum sr_state sr_state_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/server_state.cpp b/src/server_state.cpp index 88ec12b3..1f0419a7 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -92,6 +92,8 @@ static int apply_fragment(wsrep::server_state& server_state, int ret(0); int apply_err; wsrep::mutable_buffer err; + int sr_store = streaming_applier->transaction().streaming_context(). + get_sr_store(); { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); @@ -132,7 +134,7 @@ static int apply_fragment(wsrep::server_state& server_state, high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); const wsrep::xid xid(streaming_applier->transaction().xid()); ret = high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data, xid); + ws_handle, ws_meta, data, sr_store, xid); high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); ret = ret || (high_priority_service.after_apply(), 0); } @@ -1553,8 +1555,10 @@ void wsrep::server_state::close_transactions_at_disconnect( { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); + int sr_store = streaming_applier->transaction().streaming_context(). + get_sr_store(); streaming_applier->rollback( - wsrep::ws_handle(), wsrep::ws_meta()); + wsrep::ws_handle(), wsrep::ws_meta(), sr_store != 0); streaming_applier->after_apply(); } streaming_appliers_.erase(i++); diff --git a/src/transaction.cpp b/src/transaction.cpp index 764b1838..22891d09 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -109,6 +109,7 @@ wsrep::transaction::transaction( , apply_error_buf_() , xid_() , streaming_rollback_in_progress_(false) + , sr_state_(sr_state_none) { } @@ -126,6 +127,7 @@ int wsrep::transaction::start_transaction( server_id_ = client_state_.server_state().id(); id_ = id; state_ = s_executing; + sr_state_ = sr_state_none; state_hist_.clear(); ws_handle_ = wsrep::ws_handle(id); flags(wsrep::provider::flag::start_transaction); @@ -631,7 +633,7 @@ int wsrep::transaction::after_commit() client_state_.server_state_.stop_streaming_client(&client_state_); lock.lock(); } - streaming_context_.cleanup(); + clear_fragments(); } switch (client_state_.mode()) @@ -768,7 +770,7 @@ int wsrep::transaction::after_rollback() if (is_streaming() && state() != s_must_replay) { - streaming_context_.cleanup(); + clear_fragments(); } if (state() == s_aborting) @@ -1280,6 +1282,26 @@ int wsrep::transaction::xa_replay_commit(wsrep::unique_lock& lock) return ret; } +int wsrep::transaction::fragment_cache_remove_transaction( + const wsrep::id& server_id, wsrep::transaction_id transaction_id) +{ + int rcode = client_service_.fragment_cache_remove_transaction( + server_id, transaction_id); + + return (rcode); +} + +void *wsrep::transaction::get_binlog_cache() +{ + void *cache = client_service_.get_binlog_cache(); + + assert(cache); + + return (cache); +} + + + //////////////////////////////////////////////////////////////////////////////// // Private // //////////////////////////////////////////////////////////////////////////////// @@ -1446,6 +1468,8 @@ int wsrep::transaction::certify_fragment( assert(streaming_context_.rolled_back() == false || state() == s_must_abort); + int sr_store = streaming_context_.get_sr_store(); + client_service_.wait_for_replayers(lock); if (abort_or_interrupt(lock)) { @@ -1549,12 +1573,22 @@ int wsrep::transaction::certify_fragment( if (ret == 0 && (storage_service.start_transaction(ws_handle_) || - storage_service.append_fragment( - server_id, - id(), - flags(), - wsrep::const_buffer(data.data(), data.size()), - xid()))) + (sr_store == 0 ? + storage_service.append_fragment( + server_id, + id(), + flags(), + wsrep::const_buffer(data.data(), data.size()), + 0, 0, xid(), nullptr) + : + storage_service.append_fragment( + server_id, + id(), + flags(), + wsrep::const_buffer(data.data(), data.size()), + streaming_context_.get_sr_store(), + log_position - data.size(), + xid(), get_binlog_cache())))) { ret = 1; error = wsrep::e_append_fragment_error; @@ -1685,6 +1719,10 @@ int wsrep::transaction::certify_fragment( flags(flags() & ~wsrep::provider::flag::start_transaction); flags(flags() & ~wsrep::provider::flag::pa_unsafe); } + if (sr_store != 0) { + require_sr_xid(); + } + return ret; } @@ -2015,6 +2053,7 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) if (is_streaming()) { streaming_context_.cleanup(); + clear_fragments(); } provider().release(ws_handle_); break; @@ -2024,7 +2063,7 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) if (is_streaming()) { client_service_.remove_fragments(); - streaming_context_.cleanup(); + clear_fragments(); } state(lock, s_aborted); ret = 1; @@ -2040,6 +2079,12 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) return ret; } +void wsrep::transaction::clear_fragments() +{ + streaming_context_.cleanup(); + fragment_cache_remove_transaction(server_id_, id_); +} + void wsrep::transaction::cleanup() { debug_log_state("cleanup_enter"); diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 56434f73..1babfcd4 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -198,6 +198,17 @@ namespace wsrep // Not going to do this while unit testing } + void *get_binlog_cache() WSREP_OVERRIDE + { + return (NULL); + } + + int fragment_cache_remove_transaction( + const wsrep::id&, + wsrep::transaction_id) WSREP_OVERRIDE + { + return (0); + } // // Knobs to tune the behavior diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index bb67c9a9..e0b340bc 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -106,7 +106,8 @@ int wsrep::mock_high_priority_service::commit( int wsrep::mock_high_priority_service::rollback( const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta) + const wsrep::ws_meta& ws_meta, + bool) { client_state_->prepare_for_ordering(ws_handle, ws_meta, false); return (client_state_->before_rollback() || diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index caf40b2c..eba67143 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -60,13 +60,15 @@ namespace wsrep const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&, + int, const wsrep::xid&) WSREP_OVERRIDE { return 0; } int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; - int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&, + bool skip_rollback = false) WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) WSREP_OVERRIDE; diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index b0275b97..112e75ed 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -40,12 +40,16 @@ class mock_server_state; wsrep::transaction_id, int, const wsrep::const_buffer&, - const wsrep::xid&) WSREP_OVERRIDE + int, + size_t, + const wsrep::xid&, + void *) WSREP_OVERRIDE { return 0; } int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } int remove_fragments() WSREP_OVERRIDE { return 0; } + int set_fragments_from_table() WSREP_OVERRIDE { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE;