Skip to content

Commit

Permalink
Add support for custom provider implementation
Browse files Browse the repository at this point in the history
- Add a set_provider_factory() method to server_state to allow
  injecting provider factory which will be called when the
  provider is loaded.

Other related changes:
- Implement to_string() helper method for id class.
- Fix id ostream operator human readable id printing.
- Pass victim client_service as an argument to provider::bf_abort()
  to allow passing victim context to custom provider.
- Implement prev() helper method for seqno class.
- Make server_state recover_streaming_appliers_if_not_recovered()
  public. In some recovery scenarios the method must be called outside
  of server_state internal code paths.
- Add storage_service requires_globals() method. The storage service
  implementation may override this to return false if changing to
  storage service context does not require store/reset globals.
- Change view final() to also require that the view status is not
  primary for the view to be final. Also change the method name to
  is_final() to avoid confusion with C++ final identifier.
- Fixes to server state handling in disconnecting and disconnected
  states.

Co-authored-by: Denis Protivensky <[email protected]>
  • Loading branch information
temeo and denis-protivensky committed Jan 14, 2025
1 parent 51a0b0a commit 2303146
Show file tree
Hide file tree
Showing 26 changed files with 240 additions and 132 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ wsrep-API/libwsrep_api_v26.a

# Gcov generated files
*.dgcov

# Test logs
wsrep-lib_test.log
6 changes: 4 additions & 2 deletions include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,8 @@ namespace wsrep
* @param lock Lock to protect client state.
* @param bf_seqno Seqno of the BF aborter.
*/
int bf_abort(wsrep::unique_lock<wsrep::mutex>& lock, wsrep::seqno bf_seqno);
int bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
wsrep::seqno bf_seqno);
/**
* Wrapper to bf_abort() call, grabs lock internally.
*/
Expand All @@ -593,7 +594,8 @@ namespace wsrep
* should be called by the TOI operation which needs to
* BF abort a transaction.
*/
int total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>& lock, wsrep::seqno bf_seqno);
int total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
wsrep::seqno bf_seqno);

/**
* Wrapper to total_order_bf_abort(), grabs lock internally.
Expand Down
5 changes: 5 additions & 0 deletions include/wsrep/id.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ namespace wsrep
{
return undefined_;
}

/**
* Return id in string representation.
*/
std::string to_string() const;
private:
static const wsrep::id undefined_;
native_type data_;
Expand Down
17 changes: 10 additions & 7 deletions include/wsrep/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <cstring>

#include <memory>
#include <string>
#include <vector>
#include <ostream>
Expand All @@ -47,7 +48,7 @@ namespace wsrep
class tls_service;
class allowlist_service;
class event_service;

class client_service;
class stid
{
public:
Expand Down Expand Up @@ -283,7 +284,6 @@ namespace wsrep
static const int streaming = (1 << 15);
static const int snapshot = (1 << 16);
static const int nbo = (1 << 17);

/** decipher capability bitmask */
static std::string str(int);
};
Expand Down Expand Up @@ -375,6 +375,7 @@ namespace wsrep
*/
virtual enum status bf_abort(wsrep::seqno bf_seqno,
wsrep::transaction_id victim_trx,
wsrep::client_service& victim_ctx,
wsrep::seqno& victim_seqno) = 0;
virtual enum status rollback(wsrep::transaction_id) = 0;
virtual enum status commit_order_enter(const wsrep::ws_handle&,
Expand Down Expand Up @@ -407,6 +408,7 @@ namespace wsrep
* Leave total order isolation critical section
*/
virtual enum status leave_toi(wsrep::client_id,
const wsrep::ws_meta& ws_meta,
const wsrep::mutable_buffer& err) = 0;

/**
Expand Down Expand Up @@ -509,11 +511,12 @@ namespace wsrep
* @param provider_options Initial options to provider
* @param thread_service Optional thread service implementation.
*/
static provider* make_provider(wsrep::server_state&,
const std::string& provider_spec,
const std::string& provider_options,
const wsrep::provider::services& services
= wsrep::provider::services());
static std::unique_ptr<provider> make_provider(
wsrep::server_state&,
const std::string& provider_spec,
const std::string& provider_options,
const wsrep::provider::services& services
= wsrep::provider::services());

protected:
wsrep::server_state& server_state_;
Expand Down
5 changes: 5 additions & 0 deletions include/wsrep/seqno.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ namespace wsrep
return (seqno_ == -1);
}

wsrep::seqno prev() const
{
return seqno{seqno_ - 1};
}

bool operator<(seqno other) const
{
return (seqno_ < other.seqno_);
Expand Down
44 changes: 31 additions & 13 deletions include/wsrep/server_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@
#include "compiler.hpp"
#include "xid.hpp"

#include <memory>
#include <deque>
#include <functional>
#include <vector>
#include <string>
#include <map>
Expand Down Expand Up @@ -188,8 +190,6 @@ namespace wsrep
rm_sync
};

virtual ~server_state();

wsrep::encryption_service* encryption_service()
{ return encryption_service_; }

Expand Down Expand Up @@ -299,6 +299,17 @@ namespace wsrep
const wsrep::provider::services& services
= wsrep::provider::services());

using provider_factory_func =
std::function<decltype(wsrep::provider::make_provider)>;

/**
* Set provider factory method.
*
* @param Factory method to create a provider.
*/
void set_provider_factory(const provider_factory_func&);

/** Unload/unset provider. */
void unload_provider();

bool is_provider_loaded() const { return provider_ != 0; }
Expand All @@ -310,12 +321,8 @@ namespace wsrep
*
* @throw wsrep::runtime_error if provider has not been loaded
*
* @todo This should not be virtual. However, currently there
* is no mechanism for tests and integrations to provide
* their own provider implementations, so this is kept virtual
* for time being.
*/
virtual wsrep::provider& provider() const
wsrep::provider& provider() const
{
if (provider_ == 0)
{
Expand Down Expand Up @@ -529,6 +536,19 @@ namespace wsrep
return init_initialized_;
}

/** Recover streaming appliers if not already recoverd yet.
*
* This method recovers streaming appliers from streaming log.
* It must be called before starting to apply events after
* connecting to the cluster.
*
* @param lock Lock object holding server_state mutex.
* @param service Either client or high priority service.
*/
template <class C>
void recover_streaming_appliers_if_not_recovered(
wsrep::unique_lock<wsrep::mutex>& lock, C& service);

/**
* This method will be called by the provider when
* a remote write set is being applied. It is the responsibility
Expand Down Expand Up @@ -618,6 +638,7 @@ namespace wsrep
, streaming_appliers_()
, streaming_appliers_recovered_()
, provider_()
, provider_factory_(wsrep::provider::make_provider)
, name_(name)
, id_(wsrep::id::undefined())
, incoming_address_(incoming_address)
Expand Down Expand Up @@ -645,11 +666,7 @@ namespace wsrep
// Interrupt all threads which are waiting for state
void interrupt_state_waiters(wsrep::unique_lock<wsrep::mutex>&);

// Recover streaming appliers if not already recoverd
template <class C>
void recover_streaming_appliers_if_not_recovered(
wsrep::unique_lock<wsrep::mutex>&, C&);

private:
// Close SR transcations whose origin is outside of current
// cluster view.
void close_orphaned_sr_transactions(
Expand Down Expand Up @@ -702,7 +719,8 @@ namespace wsrep
wsrep::high_priority_service*> streaming_appliers_map;
streaming_appliers_map streaming_appliers_;
bool streaming_appliers_recovered_;
wsrep::provider* provider_;
std::unique_ptr<wsrep::provider> provider_;
provider_factory_func provider_factory_;
std::string name_;
wsrep::id id_;
std::string incoming_address_;
Expand Down
11 changes: 11 additions & 0 deletions include/wsrep/storage_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ namespace wsrep

virtual void store_globals() = 0;
virtual void reset_globals() = 0;

/**
* Return true if the implementation requires storing
* and restoring global state. Return true by default
* since this is the original behavior. Stateless
* implementations may override.
*/
virtual bool requires_globals() const {
return true;
}

};
}

Expand Down
6 changes: 4 additions & 2 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ namespace wsrep
void after_applying();

bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
wsrep::seqno bf_seqno);
wsrep::seqno bf_seqno,
wsrep::client_service&);
bool total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>&,
wsrep::seqno bf_seqno);
wsrep::seqno bf_seqno,
wsrep::client_service&);

void clone_for_replay(const wsrep::transaction& other);

Expand Down
4 changes: 2 additions & 2 deletions include/wsrep/view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ namespace wsrep
/**
* Return true if the view is final
*/
bool final() const
bool is_final() const
{
return (members_.empty() && own_index_ == -1);
return (status_ != status::primary && members_.empty() && own_index_ == -1);
}

/**
Expand Down
16 changes: 9 additions & 7 deletions src/client_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ int wsrep::client_state::bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
{
assert(lock.owns_lock());
assert(mode_ == m_local || transaction_.is_streaming());
auto ret = transaction_.bf_abort(lock, bf_seqno);
auto ret = transaction_.bf_abort(lock, bf_seqno, client_service_);
assert(lock.owns_lock());
return ret;
}
Expand All @@ -527,7 +527,7 @@ int wsrep::client_state::total_order_bf_abort(
{
assert(lock.owns_lock());
assert(mode_ == m_local || transaction_.is_streaming());
auto ret = transaction_.total_order_bf_abort(lock, bf_seqno);
auto ret = transaction_.total_order_bf_abort(lock, bf_seqno, client_service_);
assert(lock.owns_lock());
return ret;
}
Expand Down Expand Up @@ -585,7 +585,7 @@ wsrep::client_state::poll_enter_toi(
// Successfully entered TOI, but the provider reported failure.
// This may happen for example if certification fails.
// Leave TOI before proceeding.
if (provider().leave_toi(id_, wsrep::mutable_buffer()))
if (provider().leave_toi(id_, poll_meta, wsrep::mutable_buffer()))
{
wsrep::log_warning()
<< "Failed to leave TOI after failure in "
Expand Down Expand Up @@ -689,10 +689,12 @@ int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err)
{
debug_log_state("leave_toi_local: enter");
assert(toi_mode_ == m_local);
leave_toi_common();

auto ret = (provider().leave_toi(id_, toi_meta_, err) == provider::success ? 0 : 1);
leave_toi_common();
debug_log_state("leave_toi_local: leave");
return (provider().leave_toi(id_, err) == provider::success ? 0 : 1);

return ret;
}

void wsrep::client_state::leave_toi_mode()
Expand Down Expand Up @@ -809,7 +811,7 @@ int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err)
assert(mode_ == m_nbo);
assert(in_toi());

enum wsrep::provider::status status(provider().leave_toi(id_, err));
enum wsrep::provider::status status(provider().leave_toi(id_, toi_meta_, err));
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
int ret;
switch (status)
Expand Down Expand Up @@ -910,7 +912,7 @@ int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err)
assert(toi_mode_ == m_local);
assert(in_toi());
enum wsrep::provider::status status(
provider().leave_toi(id_, err));
provider().leave_toi(id_, toi_meta_, err));
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
int ret;
switch (status)
Expand Down
5 changes: 5 additions & 0 deletions src/config_service_v1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ int wsrep::config_service_v1_fetch(wsrep::provider& provider,
wsrep::provider_options* options)
{
struct wsrep_st* wsrep = (struct wsrep_st*)provider.native();
if (wsrep == nullptr)
{
// Not a provider which was loaded via wsrep-API
return 0;
}
if (config_service_v1_probe(wsrep->dlh))
{
wsrep::log_warning() << "Provider does not support config service v1";
Expand Down
14 changes: 12 additions & 2 deletions src/id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,23 @@ wsrep::id::id(const std::string& str)
}
}

std::string wsrep::id::to_string() const
{
std::ostringstream os;
os << *this;
return os.str();
}

std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id)
{
const char* ptr(static_cast<const char*>(id.data()));
size_t size(id.size());
if (static_cast<size_t>(std::count_if(ptr, ptr + size, ::isalnum)) == size)
if (static_cast<size_t>(
std::count_if(ptr, ptr + size,
[](char c) { return (::isalnum(c) || c == '\0'); }))
== size)
{
return (os << std::string(ptr, size));
return (os << std::string(ptr, ::strnlen(ptr, size)));
}
else
{
Expand Down
7 changes: 3 additions & 4 deletions src/provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@
#include <cassert>
#include <memory>

wsrep::provider* wsrep::provider::make_provider(
std::unique_ptr<wsrep::provider> wsrep::provider::make_provider(
wsrep::server_state& server_state,
const std::string& provider_spec,
const std::string& provider_options,
const wsrep::provider::services& services)
{
try
{
return new wsrep::wsrep_provider_v26(
server_state, provider_options, provider_spec, services);
return std::unique_ptr<wsrep::provider>(new wsrep::wsrep_provider_v26(
server_state, provider_options, provider_spec, services));
}
catch (const wsrep::runtime_error& e)
{
Expand Down Expand Up @@ -120,7 +120,6 @@ std::string wsrep::provider::capability::str(int caps)
WSREP_PRINT_CAPABILITY(streaming, "STREAMING");
WSREP_PRINT_CAPABILITY(snapshot, "READ_VIEW");
WSREP_PRINT_CAPABILITY(nbo, "NBO");

#undef WSREP_PRINT_CAPABILITY

if (caps)
Expand Down
Loading

0 comments on commit 2303146

Please sign in to comment.