diff --git a/.gitignore b/.gitignore index 1fa1627f..9ee027b1 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ wsrep-API/libwsrep_api_v26.a # Gcov generated files *.dgcov + +# Test logs +wsrep-lib_test.log diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index b81ee9a2..6f5a91d0 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -582,7 +582,8 @@ namespace wsrep * @param lock Lock to protect client state. * @param bf_seqno Seqno of the BF aborter. */ - int bf_abort(wsrep::unique_lock& lock, wsrep::seqno bf_seqno); + int bf_abort(wsrep::unique_lock& lock, + wsrep::seqno bf_seqno); /** * Wrapper to bf_abort() call, grabs lock internally. */ @@ -593,7 +594,8 @@ namespace wsrep * should be called by the TOI operation which needs to * BF abort a transaction. */ - int total_order_bf_abort(wsrep::unique_lock& lock, wsrep::seqno bf_seqno); + int total_order_bf_abort(wsrep::unique_lock& lock, + wsrep::seqno bf_seqno); /** * Wrapper to total_order_bf_abort(), grabs lock internally. diff --git a/include/wsrep/id.hpp b/include/wsrep/id.hpp index fc1e82b2..d1b175ae 100644 --- a/include/wsrep/id.hpp +++ b/include/wsrep/id.hpp @@ -93,6 +93,11 @@ namespace wsrep { return undefined_; } + + /** + * Return id in string representation. + */ + std::string to_string() const; private: static const wsrep::id undefined_; native_type data_; diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 9b620795..bc11c81f 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -29,6 +29,7 @@ #include +#include #include #include #include @@ -47,7 +48,7 @@ namespace wsrep class tls_service; class allowlist_service; class event_service; - + class client_service; class stid { public: @@ -283,7 +284,6 @@ namespace wsrep static const int streaming = (1 << 15); static const int snapshot = (1 << 16); static const int nbo = (1 << 17); - /** decipher capability bitmask */ static std::string str(int); }; @@ -375,6 +375,7 @@ namespace wsrep */ virtual enum status bf_abort(wsrep::seqno bf_seqno, wsrep::transaction_id victim_trx, + wsrep::client_service& victim_ctx, wsrep::seqno& victim_seqno) = 0; virtual enum status rollback(wsrep::transaction_id) = 0; virtual enum status commit_order_enter(const wsrep::ws_handle&, @@ -407,6 +408,7 @@ namespace wsrep * Leave total order isolation critical section */ virtual enum status leave_toi(wsrep::client_id, + const wsrep::ws_meta& ws_meta, const wsrep::mutable_buffer& err) = 0; /** @@ -509,11 +511,12 @@ namespace wsrep * @param provider_options Initial options to provider * @param thread_service Optional thread service implementation. */ - static provider* make_provider(wsrep::server_state&, - const std::string& provider_spec, - const std::string& provider_options, - const wsrep::provider::services& services - = wsrep::provider::services()); + static std::unique_ptr make_provider( + wsrep::server_state&, + const std::string& provider_spec, + const std::string& provider_options, + const wsrep::provider::services& services + = wsrep::provider::services()); protected: wsrep::server_state& server_state_; diff --git a/include/wsrep/seqno.hpp b/include/wsrep/seqno.hpp index 9d8cedbc..2baef3d1 100644 --- a/include/wsrep/seqno.hpp +++ b/include/wsrep/seqno.hpp @@ -51,6 +51,11 @@ namespace wsrep return (seqno_ == -1); } + wsrep::seqno prev() const + { + return seqno{seqno_ - 1}; + } + bool operator<(seqno other) const { return (seqno_ < other.seqno_); diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index b280b55d..632d43bc 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -92,7 +92,9 @@ #include "compiler.hpp" #include "xid.hpp" +#include #include +#include #include #include #include @@ -188,8 +190,6 @@ namespace wsrep rm_sync }; - virtual ~server_state(); - wsrep::encryption_service* encryption_service() { return encryption_service_; } @@ -299,6 +299,17 @@ namespace wsrep const wsrep::provider::services& services = wsrep::provider::services()); + using provider_factory_func = + std::function; + + /** + * Set provider factory method. + * + * @param Factory method to create a provider. + */ + void set_provider_factory(const provider_factory_func&); + + /** Unload/unset provider. */ void unload_provider(); bool is_provider_loaded() const { return provider_ != 0; } @@ -310,12 +321,8 @@ namespace wsrep * * @throw wsrep::runtime_error if provider has not been loaded * - * @todo This should not be virtual. However, currently there - * is no mechanism for tests and integrations to provide - * their own provider implementations, so this is kept virtual - * for time being. */ - virtual wsrep::provider& provider() const + wsrep::provider& provider() const { if (provider_ == 0) { @@ -529,6 +536,19 @@ namespace wsrep return init_initialized_; } + /** Recover streaming appliers if not already recoverd yet. + * + * This method recovers streaming appliers from streaming log. + * It must be called before starting to apply events after + * connecting to the cluster. + * + * @param lock Lock object holding server_state mutex. + * @param service Either client or high priority service. + */ + template + void recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock& lock, C& service); + /** * This method will be called by the provider when * a remote write set is being applied. It is the responsibility @@ -618,6 +638,7 @@ namespace wsrep , streaming_appliers_() , streaming_appliers_recovered_() , provider_() + , provider_factory_(wsrep::provider::make_provider) , name_(name) , id_(wsrep::id::undefined()) , incoming_address_(incoming_address) @@ -645,11 +666,7 @@ namespace wsrep // Interrupt all threads which are waiting for state void interrupt_state_waiters(wsrep::unique_lock&); - // Recover streaming appliers if not already recoverd - template - void recover_streaming_appliers_if_not_recovered( - wsrep::unique_lock&, C&); - + private: // Close SR transcations whose origin is outside of current // cluster view. void close_orphaned_sr_transactions( @@ -702,7 +719,8 @@ namespace wsrep wsrep::high_priority_service*> streaming_appliers_map; streaming_appliers_map streaming_appliers_; bool streaming_appliers_recovered_; - wsrep::provider* provider_; + std::unique_ptr provider_; + provider_factory_func provider_factory_; std::string name_; wsrep::id id_; std::string incoming_address_; diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index e68548b7..3c689b67 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -92,6 +92,17 @@ namespace wsrep virtual void store_globals() = 0; virtual void reset_globals() = 0; + + /** + * Return true if the implementation requires storing + * and restoring global state. Return true by default + * since this is the original behavior. Stateless + * implementations may override. + */ + virtual bool requires_globals() const { + return true; + } + }; } diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 3e4ce7e9..201c3b5d 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -200,9 +200,11 @@ namespace wsrep void after_applying(); bool bf_abort(wsrep::unique_lock& lock, - wsrep::seqno bf_seqno); + wsrep::seqno bf_seqno, + wsrep::client_service&); bool total_order_bf_abort(wsrep::unique_lock&, - wsrep::seqno bf_seqno); + wsrep::seqno bf_seqno, + wsrep::client_service&); void clone_for_replay(const wsrep::transaction& other); diff --git a/include/wsrep/view.hpp b/include/wsrep/view.hpp index d17c27f1..5bf2b968 100644 --- a/include/wsrep/view.hpp +++ b/include/wsrep/view.hpp @@ -117,9 +117,9 @@ namespace wsrep /** * Return true if the view is final */ - bool final() const + bool is_final() const { - return (members_.empty() && own_index_ == -1); + return (status_ != status::primary && members_.empty() && own_index_ == -1); } /** diff --git a/src/client_state.cpp b/src/client_state.cpp index 047e67ab..90dbb23c 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -511,7 +511,7 @@ int wsrep::client_state::bf_abort(wsrep::unique_lock& lock, { assert(lock.owns_lock()); assert(mode_ == m_local || transaction_.is_streaming()); - auto ret = transaction_.bf_abort(lock, bf_seqno); + auto ret = transaction_.bf_abort(lock, bf_seqno, client_service_); assert(lock.owns_lock()); return ret; } @@ -527,7 +527,7 @@ int wsrep::client_state::total_order_bf_abort( { assert(lock.owns_lock()); assert(mode_ == m_local || transaction_.is_streaming()); - auto ret = transaction_.total_order_bf_abort(lock, bf_seqno); + auto ret = transaction_.total_order_bf_abort(lock, bf_seqno, client_service_); assert(lock.owns_lock()); return ret; } @@ -585,7 +585,7 @@ wsrep::client_state::poll_enter_toi( // Successfully entered TOI, but the provider reported failure. // This may happen for example if certification fails. // Leave TOI before proceeding. - if (provider().leave_toi(id_, wsrep::mutable_buffer())) + if (provider().leave_toi(id_, poll_meta, wsrep::mutable_buffer())) { wsrep::log_warning() << "Failed to leave TOI after failure in " @@ -689,10 +689,12 @@ int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err) { debug_log_state("leave_toi_local: enter"); assert(toi_mode_ == m_local); - leave_toi_common(); + auto ret = (provider().leave_toi(id_, toi_meta_, err) == provider::success ? 0 : 1); + leave_toi_common(); debug_log_state("leave_toi_local: leave"); - return (provider().leave_toi(id_, err) == provider::success ? 0 : 1); + + return ret; } void wsrep::client_state::leave_toi_mode() @@ -809,7 +811,7 @@ int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err) assert(mode_ == m_nbo); assert(in_toi()); - enum wsrep::provider::status status(provider().leave_toi(id_, err)); + enum wsrep::provider::status status(provider().leave_toi(id_, toi_meta_, err)); wsrep::unique_lock lock(mutex_); int ret; switch (status) @@ -910,7 +912,7 @@ int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err) assert(toi_mode_ == m_local); assert(in_toi()); enum wsrep::provider::status status( - provider().leave_toi(id_, err)); + provider().leave_toi(id_, toi_meta_, err)); wsrep::unique_lock lock(mutex_); int ret; switch (status) diff --git a/src/config_service_v1.cpp b/src/config_service_v1.cpp index ace61427..5e5c9f03 100644 --- a/src/config_service_v1.cpp +++ b/src/config_service_v1.cpp @@ -151,6 +151,11 @@ int wsrep::config_service_v1_fetch(wsrep::provider& provider, wsrep::provider_options* options) { struct wsrep_st* wsrep = (struct wsrep_st*)provider.native(); + if (wsrep == nullptr) + { + // Not a provider which was loaded via wsrep-API + return 0; + } if (config_service_v1_probe(wsrep->dlh)) { wsrep::log_warning() << "Provider does not support config service v1"; diff --git a/src/id.cpp b/src/id.cpp index 2da188fc..2999dc33 100644 --- a/src/id.cpp +++ b/src/id.cpp @@ -50,13 +50,23 @@ wsrep::id::id(const std::string& str) } } +std::string wsrep::id::to_string() const +{ + std::ostringstream os; + os << *this; + return os.str(); +} + std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id) { const char* ptr(static_cast(id.data())); size_t size(id.size()); - if (static_cast(std::count_if(ptr, ptr + size, ::isalnum)) == size) + if (static_cast( + std::count_if(ptr, ptr + size, + [](char c) { return (::isalnum(c) || c == '\0'); })) + == size) { - return (os << std::string(ptr, size)); + return (os << std::string(ptr, ::strnlen(ptr, size))); } else { diff --git a/src/provider.cpp b/src/provider.cpp index 9e99f7fd..ce9f5da2 100644 --- a/src/provider.cpp +++ b/src/provider.cpp @@ -26,7 +26,7 @@ #include #include -wsrep::provider* wsrep::provider::make_provider( +std::unique_ptr wsrep::provider::make_provider( wsrep::server_state& server_state, const std::string& provider_spec, const std::string& provider_options, @@ -34,8 +34,8 @@ wsrep::provider* wsrep::provider::make_provider( { try { - return new wsrep::wsrep_provider_v26( - server_state, provider_options, provider_spec, services); + return std::unique_ptr(new wsrep::wsrep_provider_v26( + server_state, provider_options, provider_spec, services)); } catch (const wsrep::runtime_error& e) { @@ -120,7 +120,6 @@ std::string wsrep::provider::capability::str(int caps) WSREP_PRINT_CAPABILITY(streaming, "STREAMING"); WSREP_PRINT_CAPABILITY(snapshot, "READ_VIEW"); WSREP_PRINT_CAPABILITY(nbo, "NBO"); - #undef WSREP_PRINT_CAPABILITY if (caps) diff --git a/src/server_state.cpp b/src/server_state.cpp index 2fc9b199..270c45e7 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -307,7 +307,8 @@ static int apply_write_set(wsrep::server_state& server_state, wsrep::log::debug_level_server_state, "Could not find applier context for " << ws_meta.server_id() - << ": " << ws_meta.transaction_id()); + << ": " << ws_meta.transaction_id() + << ", " << ws_meta.seqno()); ret = high_priority_service.log_dummy_write_set( ws_handle, ws_meta, no_error); } @@ -379,7 +380,8 @@ static int apply_write_set(wsrep::server_state& server_state, // it may be an indication of a bug too. wsrep::log_warning() << "Could not find applier context for " << ws_meta.server_id() - << ": " << ws_meta.transaction_id(); + << ": " << ws_meta.transaction_id() + << ", " << ws_meta.seqno(); wsrep::mutable_buffer no_error; ret = high_priority_service.log_dummy_write_set( ws_handle, ws_meta, no_error); @@ -420,7 +422,8 @@ static int apply_write_set(wsrep::server_state& server_state, wsrep::log_warning() << "Could not find applier context for " << ws_meta.server_id() - << ": " << ws_meta.transaction_id(); + << ": " << ws_meta.transaction_id() + << ", " << ws_meta.seqno(); wsrep::mutable_buffer no_error; ret = high_priority_service.log_dummy_write_set( ws_handle, ws_meta, no_error); @@ -501,18 +504,21 @@ int wsrep::server_state::load_provider( { wsrep::log_info() << "Loading provider " << provider_spec << " initial position: " << initial_position_; - - provider_ = wsrep::provider::make_provider(*this, - provider_spec, - provider_options, - services); + provider_ + = provider_factory_(*this, provider_spec, provider_options, services); return (provider_ ? 0 : 1); } +void wsrep::server_state::set_provider_factory( + const provider_factory_func& provider_factory) +{ + assert(provider_factory); + provider_factory_ = provider_factory; +} + void wsrep::server_state::unload_provider() { - delete provider_; - provider_ = 0; + provider_.reset(); } int wsrep::server_state::connect(const std::string& cluster_name, @@ -545,11 +551,6 @@ int wsrep::server_state::disconnect() return provider().disconnect(); } -wsrep::server_state::~server_state() -{ - delete provider_; -} - std::vector wsrep::server_state::status() const { @@ -914,7 +915,7 @@ void wsrep::server_state::on_primary_view( wsrep::high_priority_service* high_priority_service) { wsrep::unique_lock lock(mutex_); - assert(view.final() == false); + assert(view.is_final() == false); // // Reached primary from connected state. This may mean the following // @@ -995,7 +996,7 @@ void wsrep::server_state::on_non_primary_view( { wsrep::unique_lock lock(mutex_); wsrep::log_info() << "Non-primary view"; - if (view.final()) + if (view.is_final()) { go_final(lock, view, high_priority_service); } @@ -1010,7 +1011,7 @@ void wsrep::server_state::go_final(wsrep::unique_lock& lock, wsrep::high_priority_service* hps) { (void)view; // avoid compiler warning "unused parameter 'view'" - assert(view.final()); + assert(view.is_final()); assert(hps); if (hps) { @@ -1064,6 +1065,8 @@ void wsrep::server_state::on_sync() { switch (state_) { + case s_disconnecting: + break; case s_synced: break; case s_connected: // Seed node path: provider becomes @@ -1090,7 +1093,7 @@ void wsrep::server_state::on_sync() // Calls to on_sync() in synced state are possible if // server desyncs itself from the group. Provider does not // inform about this through callbacks. - if (state_ != s_synced) + if (state_ != s_synced && state_ != s_disconnecting) { state(lock, s_synced); } @@ -1395,8 +1398,9 @@ void wsrep::server_state::wait_until_state( // or disconnected and the state has been changed to disconnecting, // this usually means that some error was encountered if (state != s_disconnecting && state != s_disconnected - && state_ == s_disconnecting) + && (state_ == s_disconnecting || state_ == s_disconnected)) { + --state_waiters_[state]; throw wsrep::runtime_error("State wait was interrupted"); } } @@ -1484,6 +1488,7 @@ void wsrep::server_state::close_orphaned_sr_transactions( { wsrep::client_id client_id(i->first); wsrep::transaction_id transaction_id(i->second->transaction().id()); + auto& client_state = *i->second; // It is safe to unlock the server state temporarily here. // The processing happens inside view handler which is // protected by the provider commit ordering critical @@ -1494,7 +1499,7 @@ void wsrep::server_state::close_orphaned_sr_transactions( // remains unlocked, so it should not be accessed after // the bf abort call. lock.unlock(); - i->second->total_order_bf_abort(current_view_.view_seqno()); + client_state.total_order_bf_abort(current_view_.view_seqno()); lock.lock(); streaming_clients_map::const_iterator found_i; while ((found_i = streaming_clients_.find(client_id)) != diff --git a/src/transaction.cpp b/src/transaction.cpp index 3688ad71..c2a8a524 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -63,8 +63,10 @@ namespace { throw wsrep::runtime_error("Null client_state provided"); } - client_service_.reset_globals(); - storage_service_->store_globals(); + if (storage_service_->requires_globals()) { + client_service_.reset_globals(); + storage_service_->store_globals(); + } } wsrep::storage_service& storage_service() @@ -74,8 +76,11 @@ namespace ~scoped_storage_service() { + bool restore_globals = storage_service_->requires_globals(); deleter_(storage_service_); - client_service_.store_globals(); + if (restore_globals) { + client_service_.store_globals(); + } } private: scoped_storage_service(const scoped_storage_service&); @@ -990,7 +995,8 @@ void wsrep::transaction::after_applying() bool wsrep::transaction::bf_abort( wsrep::unique_lock& lock, - wsrep::seqno bf_seqno) + wsrep::seqno bf_seqno, + wsrep::client_service& victim_ctx) { bool ret(false); const enum wsrep::transaction::state state_at_enter(state()); @@ -1021,7 +1027,7 @@ bool wsrep::transaction::bf_abort( wsrep::seqno victim_seqno; enum wsrep::provider::status status(client_state_.provider().bf_abort( - bf_seqno, id_, victim_seqno)); + bf_seqno, id_, victim_ctx, victim_seqno)); switch (status) { case wsrep::provider::success: @@ -1108,14 +1114,15 @@ bool wsrep::transaction::bf_abort( bool wsrep::transaction::total_order_bf_abort( wsrep::unique_lock& lock WSREP_UNUSED, - wsrep::seqno bf_seqno) + wsrep::seqno bf_seqno, + wsrep::client_service& victim_ctx) { /* We must set this flag before entering bf_abort() in order * to streaming_rollback() work correctly. The flag will be * unset if BF abort was not allowed. Note that we rely in * bf_abort() not to release lock if the BF abort is not allowed. */ bf_aborted_in_total_order_ = true; - bool ret(bf_abort(lock, bf_seqno)); + bool ret(bf_abort(lock, bf_seqno, victim_ctx)); if (not ret) { bf_aborted_in_total_order_ = false; diff --git a/src/view.cpp b/src/view.cpp index d5bff099..dc37f6fa 100644 --- a/src/view.cpp +++ b/src/view.cpp @@ -57,7 +57,7 @@ void wsrep::view::print(std::ostream& os) const << " status: " << to_c_string(status()) << "\n" << " protocol_version: " << protocol_version() << "\n" << " capabilities: " << provider::capability::str(capabilities())<<"\n" - << " final: " << (final() ? "yes" : "no") << "\n" + << " final: " << (is_final() ? "yes" : "no") << "\n" << " own_index: " << own_index() << "\n" << " members(" << members().size() << "):\n"; diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 80240fe5..c6f7e4f0 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -950,6 +950,7 @@ enum wsrep::provider::status wsrep::wsrep_provider_v26::bf_abort( wsrep::seqno bf_seqno, wsrep::transaction_id victim_id, + wsrep::client_service& /* Ignored here */, wsrep::seqno& victim_seqno) { wsrep_seqno_t wsrep_victim_seqno; @@ -1047,6 +1048,7 @@ wsrep::wsrep_provider_v26::enter_toi( enum wsrep::provider::status wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id, + const wsrep::ws_meta&, const wsrep::mutable_buffer& err) { const wsrep_buf_t err_buf = { err.data(), err.size() }; diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 0fc82594..53edc208 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -63,6 +63,7 @@ namespace wsrep enum wsrep::provider::status bf_abort(wsrep::seqno, wsrep::transaction_id, + wsrep::client_service&, wsrep::seqno&) WSREP_OVERRIDE; enum wsrep::provider::status rollback(const wsrep::transaction_id) WSREP_OVERRIDE; @@ -83,6 +84,7 @@ namespace wsrep int) WSREP_OVERRIDE; enum wsrep::provider::status leave_toi(wsrep::client_id, + const wsrep::ws_meta& ws_meta, const wsrep::mutable_buffer&) WSREP_OVERRIDE; std::pair diff --git a/test/id_test.cpp b/test/id_test.cpp index 5a87ba16..63dea58e 100644 --- a/test/id_test.cpp +++ b/test/id_test.cpp @@ -37,6 +37,15 @@ BOOST_AUTO_TEST_CASE(id_test_uuid) } BOOST_AUTO_TEST_CASE(id_test_string) +{ + std::string id_str("node1"); + wsrep::id id(id_str); + std::ostringstream os; + os << id; + BOOST_REQUIRE(id_str == os.str()); +} + +BOOST_AUTO_TEST_CASE(id_test_string_max) { std::string id_str("1234567890123456"); wsrep::id id(id_str); diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 75233a7b..7c8e8968 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -117,9 +117,8 @@ namespace wsrep { ++group_seqno_; wsrep::gtid gtid(group_id_, wsrep::seqno(group_seqno_)); - ws_meta = wsrep::ws_meta(gtid, stid, - wsrep::seqno(group_seqno_ - 1), - flags); + ws_meta = wsrep::ws_meta( + gtid, stid, wsrep::seqno(group_seqno_ - 1), flags); return wsrep::provider::success; } else @@ -135,9 +134,8 @@ namespace wsrep { ++group_seqno_; wsrep::gtid gtid(group_id_, wsrep::seqno(group_seqno_)); - ws_meta = wsrep::ws_meta(gtid, stid, - wsrep::seqno(group_seqno_ - 1), - flags); + ws_meta = wsrep::ws_meta( + gtid, stid, wsrep::seqno(group_seqno_ - 1), flags); ret = wsrep::provider::error_bf_abort; } bf_abort_map_.erase(it); @@ -215,8 +213,8 @@ namespace wsrep wsrep::gtid(group_id_, wsrep::seqno(group_seqno_)), wsrep::stid(server_id_, tc.id(), cc.id()), wsrep::seqno(group_seqno_ - 1), - wsrep::provider::flag::start_transaction | - wsrep::provider::flag::commit); + wsrep::provider::flag::start_transaction + | wsrep::provider::flag::commit); } else { @@ -245,12 +243,10 @@ namespace wsrep { ++group_seqno_; wsrep::gtid gtid(group_id_, wsrep::seqno(group_seqno_)); - wsrep::stid stid(server_id_, - wsrep::transaction_id::undefined(), + wsrep::stid stid(server_id_, wsrep::transaction_id::undefined(), client_id); toi_meta = wsrep::ws_meta(gtid, stid, - wsrep::seqno(group_seqno_ - 1), - flags); + wsrep::seqno(group_seqno_ - 1), flags); ++toi_write_sets_; if (flags & wsrep::provider::flag::start_transaction) ++toi_start_transaction_; @@ -260,6 +256,7 @@ namespace wsrep } enum wsrep::provider::status leave_toi(wsrep::client_id, + const wsrep::ws_meta&, const wsrep::mutable_buffer&) WSREP_OVERRIDE { return wsrep::provider::success; } @@ -315,6 +312,7 @@ namespace wsrep enum wsrep::provider::status bf_abort(wsrep::seqno bf_seqno, wsrep::transaction_id trx_id, + wsrep::client_service&, wsrep::seqno& victim_seqno) WSREP_OVERRIDE { diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 093a620a..a2473d8e 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -258,12 +258,28 @@ namespace wsrep rollback_mode) , mutex_() , cond_() - , provider_(*this) - { } - - wsrep::mock_provider& provider() const WSREP_OVERRIDE - { return provider_; } + , provider_() + { + set_provider_factory([&](wsrep::server_state&, + const std::string&, + const std::string&, + const wsrep::provider::services&) + { + // The provider object is destroyed upon server state + // destruction, so using a raw pointer is safe. + provider_ = new wsrep::mock_provider(*this); + return std::unique_ptr(provider_); + }); + + const int ret WSREP_UNUSED = load_provider("mock", ""); + assert(ret == 0); + assert(provider_ != nullptr); + } + wsrep::mock_provider& provider() const + { + return *provider_; + } // mock connected state for tests without overriding the connect() // method. int mock_connect(const std::string& own_id, @@ -308,7 +324,7 @@ namespace wsrep private: wsrep::default_mutex mutex_; wsrep::default_condition_variable cond_; - mutable wsrep::mock_provider provider_; + wsrep::mock_provider* provider_; }; } diff --git a/test/test_utils.cpp b/test/test_utils.cpp index 02e88cf0..43c4ea23 100644 --- a/test/test_utils.cpp +++ b/test/test_utils.cpp @@ -41,11 +41,12 @@ void wsrep_test::bf_abort_in_total_order(wsrep::client_state& cc) } // BF abort method to abort transactions via provider void wsrep_test::bf_abort_provider(wsrep::mock_server_state& sc, - const wsrep::transaction& tc, + const wsrep::client_state& victim_cs, wsrep::seqno bf_seqno) { wsrep::seqno victim_seqno; - sc.provider().bf_abort(bf_seqno, tc.id(), victim_seqno); + sc.provider().bf_abort(bf_seqno, victim_cs.transaction().id(), victim_cs.client_service(), + victim_seqno); (void)victim_seqno; } @@ -63,13 +64,10 @@ void wsrep_test::terminate_streaming_applier( mc.before_command(); wsrep::mock_high_priority_service hps(sc, &mc, false); wsrep::ws_handle ws_handle(transaction_id, (void*)(1)); - wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"), - wsrep::seqno(100)), - wsrep::stid(server_id, - transaction_id, - wsrep::client_id(1)), - wsrep::seqno(0), - wsrep::provider::flag::rollback); + wsrep::ws_meta ws_meta( + wsrep::gtid(wsrep::id("cluster1"), wsrep::seqno(100)), + wsrep::stid(server_id, transaction_id, wsrep::client_id(1)), + wsrep::seqno(0), wsrep::provider::flag::rollback); wsrep::const_buffer data(0, 0); sc.on_apply(hps, ws_handle, ws_meta, data); } diff --git a/test/test_utils.hpp b/test/test_utils.hpp index 7e4c896d..e62a5d67 100644 --- a/test/test_utils.hpp +++ b/test/test_utils.hpp @@ -41,7 +41,7 @@ namespace wsrep_test // BF abort method to abort transactions via provider void bf_abort_provider(wsrep::mock_server_state& sc, - const wsrep::transaction& tc, + const wsrep::client_state& victim_cs, wsrep::seqno bf_seqno); // BF abort in total order diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index cb56aea4..22b5ab2e 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -188,7 +188,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); - wsrep_test::bf_abort_provider(sc, tc, wsrep::seqno::undefined()); + wsrep_test::bf_abort_provider(sc, cc, wsrep::seqno::undefined()); // Run before commit BOOST_REQUIRE(cc.before_commit()); @@ -454,7 +454,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); - wsrep_test::bf_abort_provider(sc, tc, wsrep::seqno(1)); + wsrep_test::bf_abort_provider(sc, cc, wsrep::seqno(1)); // Run before commit BOOST_REQUIRE(cc.before_commit()); diff --git a/test/transaction_test_2pc.cpp b/test/transaction_test_2pc.cpp index 8f95828d..f3948362 100644 --- a/test/transaction_test_2pc.cpp +++ b/test/transaction_test_2pc.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018-2019 Codership Oy + * Copyright (C) 2018-2021 Codership Oy * * This file is part of wsrep-lib. * @@ -22,8 +22,7 @@ // // Test a succesful 2PC transaction lifecycle // -BOOST_FIXTURE_TEST_CASE(transaction_2pc, - replicating_client_fixture_2pc) +BOOST_FIXTURE_TEST_CASE(transaction_2pc, replicating_client_fixture_2pc) { cc.start_transaction(wsrep::transaction_id(1)); BOOST_REQUIRE(tc.active()); @@ -52,9 +51,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_2pc, // // Test a 2PC transaction which gets BF aborted before before_prepare // -BOOST_FIXTURE_TEST_CASE( - transaction_2pc_bf_before_before_prepare, - replicating_client_fixture_2pc) +BOOST_FIXTURE_TEST_CASE(transaction_2pc_bf_before_before_prepare, + replicating_client_fixture_2pc) { cc.start_transaction(wsrep::transaction_id(1)); BOOST_REQUIRE(tc.active()); @@ -69,7 +67,7 @@ BOOST_FIXTURE_TEST_CASE( BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborting); BOOST_REQUIRE(cc.after_rollback() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); - BOOST_REQUIRE(cc.after_statement() ); + BOOST_REQUIRE(cc.after_statement()); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.certified() == false); @@ -79,9 +77,8 @@ BOOST_FIXTURE_TEST_CASE( // // Test a 2PC transaction which gets BF aborted before before_prepare // -BOOST_FIXTURE_TEST_CASE( - transaction_2pc_bf_before_after_prepare, - replicating_client_fixture_2pc) +BOOST_FIXTURE_TEST_CASE(transaction_2pc_bf_before_after_prepare, + replicating_client_fixture_2pc) { cc.start_transaction(wsrep::transaction_id(1)); BOOST_REQUIRE(tc.active()); @@ -110,9 +107,8 @@ BOOST_FIXTURE_TEST_CASE( // Test a 2PC transaction which gets BF aborted after_prepare() and // the rollback takes place before entering before_commit(). // -BOOST_FIXTURE_TEST_CASE( - transaction_2pc_bf_after_after_prepare, - replicating_client_fixture_2pc) +BOOST_FIXTURE_TEST_CASE(transaction_2pc_bf_after_after_prepare, + replicating_client_fixture_2pc) { cc.start_transaction(wsrep::transaction_id(1)); BOOST_REQUIRE(tc.active()); @@ -139,9 +135,8 @@ BOOST_FIXTURE_TEST_CASE( // Test a 2PC transaction which gets BF aborted between after_prepare() // and before_commit() // -BOOST_FIXTURE_TEST_CASE( - transaction_2pc_bf_before_before_commit, - replicating_client_fixture_2pc) +BOOST_FIXTURE_TEST_CASE(transaction_2pc_bf_before_before_commit, + replicating_client_fixture_2pc) { cc.start_transaction(wsrep::transaction_id(1)); BOOST_REQUIRE(tc.active()); @@ -168,14 +163,12 @@ BOOST_FIXTURE_TEST_CASE( BOOST_REQUIRE(cc.current_error() == wsrep::e_success); } - // // Test a 2PC transaction which gets BF aborted when trying to grab // commit order. // -BOOST_FIXTURE_TEST_CASE( - transaction_2pc_bf_during_commit_order_enter, - replicating_client_fixture_2pc) +BOOST_FIXTURE_TEST_CASE(transaction_2pc_bf_during_commit_order_enter, + replicating_client_fixture_2pc) { cc.start_transaction(wsrep::transaction_id(1)); BOOST_REQUIRE(tc.active()); @@ -183,7 +176,8 @@ BOOST_FIXTURE_TEST_CASE( BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.after_prepare() == 0); - sc.provider().commit_order_enter_result_ = wsrep::provider::error_bf_abort; + sc.provider().commit_order_enter_result_ + = wsrep::provider::error_bf_abort; BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_replay); BOOST_REQUIRE(cc.will_replay_called() == true); @@ -205,7 +199,6 @@ BOOST_FIXTURE_TEST_CASE( // STREAMING REPLICATION // /////////////////////////////////////////////////////////////////////////////// - BOOST_FIXTURE_TEST_CASE(transaction_streaming_2pc_commit, streaming_client_fixture_row) { @@ -251,8 +244,9 @@ BOOST_FIXTURE_TEST_CASE(transaction_streaming_2pc_commit_two_statements, // internally. This will cause the transaction to leave before_prepare() // in aborted state. // -BOOST_FIXTURE_TEST_CASE(transaction_streaming_2pc_bf_abort_during_fragment_removal, - streaming_client_fixture_row) +BOOST_FIXTURE_TEST_CASE( + transaction_streaming_2pc_bf_abort_during_fragment_removal, + streaming_client_fixture_row) { BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.after_row() == 0); @@ -270,8 +264,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_streaming_2pc_bf_abort_during_fragment_remov // APPLYING // /////////////////////////////////////////////////////////////////////////////// -BOOST_FIXTURE_TEST_CASE(transaction_2pc_applying, - applying_client_fixture_2pc) +BOOST_FIXTURE_TEST_CASE(transaction_2pc_applying, applying_client_fixture_2pc) { BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); diff --git a/test/transaction_test_xa.cpp b/test/transaction_test_xa.cpp index d9fbad27..0e6caa15 100644 --- a/test/transaction_test_xa.cpp +++ b/test/transaction_test_xa.cpp @@ -1,11 +1,28 @@ +/* + * Copyright (C) 2019-2025 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ #include "client_state_fixture.hpp" #include // // Test a successful XA transaction lifecycle // -BOOST_FIXTURE_TEST_CASE(transaction_xa, - replicating_client_fixture_sync_rm) +BOOST_FIXTURE_TEST_CASE(transaction_xa, replicating_client_fixture_sync_rm) { wsrep::xid xid(1, 9, 0, "test xid"); @@ -47,7 +64,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, BOOST_REQUIRE(cc.current_error() == wsrep::e_success); } - // // Test detaching of XA transactions // @@ -119,7 +135,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_detach_rollback_by_xid, server_service.release_high_priority_service(hps); } - // // Test XA replay // @@ -214,8 +229,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_replay_after_command_after_result, // // Test a successful XA transaction lifecycle (applying side) // -BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, - applying_client_fixture) +BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, applying_client_fixture) { wsrep::xid xid(1, 9, 0, "test xid"); @@ -249,8 +263,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, // // Test a successful XA transaction lifecycle // -BOOST_FIXTURE_TEST_CASE(transaction_xa_sr, - streaming_client_fixture_byte) +BOOST_FIXTURE_TEST_CASE(transaction_xa_sr, streaming_client_fixture_byte) { wsrep::xid xid(1, 9, 0, "test xid");