From 3de594b662752a741391c8461c623baf14160172 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 21 Nov 2024 14:29:11 +0200 Subject: [PATCH 1/2] Add application defined sequential consistency for certification Client state methods before_prepare() and before_commit() accept a callback which is called by the provider after it can guarantee sequential consistency. This allows application threads which wish to maintain sequential consistency to enter before_prepare() and before_commit() calls concurrently without waiting for the prior call to finish. --- include/wsrep/client_state.hpp | 42 ++++++++++++++++++++++++++++++++-- include/wsrep/provider.hpp | 33 +++++++++++++++++++++++--- include/wsrep/transaction.hpp | 8 ++++--- src/client_state.cpp | 8 +++---- src/transaction.cpp | 22 +++++++++--------- src/wsrep_provider_v26.cpp | 24 +++++++++++++++---- src/wsrep_provider_v26.hpp | 2 +- test/mock_provider.hpp | 3 ++- wsrep-API/v26 | 2 +- 9 files changed, 113 insertions(+), 31 deletions(-) diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index d8449d7a..b81ee9a2 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -413,11 +413,49 @@ namespace wsrep /** @name Commit ordering interface */ /** @{ */ - int before_prepare(); + + /** + * This method should be called before the transaction + * is prepared. This call certifies the transaction and + * assigns write set meta data. + * + * @param seq_cb Callback which is passed to underlying + * certify() call. See wsrep::provider::certify(). + * + * @return Zero on success, non-zero on failure. + */ + int before_prepare(const wsrep::provider::seq_cb_t* seq_cb); + + /** Same as before_prepare() above, but nullptr is passed + * to seq_cb. */ + int before_prepare() + { + return before_prepare(nullptr); + } int after_prepare(); - int before_commit(); + /** + * This method should be called before the transaction is committed. 
+ * This call makes the transaction enter the commit time + * critical section. The critical section is left by calling + * ordered_commit(). + * + * If before_prepare() has not been called before this call, + * before_prepare() is called internally. + * + * @param seq_cb Callback which is passed to underlying + * before_prepare() call. + * + * @return Zero on success, non-zero on failure. + */ + int before_commit(const wsrep::provider::seq_cb_t* seq_cb); + + /** Same as before_commit(), but nullptr is passed to seq_cb. */ + int before_commit() + { + return before_commit(nullptr); + } int ordered_commit(); diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 5e82ecd8..9b620795 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -332,10 +332,37 @@ namespace wsrep virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0; virtual enum status append_data( wsrep::ws_handle&, const wsrep::const_buffer&) = 0; + + /** + * Callback for application defined sequential consistency. + * The provider will call + * the callback once it can guarantee sequential consistency. */ + typedef struct seq_cb { + /** Opaque caller context */ + void *ctx; + /** Function to be called by the provider when sequential + * consistency is guaranteed. */ + void (*fn)(void *ctx); + } seq_cb_t; + + /** + * Certify the write set. + * + * @param[in] client_id Id of the client session. + * @param[in,out] ws_handle Write set handle associated to the current + * transaction. + * @param[in] flags Flags associated to the write set (see struct flag). + * @param[out] ws_meta Write set meta data associated to the + * replicated write set. + * @param[in] seq_cb Optional callback for application defined + * sequential consistency. + * + * @return Status code defined in struct status. 
+ */ virtual enum status - certify(wsrep::client_id, wsrep::ws_handle&, - int, - wsrep::ws_meta&) = 0; + certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle, + int flags, wsrep::ws_meta& ws_meta, const seq_cb_t* seq_cb) + = 0; /** * BF abort a transaction inside provider. * diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 3328c093..3e4ce7e9 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -175,11 +175,12 @@ namespace wsrep int after_row(); - int before_prepare(wsrep::unique_lock&); + int before_prepare(wsrep::unique_lock&, + const wsrep::provider::seq_cb_t*); int after_prepare(wsrep::unique_lock&); - int before_commit(); + int before_commit(const wsrep::provider::seq_cb_t*); int ordered_commit(); @@ -248,7 +249,8 @@ namespace wsrep bool abort_or_interrupt(wsrep::unique_lock&); int streaming_step(wsrep::unique_lock&, bool force = false); int certify_fragment(wsrep::unique_lock&); - int certify_commit(wsrep::unique_lock&); + int certify_commit(wsrep::unique_lock&, + const wsrep::provider::seq_cb_t*); int append_sr_keys_for_commit(); int release_commit_order(wsrep::unique_lock&); void remove_fragments_in_storage_service_scope( diff --git a/src/client_state.cpp b/src/client_state.cpp index c6708f13..047e67ab 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -365,12 +365,12 @@ int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta) return transaction_.next_fragment(meta); } -int wsrep::client_state::before_prepare() +int wsrep::client_state::before_prepare(const wsrep::provider::seq_cb_t* seq_cb) { wsrep::unique_lock lock(mutex_); assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec); - return transaction_.before_prepare(lock); + return transaction_.before_prepare(lock, seq_cb); } int wsrep::client_state::after_prepare() @@ -381,11 +381,11 @@ int wsrep::client_state::after_prepare() return transaction_.after_prepare(lock); } -int 
wsrep::client_state::before_commit() +int wsrep::client_state::before_commit(const wsrep::provider::seq_cb_t* seq_cb) { assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); - return transaction_.before_commit(); + return transaction_.before_commit(seq_cb); } int wsrep::client_state::ordered_commit() diff --git a/src/transaction.cpp b/src/transaction.cpp index c5e5bf3b..3688ad71 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -273,8 +273,8 @@ int wsrep::transaction::after_row() return ret; } -int wsrep::transaction::before_prepare( - wsrep::unique_lock& lock) +int wsrep::transaction::before_prepare(wsrep::unique_lock& lock, + const wsrep::provider::seq_cb_t* seq_cb) { assert(lock.owns_lock()); int ret(0); @@ -349,7 +349,7 @@ int wsrep::transaction::before_prepare( } else { - ret = certify_commit(lock); + ret = certify_commit(lock, seq_cb); } assert((ret == 0 && state() == s_preparing) || @@ -445,7 +445,7 @@ int wsrep::transaction::after_prepare( return ret; } -int wsrep::transaction::before_commit() +int wsrep::transaction::before_commit(const wsrep::provider::seq_cb* seq_cb) { int ret(1); @@ -465,7 +465,7 @@ int wsrep::transaction::before_commit() case wsrep::client_state::m_local: if (state() == s_executing) { - ret = before_prepare(lock) || after_prepare(lock); + ret = before_prepare(lock, seq_cb) || after_prepare(lock); assert((ret == 0 && (state() == s_committing || state() == s_prepared)) || @@ -495,7 +495,7 @@ int wsrep::transaction::before_commit() if (ret == 0 && state() == s_prepared) { - ret = certify_commit(lock); + ret = certify_commit(lock, nullptr); assert((ret == 0 && state() == s_committing) || (state() == s_must_abort || state() == s_must_replay || @@ -543,7 +543,7 @@ int wsrep::transaction::before_commit() } else if (state() == s_executing || state() == s_replaying) { - ret = before_prepare(lock) || after_prepare(lock); + ret = before_prepare(lock, nullptr) || after_prepare(lock); } 
else { @@ -1195,7 +1195,7 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, provider().certify(client_state_.id(), ws_handle_, flags(), - meta)); + meta, nullptr)); int ret; if (cert_ret == wsrep::provider::success) @@ -1622,7 +1622,7 @@ int wsrep::transaction::certify_fragment( cert_ret = provider().certify(client_state_.id(), ws_handle_, flags(), - sr_ws_meta); + sr_ws_meta, nullptr); client_service_.debug_crash( "crash_replicate_fragment_after_certify"); @@ -1744,7 +1744,7 @@ int wsrep::transaction::certify_fragment( } int wsrep::transaction::certify_commit( - wsrep::unique_lock& lock) + wsrep::unique_lock& lock, const provider::seq_cb_t* seq_cb) { assert(lock.owns_lock()); assert(active()); @@ -1828,7 +1828,7 @@ int wsrep::transaction::certify_commit( cert_ret(provider().certify(client_state_.id(), ws_handle_, flags(), - ws_meta_)); + ws_meta_, seq_cb)); client_service_.debug_sync("wsrep_after_certification"); lock.lock(); diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 8fa9feec..80240fe5 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -674,6 +674,7 @@ namespace } wsrep_node_isolation_mode_set_fn_v1 node_isolation_mode_set; + wsrep_certify_fn_v1 certify_v1; } @@ -721,6 +722,9 @@ void wsrep::wsrep_provider_v26::init_services( node_isolation_mode_set = wsrep_impl::resolve_function( wsrep_->dlh, WSREP_NODE_ISOLATION_MODE_SET_V1); + + certify_v1 = wsrep_impl::resolve_function( + wsrep_->dlh, WSREP_CERTIFY_V1); } void wsrep::wsrep_provider_v26::deinit_services() @@ -922,14 +926,24 @@ enum wsrep::provider::status wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle, int flags, - wsrep::ws_meta& ws_meta) + wsrep::ws_meta& ws_meta, + const seq_cb_t* seq_cb) { mutable_ws_handle mwsh(ws_handle); mutable_ws_meta mmeta(ws_meta, flags); - return map_return_value( - wsrep_->certify(wsrep_, client_id.get(), mwsh.native(), - mmeta.native_flags(), - 
mmeta.native())); + if (seq_cb && certify_v1) + { + wsrep_seq_cb_t wseq_cb{seq_cb->ctx, seq_cb->fn}; + return map_return_value(certify_v1(wsrep_, client_id.get(), + mwsh.native(), mmeta.native_flags(), + mmeta.native(), &wseq_cb)); + } + else + { + return map_return_value( + wsrep_->certify(wsrep_, client_id.get(), mwsh.native(), + mmeta.native_flags(), mmeta.native())); + } } enum wsrep::provider::status diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 2e03c55e..0fc82594 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -59,7 +59,7 @@ namespace wsrep enum wsrep::provider::status certify(wsrep::client_id, wsrep::ws_handle&, int, - wsrep::ws_meta&) WSREP_OVERRIDE; + wsrep::ws_meta&, const seq_cb_t*) WSREP_OVERRIDE; enum wsrep::provider::status bf_abort(wsrep::seqno, wsrep::transaction_id, diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index b99ddebd..75233a7b 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -79,7 +79,8 @@ namespace wsrep certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle, int flags, - wsrep::ws_meta& ws_meta) + wsrep::ws_meta& ws_meta, + const seq_cb* /* Ignored in unit tests. 
*/) WSREP_OVERRIDE { ws_handle = wsrep::ws_handle(ws_handle.transaction_id(), (void*)1); diff --git a/wsrep-API/v26 b/wsrep-API/v26 index 427c73c5..12c02f5f 160000 --- a/wsrep-API/v26 +++ b/wsrep-API/v26 @@ -1 +1 @@ -Subproject commit 427c73c5c8c443765ec16bfc70d94a65a7fca64c +Subproject commit 12c02f5fdafec70e55d52536b4068e7728675f01 From 7994288534369c64881b2be8dc0ccb78eac1f5b0 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 28 Nov 2024 12:33:21 +0200 Subject: [PATCH 2/2] Check local sequential consistency in dbsim - Release commit time critical section in callback - Check the consistency inside commit order critical section Other: Add 2pc switch to dbsim --- dbsim/db_client.cpp | 39 +++++++++++++++++--- dbsim/db_client.hpp | 1 - dbsim/db_high_priority_service.cpp | 14 +++++++- dbsim/db_high_priority_service.hpp | 1 + dbsim/db_params.cpp | 6 ++++ dbsim/db_params.hpp | 57 +++++++++++------------------- dbsim/db_server.cpp | 16 +++++++++ dbsim/db_server.hpp | 25 +++++++++++++ 8 files changed, 116 insertions(+), 43 deletions(-) diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index b5d56d37..97be6d8e 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -91,6 +91,15 @@ int db::client::client_command(F f) return err; } +static void release_commit_critical_section(void* ptr) +{ + auto* crit = static_cast(ptr); + if (crit->lock.owns_lock()) + { + crit->lock.unlock(); + } +} + void db::client::run_one_transaction() { if (params_.sync_wait) @@ -145,15 +154,35 @@ void db::client::run_one_transaction() err = err || client_command( [&]() { - // wsrep::log_debug() << "Commit"; + auto commit_crit = server_.get_commit_critical_section(); + if (not params_.check_sequential_consistency) { + commit_crit.lock.unlock(); + } + + client_state_.append_data({&commit_crit.commit_seqno, + sizeof(commit_crit.commit_seqno)}); + + wsrep::provider::seq_cb seq_cb { + &commit_crit, + release_commit_critical_section + }; + assert(err == 0); - if (do_2pc()) + if 
(params_.do_2pc) { - err = err || client_state_.before_prepare(); + err = err || client_state_.before_prepare(&seq_cb); err = err || client_state_.after_prepare(); } - err = err || client_state_.before_commit(); - if (err == 0) se_trx_.commit(transaction.ws_meta().gtid()); + err = err || client_state_.before_commit(&seq_cb); + if (err == 0) + { + se_trx_.commit(transaction.ws_meta().gtid()); + if (params_.check_sequential_consistency) + { + server_.check_sequential_consistency( + client_state_.id(), commit_crit.commit_seqno); + } + } err = err || client_state_.ordered_commit(); err = err || client_state_.after_commit(); if (err) diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index 5536a449..ca93a187 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -62,7 +62,6 @@ namespace db void start(); wsrep::client_state& client_state() { return client_state_; } wsrep::client_service& client_service() { return client_service_; } - bool do_2pc() const { return false; } private: friend class db::server_state; friend class db::client_service; diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 669fe502..f24c45e9 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -26,6 +26,7 @@ db::high_priority_service::high_priority_service( : wsrep::high_priority_service(server.server_state()) , server_(server) , client_(client) + , commit_seqno_() { } int db::high_priority_service::start_transaction( @@ -52,11 +53,14 @@ int db::high_priority_service::adopt_transaction(const wsrep::transaction&) int db::high_priority_service::apply_write_set( const wsrep::ws_meta&, - const wsrep::const_buffer&, + const wsrep::const_buffer& buf, wsrep::mutable_buffer&) { client_.se_trx_.start(&client_); client_.se_trx_.apply(client_.client_state().transaction()); + assert(buf.size() > sizeof(uint64_t)); + ::memcpy(&commit_seqno_, buf.data() + buf.size() - sizeof(uint64_t), + sizeof(uint64_t)); return 0; } @@ -82,6 
+86,14 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); int ret(client_.client_state_.before_commit()); if (ret == 0) client_.se_trx_.commit(ws_meta.gtid()); + + /* Local client session replaying. */ + if (ws_meta.server_id() == server_.server_state().id() + && client_.params_.check_sequential_consistency) + { + server_.check_sequential_consistency(ws_meta.client_id(), + commit_seqno_); + } ret = ret || client_.client_state_.ordered_commit(); ret = ret || client_.client_state_.after_commit(); return ret; diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index d4a80f1b..f5309c09 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -69,6 +69,7 @@ namespace db high_priority_service& operator=(const high_priority_service&); db::server& server_; db::client& client_; + uint64_t commit_seqno_; }; class replayer_service : public db::high_priority_service diff --git a/dbsim/db_params.cpp b/dbsim/db_params.cpp index 40433f6c..b7a036bd 100644 --- a/dbsim/db_params.cpp +++ b/dbsim/db_params.cpp @@ -95,6 +95,12 @@ db::params db::parse_args(int argc, char** argv) "Configure TLS service stubs.\n0 default disabled\n1 enabled\n" "2 enabled with short read/write and renegotiation simulation\n" "3 enabled with error simulation.") + ("check-sequential-consistency", + po::value(&params.check_sequential_consistency), + "Check if the provider provides sequential consistency") + ("do-2pc", + po::value(&params.do_2pc), + "Run commits in 2pc") ; try { diff --git a/dbsim/db_params.hpp b/dbsim/db_params.hpp index e5df8062..6e7a4188 100644 --- a/dbsim/db_params.hpp +++ b/dbsim/db_params.hpp @@ -27,42 +27,27 @@ namespace db { struct params { - size_t n_servers; - size_t n_clients; - size_t n_transactions; - size_t n_rows; - size_t max_data_size; // Maximum size of write set data payload. 
- bool random_data_size; // If true, randomize data payload size. - size_t alg_freq; - bool sync_wait; - std::string topology; - std::string wsrep_provider; - std::string wsrep_provider_options; - std::string status_file; - int debug_log_level; - int fast_exit; - int thread_instrumentation; - bool cond_checks; - int tls_service; - params() - : n_servers(0) - , n_clients(0) - , n_transactions(0) - , n_rows(1000) - , max_data_size(8) - , random_data_size(false) - , alg_freq(0) - , sync_wait(false) - , topology() - , wsrep_provider() - , wsrep_provider_options() - , status_file("status.json") - , debug_log_level(0) - , fast_exit(0) - , thread_instrumentation() - , cond_checks() - , tls_service() - { } + size_t n_servers{0}; + size_t n_clients{0}; + size_t n_transactions{0}; + size_t n_rows{1000}; + size_t max_data_size{8}; // Maximum size of write set data payload. + bool random_data_size{false}; // If true, randomize data payload size. + /* Asymmetric lock granularity frequency. */ + size_t alg_freq{0}; + /* Whether to sync wait before start of transaction. 
*/ + bool sync_wait{false}; + std::string topology{}; + std::string wsrep_provider{}; + std::string wsrep_provider_options{}; + std::string status_file{"status.json"}; + int debug_log_level{0}; + int fast_exit{0}; + int thread_instrumentation{0}; + bool cond_checks{false}; + int tls_service{0}; + bool check_sequential_consistency{false}; + bool do_2pc{false}; }; params parse_args(int argc, char** argv); diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index a54610d1..b9fd0ef7 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -68,6 +68,9 @@ db::server::server(simulator& simulator, , appliers_() , clients_() , client_threads_() + , commit_mutex_() + , next_commit_seqno_() + , committed_seqno_() { wsrep::log::logger_fn(logger_fn); } @@ -165,3 +168,16 @@ void db::server::log_state_change(enum wsrep::server_state::state from, wsrep::log_info() << "State changed " << from << " -> " << to; reporter_.report_state(to); } + +void db::server::check_sequential_consistency(wsrep::client_id client_id, + uint64_t commit_seqno) +{ + if (committed_seqno_ >= commit_seqno) + { + wsrep::log_error() << "Sequentiality violation for " << client_id + << " commit seqno " << commit_seqno << " previous " + << committed_seqno_; + ::abort(); + } + committed_seqno_ = commit_seqno; +} diff --git a/dbsim/db_server.hpp b/dbsim/db_server.hpp index 98b9a837..6aff7856 100644 --- a/dbsim/db_server.hpp +++ b/dbsim/db_server.hpp @@ -61,6 +61,27 @@ namespace db wsrep::high_priority_service* streaming_applier_service(); void log_state_change(enum wsrep::server_state::state, enum wsrep::server_state::state); + + /* Sequential consistency checks */ + struct commit_critical_section + { + wsrep::unique_lock lock; + uint64_t commit_seqno; + commit_critical_section(wsrep::default_mutex& mutex, + uint64_t& next_commit_seqno) + : lock{ mutex } + , commit_seqno{ ++next_commit_seqno } + { + } + commit_critical_section(commit_critical_section&&) = default; + }; + commit_critical_section 
get_commit_critical_section() { + return { commit_mutex_, next_commit_seqno_ }; + } + /* Check that commits remain sequential according to commit_seqno. + * This method must be called inside the commit order critical section. */ + void check_sequential_consistency(wsrep::client_id client_id, + uint64_t commit_seqno); private: void start_client(size_t id); @@ -76,6 +97,10 @@ namespace db std::vector appliers_; std::vector> clients_; std::vector client_threads_; + + wsrep::default_mutex commit_mutex_; + uint64_t next_commit_seqno_; + uint64_t committed_seqno_; }; }