Skip to content

Removed SR store implementation detail from wsrep-lib #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 10.7-sr-speedup
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions dbsim/db_client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ namespace db
void debug_sync(const char*) override { }
void debug_crash(const char*) override { }

void *get_binlog_cache() override
{
return (NULL);
}

int fragment_cache_remove_transaction(
const wsrep::id&,
wsrep::transaction_id) override
Expand Down
14 changes: 12 additions & 2 deletions dbsim/db_high_priority_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
}

int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool)
const wsrep::ws_meta& ws_meta)
{
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false);
int ret(client_.client_state_.before_rollback());
Expand All @@ -100,6 +99,17 @@ int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
return ret;
}

int db::high_priority_service::rollback_sr_on_disconnect()
{

auto ret = client_.client_state_.before_rollback();
assert(ret == 0);
client_.se_trx_.rollback();
ret = client_.client_state_.after_rollback();
assert(ret == 0);
return ret;
}

void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err)
{
client_.client_state_.adopt_apply_error(err);
Expand Down
6 changes: 3 additions & 3 deletions dbsim/db_high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ namespace db
const wsrep::const_buffer&,
wsrep::mutable_buffer&) override;
int append_fragment_and_commit(
wsrep::high_priority_service&,
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&,
int,
const wsrep::xid&) override
{ return 0; }
int remove_fragments(const wsrep::ws_meta&) override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&,
bool skip_rollback = false) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback_sr_on_disconnect() override;
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&) override;
int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&,
Expand Down
7 changes: 1 addition & 6 deletions dbsim/db_storage_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,12 @@ namespace db
wsrep::transaction_id,
int,
const wsrep::const_buffer&,
int,
size_t,
const wsrep::xid&,
void *) override
const wsrep::xid&) override
{ throw wsrep::not_implemented_error(); }
int update_fragment_meta(const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int remove_fragments() override
{ throw wsrep::not_implemented_error(); }
int set_fragments_from_table() override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)
Expand Down
6 changes: 0 additions & 6 deletions include/wsrep/client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,6 @@ namespace wsrep
*/
virtual void debug_crash(const char* crash_point) = 0;

/**
* Return the binlog cache for the currently executing
* transaction or a NULL pointer if no such cache exists.
*/
virtual void *get_binlog_cache() = 0;

/**
* Remove the given transaction from the fragment cache.
*/
Expand Down
9 changes: 0 additions & 9 deletions include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,15 +943,6 @@ namespace wsrep
return 1;
}

/**
* Return a reference to the transaction associated
* with the client state.
*/
wsrep::transaction& transaction()
{
return transaction_;
}

const wsrep::ws_meta& toi_meta() const
{
return toi_meta_;
Expand Down
20 changes: 17 additions & 3 deletions include/wsrep/high_priority_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,18 @@ namespace wsrep
*
* Note that the call is not done from streaming transaction
* context, but from applier context.
*
* @param sr_hps Object that is hosting the streaming transaction.
* @param ws_handle Write set handle corresponding to fragment.
* @param ws_meta Write set meta data corresponding to fragment.
* @param data Fragment data.
* @param xid XID corresponding to streaming transaction.
*/
virtual int append_fragment_and_commit(
high_priority_service& sr_hps,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
int sr_store,
const wsrep::xid& xid) = 0;

/**
Expand Down Expand Up @@ -146,8 +152,16 @@ namespace wsrep
* @return Zero in case of success, non-zero in case of failure
*/
virtual int rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool skip_rollback = false) = 0;
const wsrep::ws_meta& ws_meta) = 0;

/**
* Roll back a SR transaction when disconnecting from cluster.
*
* The implementation is supposed to roll back the transaction, but
* keep the fragments in fragment store intact to allow recovering
* the ongoing SR transactions on reconnect.
*/
virtual int rollback_sr_on_disconnect() = 0;

/**
* Apply a TOI operation.
Expand Down
12 changes: 1 addition & 11 deletions include/wsrep/storage_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ namespace wsrep
wsrep::transaction_id client_id,
int flags,
const wsrep::const_buffer& data,
int sr_store,
size_t offset,
const wsrep::xid& xid,
void *binlog_cache) = 0;
const wsrep::xid& xid) = 0;
/**
* Update fragment meta data after certification process.
*/
Expand All @@ -80,13 +77,6 @@ namespace wsrep
*/
virtual int remove_fragments() = 0;

/**
* Update the list of fragments in the streaming context by
* adding all fragments in the streaming log table for the given
* transaction.
*/
virtual int set_fragments_from_table() = 0;

/**
* Commit the transaction.
*/
Expand Down
13 changes: 0 additions & 13 deletions include/wsrep/streaming_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ namespace wsrep
, fragment_size_()
, unit_counter_()
, log_position_()
, sr_store_(0)
{ }

/**
Expand Down Expand Up @@ -190,17 +189,6 @@ namespace wsrep
unit_counter_ = 0;
log_position_ = 0;
}

void set_sr_store(int store_type)
{
sr_store_ = store_type;
}

int get_sr_store() const
{
return (sr_store_);
}

private:

void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED)
Expand All @@ -216,7 +204,6 @@ namespace wsrep
size_t fragment_size_;
size_t unit_counter_;
size_t log_position_;
int sr_store_;
};
}

Expand Down
43 changes: 0 additions & 43 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,48 +142,6 @@ namespace wsrep
void xa_detach();

int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
int fragment_cache_remove_transaction(
const wsrep::id& server_id, wsrep::transaction_id transaction_id);
void *get_binlog_cache();
/* state of Streaming Replication Speedup feature for the
transaction. This describes the relationship of this WSREP
transaction and the underlying InnoDB transaction.
*/
enum sr_state
{
/* this is not an SR Speedup transaction */
sr_state_none,
/* this is an SR Speedup transaction, but SR XID is not set
for the underlying InnoDB transaction
*/
sr_state_require_xid,
/* this is an SR Speedup transaction, and SR XID is set
for the underlying InnoDB transaction
*/
sr_state_xid_set
};
static const int n_sr_states = sr_state_xid_set + 1;
enum sr_state sr_state() const
{ return sr_state_; }
void require_sr_xid()
{
if (sr_state_ == sr_state_none) {
sr_state_ = sr_state_require_xid;
}
}
void sr_xid_was_set()
{
sr_state_ = sr_state_xid_set;
}
bool sr_xid_is_required()
{
return sr_state_ == sr_state_require_xid;
}
bool sr_xid_is_set()
{
return sr_state_ == sr_state_xid_set;
}

bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); }
void pa_unsafe(bool pa_unsafe) {
if (pa_unsafe) {
Expand Down Expand Up @@ -323,7 +281,6 @@ namespace wsrep
wsrep::mutable_buffer apply_error_buf_;
wsrep::xid xid_;
bool streaming_rollback_in_progress_;
enum sr_state sr_state_;
};

static inline const char* to_c_string(enum wsrep::transaction::state state)
Expand Down
9 changes: 2 additions & 7 deletions src/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ static int apply_fragment(wsrep::server_state& server_state,
int ret(0);
int apply_err;
wsrep::mutable_buffer err;
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
Expand Down Expand Up @@ -134,7 +132,7 @@ static int apply_fragment(wsrep::server_state& server_state,
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
const wsrep::xid xid(streaming_applier->transaction().xid());
ret = high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data, sr_store, xid);
*streaming_applier, ws_handle, ws_meta, data, xid);
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
ret = ret || (high_priority_service.after_apply(), 0);
}
Expand Down Expand Up @@ -1555,10 +1553,7 @@ void wsrep::server_state::close_transactions_at_disconnect(
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
streaming_applier->rollback(
wsrep::ws_handle(), wsrep::ws_meta(), sr_store != 0);
streaming_applier->rollback_sr_on_disconnect();
streaming_applier->after_apply();
}
streaming_appliers_.erase(i++);
Expand Down
52 changes: 6 additions & 46 deletions src/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ wsrep::transaction::transaction(
, apply_error_buf_()
, xid_()
, streaming_rollback_in_progress_(false)
, sr_state_(sr_state_none)
{ }


Expand All @@ -127,7 +126,6 @@ int wsrep::transaction::start_transaction(
server_id_ = client_state_.server_state().id();
id_ = id;
state_ = s_executing;
sr_state_ = sr_state_none;
state_hist_.clear();
ws_handle_ = wsrep::ws_handle(id);
flags(wsrep::provider::flag::start_transaction);
Expand Down Expand Up @@ -1282,26 +1280,6 @@ int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
return ret;
}

int wsrep::transaction::fragment_cache_remove_transaction(
const wsrep::id& server_id, wsrep::transaction_id transaction_id)
{
int rcode = client_service_.fragment_cache_remove_transaction(
server_id, transaction_id);

return (rcode);
}

void *wsrep::transaction::get_binlog_cache()
{
void *cache = client_service_.get_binlog_cache();

assert(cache);

return (cache);
}



////////////////////////////////////////////////////////////////////////////////
// Private //
////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1468,8 +1446,6 @@ int wsrep::transaction::certify_fragment(
assert(streaming_context_.rolled_back() == false ||
state() == s_must_abort);

int sr_store = streaming_context_.get_sr_store();

client_service_.wait_for_replayers(lock);
if (abort_or_interrupt(lock))
{
Expand Down Expand Up @@ -1571,24 +1547,11 @@ int wsrep::transaction::certify_fragment(
error = wsrep::e_append_fragment_error;
}

if (ret == 0 &&
(storage_service.start_transaction(ws_handle_) ||
(sr_store == 0 ?
storage_service.append_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
0, 0, xid(), nullptr)
:
storage_service.append_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
streaming_context_.get_sr_store(),
log_position - data.size(),
xid(), get_binlog_cache()))))
if (ret == 0
&& (storage_service.start_transaction(ws_handle_)
|| storage_service.append_fragment(
server_id, id(), flags(),
wsrep::const_buffer(data.data(), data.size()), xid())))
{
ret = 1;
error = wsrep::e_append_fragment_error;
Expand Down Expand Up @@ -1719,9 +1682,6 @@ int wsrep::transaction::certify_fragment(
flags(flags() & ~wsrep::provider::flag::start_transaction);
flags(flags() & ~wsrep::provider::flag::pa_unsafe);
}
if (sr_store != 0) {
require_sr_xid();
}

return ret;
}
Expand Down Expand Up @@ -2082,7 +2042,7 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
void wsrep::transaction::clear_fragments()
{
streaming_context_.cleanup();
fragment_cache_remove_transaction(server_id_, id_);
client_service_.fragment_cache_remove_transaction(server_id_, id_);
}

void wsrep::transaction::cleanup()
Expand Down
Loading