Skip to content

This is the first version of the SR speedup feature for MariaDB 10.7. #195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions dbsim/db_client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ namespace db

void debug_sync(const char*) override { }
void debug_crash(const char*) override { }

void *get_binlog_cache() override
{
return (NULL);
}

int fragment_cache_remove_transaction(
const wsrep::id&,
wsrep::transaction_id) override
{
return (0);
}

private:
db::client& client_;
wsrep::client_state& client_state_;
Expand Down
3 changes: 2 additions & 1 deletion dbsim/db_high_priority_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
}

int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
const wsrep::ws_meta& ws_meta,
bool)
{
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false);
int ret(client_.client_state_.before_rollback());
Expand Down
4 changes: 3 additions & 1 deletion dbsim/db_high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ namespace db
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&,
int,
const wsrep::xid&) override
{ return 0; }
int remove_fragments(const wsrep::ws_meta&) override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&,
bool skip_rollback = false) override;
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&) override;
int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&,
Expand Down
7 changes: 6 additions & 1 deletion dbsim/db_storage_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ namespace db
wsrep::transaction_id,
int,
const wsrep::const_buffer&,
const wsrep::xid&) override
int,
size_t,
const wsrep::xid&,
void *) override
{ throw wsrep::not_implemented_error(); }
int update_fragment_meta(const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int remove_fragments() override
{ throw wsrep::not_implemented_error(); }
int set_fragments_from_table() override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)
Expand Down
14 changes: 14 additions & 0 deletions include/wsrep/client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,20 @@ namespace wsrep
* been enabled.
*/
virtual void debug_crash(const char* crash_point) = 0;

/**
* Return the binlog cache for the currently executing
* transaction or a NULL pointer if no such cache exists.
*/
virtual void *get_binlog_cache() = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method exists only to pass the void pointer as argument for one of the service interface calls. This could be handled on MariaDB side, by storing the binlog cache pointer or client session THD pointer when constructing Wsrep_storage_service object.


/**
* Remove the given transaction from the fragment cache.
*/
virtual int fragment_cache_remove_transaction(
const wsrep::id& server_id,
wsrep::transaction_id transaction_id) = 0;

};
}

Expand Down
9 changes: 9 additions & 0 deletions include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,15 @@ namespace wsrep
return 1;
}

/**
* Return a reference to the transaction associated
* with the client state.
*/
wsrep::transaction& transaction()
{
return transaction_;
}

const wsrep::ws_meta& toi_meta() const
{
return toi_meta_;
Expand Down
4 changes: 3 additions & 1 deletion include/wsrep/high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ namespace wsrep
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
int sr_store,
const wsrep::xid& xid) = 0;

/**
Expand Down Expand Up @@ -145,7 +146,8 @@ namespace wsrep
* @return Zero in case of success, non-zero in case of failure
*/
virtual int rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta) = 0;
const wsrep::ws_meta& ws_meta,
bool skip_rollback = false) = 0;

/**
* Apply a TOI operation.
Expand Down
13 changes: 11 additions & 2 deletions include/wsrep/storage_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ namespace wsrep
wsrep::transaction_id client_id,
int flags,
const wsrep::const_buffer& data,
const wsrep::xid& xid) = 0;

int sr_store,
size_t offset,
const wsrep::xid& xid,
void *binlog_cache) = 0;
/**
* Update fragment meta data after certification process.
*/
Expand All @@ -78,6 +80,13 @@ namespace wsrep
*/
virtual int remove_fragments() = 0;

/**
* Update the list of fragments in the streaming context by
* adding all fragments in the streaming log table for the given
* transaction.
*/
virtual int set_fragments_from_table() = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is never called from wsrep-lib side, so it does not have to be in the service interface.

Copy link
Author

@plampio plampio Sep 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, set_fragments_from_table() method is not unused and hence not needed in wsrep-lib. I will remove it, as you suggested.


/**
* Commit the transaction.
*/
Expand Down
13 changes: 13 additions & 0 deletions include/wsrep/streaming_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace wsrep
, fragment_size_()
, unit_counter_()
, log_position_()
, sr_store_(0)
{ }

/**
Expand Down Expand Up @@ -189,6 +190,17 @@ namespace wsrep
unit_counter_ = 0;
log_position_ = 0;
}

void set_sr_store(int store_type)
{
sr_store_ = store_type;
}

int get_sr_store() const
{
return (sr_store_);
}

private:

void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED)
Expand All @@ -204,6 +216,7 @@ namespace wsrep
size_t fragment_size_;
size_t unit_counter_;
size_t log_position_;
int sr_store_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable could perhaps be moved into storage_service implementation on MariaDB side.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can not be moved to Wsrep_storage_service on the MariaDB side because the lifetime of a Wsrep_storage_service object is too short: the sr_store value should have the same lifetime as the transaction, but on the master an Wsrep_storage_service object exists only for the duration of the append_fragment() call that adds a fragment to the fragment table. In addition to this, on the slave side the Wsrep_storage_service objects are not used at all for SR transactions.

But maybe there is some other class on the MariaDB side where sr_store could be moved.

};
}

Expand Down
43 changes: 43 additions & 0 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,47 @@ namespace wsrep
void xa_detach();

int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
int fragment_cache_remove_transaction(
const wsrep::id& server_id, wsrep::transaction_id transaction_id);
void *get_binlog_cache();
/* state of Streaming Replication Speedup feature for the
transaction. This describes the relationship of this WSREP
transaction and the underlying InnoDB transaction.
*/
enum sr_state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SR state handling could perhaps be moved to MariaDB side.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we have the same question as above with sr_store. We need to find a suitable place on the MariaDB side with a lifetime that is the same as the lifetime of the transaction.

{
/* this is not an SR Speedup transaction */
sr_state_none,
/* this is an SR Speedup transaction, but SR XID is not set
for the underlying InnoDB transaction
*/
sr_state_require_xid,
/* this is an SR Speedup transaction, and SR XID is set
for the underlying InnoDB transaction
*/
sr_state_xid_set
};
static const int n_sr_states = sr_state_xid_set + 1;
enum sr_state sr_state() const
{ return sr_state_; }
void require_sr_xid()
{
if (sr_state_ == sr_state_none) {
sr_state_ = sr_state_require_xid;
}
}
void sr_xid_was_set()
{
sr_state_ = sr_state_xid_set;
}
bool sr_xid_is_required()
{
return sr_state_ == sr_state_require_xid;
}
bool sr_xid_is_set()
{
return sr_state_ == sr_state_xid_set;
}

bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); }
void pa_unsafe(bool pa_unsafe) {
Expand Down Expand Up @@ -253,6 +294,7 @@ namespace wsrep
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&);
int replay(wsrep::unique_lock<wsrep::mutex>&);
void clear_fragments();
void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&);
int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&);
void cleanup();
Expand Down Expand Up @@ -281,6 +323,7 @@ namespace wsrep
wsrep::mutable_buffer apply_error_buf_;
wsrep::xid xid_;
bool streaming_rollback_in_progress_;
enum sr_state sr_state_;
};

static inline const char* to_c_string(enum wsrep::transaction::state state)
Expand Down
8 changes: 6 additions & 2 deletions src/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ static int apply_fragment(wsrep::server_state& server_state,
int ret(0);
int apply_err;
wsrep::mutable_buffer err;
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
Expand Down Expand Up @@ -132,7 +134,7 @@ static int apply_fragment(wsrep::server_state& server_state,
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
const wsrep::xid xid(streaming_applier->transaction().xid());
ret = high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data, xid);
ws_handle, ws_meta, data, sr_store, xid);
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
ret = ret || (high_priority_service.after_apply(), 0);
}
Expand Down Expand Up @@ -1553,8 +1555,10 @@ void wsrep::server_state::close_transactions_at_disconnect(
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
streaming_applier->rollback(
wsrep::ws_handle(), wsrep::ws_meta());
wsrep::ws_handle(), wsrep::ws_meta(), sr_store != 0);
streaming_applier->after_apply();
}
streaming_appliers_.erase(i++);
Expand Down
Loading