Skip to content

Commit a029cfd

Browse files
Add support for custom provider implementation
- Add a set_provider_factory() method to server_state to allow injecting provider factory which will be called when the provider is loaded. Other related changes: - Implement to_string() helper method for id class. - Fix id ostream operator human readable id printing. - Pass victim client_service as an argument to provider::bf_abort() to allow passing victim context to custom provider. - Implement prev() helper method for seqno class. - Make server_state recover_streaming_appliers_if_not_recovered() public. In some recovery scenarios the method must be called outside of server_state internal code paths. - Add storage_service requires_globals() method. The storage service implementation may override this to return false if changing to storage service context does not require store/reset globals. - Change view final() to also require that the view status is not primary for the view to be final. Also change the method name to is_final() to avoid confusion with C++ final identifier. - Fixes to server state handling in disconnecting and disconnected states. Co-authored-by: Denis Protivensky <[email protected]>
1 parent 51a0b0a commit a029cfd

26 files changed

+242
-131
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@ wsrep-API/libwsrep_api_v26.a
1313

1414
# Gcov generated files
1515
*.dgcov
16+
17+
# Test logs
18+
wsrep-lib_test.log

include/wsrep/client_state.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,8 @@ namespace wsrep
582582
* @param lock Lock to protect client state.
583583
* @param bf_seqno Seqno of the BF aborter.
584584
*/
585-
int bf_abort(wsrep::unique_lock<wsrep::mutex>& lock, wsrep::seqno bf_seqno);
585+
int bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
586+
wsrep::seqno bf_seqno);
586587
/**
587588
* Wrapper to bf_abort() call, grabs lock internally.
588589
*/
@@ -593,7 +594,8 @@ namespace wsrep
593594
* should be called by the TOI operation which needs to
594595
* BF abort a transaction.
595596
*/
596-
int total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>& lock, wsrep::seqno bf_seqno);
597+
int total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
598+
wsrep::seqno bf_seqno);
597599

598600
/**
599601
* Wrapper to total_order_bf_abort(), grabs lock internally.

include/wsrep/id.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ namespace wsrep
9393
{
9494
return undefined_;
9595
}
96+
97+
/**
98+
* Return id in string representation.
99+
*/
100+
std::string to_string() const;
96101
private:
97102
static const wsrep::id undefined_;
98103
native_type data_;

include/wsrep/provider.hpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include <cstring>
3131

32+
#include <memory>
3233
#include <string>
3334
#include <vector>
3435
#include <ostream>
@@ -47,7 +48,7 @@ namespace wsrep
4748
class tls_service;
4849
class allowlist_service;
4950
class event_service;
50-
51+
class client_service;
5152
class stid
5253
{
5354
public:
@@ -283,7 +284,6 @@ namespace wsrep
283284
static const int streaming = (1 << 15);
284285
static const int snapshot = (1 << 16);
285286
static const int nbo = (1 << 17);
286-
287287
/** decipher capability bitmask */
288288
static std::string str(int);
289289
};
@@ -375,6 +375,7 @@ namespace wsrep
375375
*/
376376
virtual enum status bf_abort(wsrep::seqno bf_seqno,
377377
wsrep::transaction_id victim_trx,
378+
wsrep::client_service& victim_ctx,
378379
wsrep::seqno& victim_seqno) = 0;
379380
virtual enum status rollback(wsrep::transaction_id) = 0;
380381
virtual enum status commit_order_enter(const wsrep::ws_handle&,
@@ -407,6 +408,7 @@ namespace wsrep
407408
* Leave total order isolation critical section
408409
*/
409410
virtual enum status leave_toi(wsrep::client_id,
411+
const wsrep::ws_meta& ws_meta,
410412
const wsrep::mutable_buffer& err) = 0;
411413

412414
/**
@@ -509,11 +511,12 @@ namespace wsrep
509511
* @param provider_options Initial options to provider
510512
* @param thread_service Optional thread service implementation.
511513
*/
512-
static provider* make_provider(wsrep::server_state&,
513-
const std::string& provider_spec,
514-
const std::string& provider_options,
515-
const wsrep::provider::services& services
516-
= wsrep::provider::services());
514+
static std::unique_ptr<provider> make_provider(
515+
wsrep::server_state&,
516+
const std::string& provider_spec,
517+
const std::string& provider_options,
518+
const wsrep::provider::services& services
519+
= wsrep::provider::services());
517520

518521
protected:
519522
wsrep::server_state& server_state_;

include/wsrep/seqno.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ namespace wsrep
5151
return (seqno_ == -1);
5252
}
5353

54+
wsrep::seqno prev() const
55+
{
56+
return seqno{seqno_ - 1};
57+
}
58+
5459
bool operator<(seqno other) const
5560
{
5661
return (seqno_ < other.seqno_);

include/wsrep/server_state.hpp

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@
9292
#include "compiler.hpp"
9393
#include "xid.hpp"
9494

95+
#include <memory>
9596
#include <deque>
97+
#include <functional>
9698
#include <vector>
9799
#include <string>
98100
#include <map>
@@ -188,8 +190,6 @@ namespace wsrep
188190
rm_sync
189191
};
190192

191-
virtual ~server_state();
192-
193193
wsrep::encryption_service* encryption_service()
194194
{ return encryption_service_; }
195195

@@ -299,6 +299,17 @@ namespace wsrep
299299
const wsrep::provider::services& services
300300
= wsrep::provider::services());
301301

302+
using provider_factory_func =
303+
std::function<decltype(wsrep::provider::make_provider)>;
304+
305+
/**
306+
* Set provider factory method.
307+
*
308+
* @param Factory method to create a provider.
309+
*/
310+
void set_provider_factory(const provider_factory_func&);
311+
312+
/** Unload/unset provider. */
302313
void unload_provider();
303314

304315
bool is_provider_loaded() const { return provider_ != 0; }
@@ -310,12 +321,8 @@ namespace wsrep
310321
*
311322
* @throw wsrep::runtime_error if provider has not been loaded
312323
*
313-
* @todo This should not be virtual. However, currently there
314-
* is no mechanism for tests and integrations to provide
315-
* their own provider implementations, so this is kept virtual
316-
* for time being.
317324
*/
318-
virtual wsrep::provider& provider() const
325+
wsrep::provider& provider() const
319326
{
320327
if (provider_ == 0)
321328
{
@@ -529,6 +536,19 @@ namespace wsrep
529536
return init_initialized_;
530537
}
531538

539+
/** Recover streaming appliers if not already recoverd yet.
540+
*
541+
* This method recovers streaming appliers from streaming log.
542+
* It must be called before starting to apply events after
543+
* connecting to the cluster.
544+
*
545+
* @param lock Lock object holding server_state mutex.
546+
* @param service Either client or high priority service.
547+
*/
548+
template <class C>
549+
void recover_streaming_appliers_if_not_recovered(
550+
wsrep::unique_lock<wsrep::mutex>& lock, C& service);
551+
532552
/**
533553
* This method will be called by the provider when
534554
* a remote write set is being applied. It is the responsibility
@@ -618,6 +638,7 @@ namespace wsrep
618638
, streaming_appliers_()
619639
, streaming_appliers_recovered_()
620640
, provider_()
641+
, provider_factory_(wsrep::provider::make_provider)
621642
, name_(name)
622643
, id_(wsrep::id::undefined())
623644
, incoming_address_(incoming_address)
@@ -645,11 +666,7 @@ namespace wsrep
645666
// Interrupt all threads which are waiting for state
646667
void interrupt_state_waiters(wsrep::unique_lock<wsrep::mutex>&);
647668

648-
// Recover streaming appliers if not already recoverd
649-
template <class C>
650-
void recover_streaming_appliers_if_not_recovered(
651-
wsrep::unique_lock<wsrep::mutex>&, C&);
652-
669+
private:
653670
// Close SR transcations whose origin is outside of current
654671
// cluster view.
655672
void close_orphaned_sr_transactions(
@@ -702,7 +719,8 @@ namespace wsrep
702719
wsrep::high_priority_service*> streaming_appliers_map;
703720
streaming_appliers_map streaming_appliers_;
704721
bool streaming_appliers_recovered_;
705-
wsrep::provider* provider_;
722+
std::unique_ptr<wsrep::provider> provider_;
723+
provider_factory_func provider_factory_;
706724
std::string name_;
707725
wsrep::id id_;
708726
std::string incoming_address_;

include/wsrep/storage_service.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,17 @@ namespace wsrep
9292

9393
virtual void store_globals() = 0;
9494
virtual void reset_globals() = 0;
95+
96+
/**
97+
* Return true if the implementation requires storing
98+
* and restoring global state. Return true by default
99+
* since this is the original behavior. Stateless
100+
* implementations may override.
101+
*/
102+
virtual bool requires_globals() const {
103+
return true;
104+
}
105+
95106
};
96107
}
97108

include/wsrep/transaction.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,11 @@ namespace wsrep
200200
void after_applying();
201201

202202
bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
203-
wsrep::seqno bf_seqno);
203+
wsrep::seqno bf_seqno,
204+
wsrep::client_service&);
204205
bool total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>&,
205-
wsrep::seqno bf_seqno);
206+
wsrep::seqno bf_seqno,
207+
wsrep::client_service&);
206208

207209
void clone_for_replay(const wsrep::transaction& other);
208210

include/wsrep/view.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ namespace wsrep
117117
/**
118118
* Return true if the view is final
119119
*/
120-
bool final() const
120+
bool is_final() const
121121
{
122-
return (members_.empty() && own_index_ == -1);
122+
return (status_ != status::primary && members_.empty() && own_index_ == -1);
123123
}
124124

125125
/**

src/client_state.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ int wsrep::client_state::bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
511511
{
512512
assert(lock.owns_lock());
513513
assert(mode_ == m_local || transaction_.is_streaming());
514-
auto ret = transaction_.bf_abort(lock, bf_seqno);
514+
auto ret = transaction_.bf_abort(lock, bf_seqno, client_service_);
515515
assert(lock.owns_lock());
516516
return ret;
517517
}
@@ -527,7 +527,7 @@ int wsrep::client_state::total_order_bf_abort(
527527
{
528528
assert(lock.owns_lock());
529529
assert(mode_ == m_local || transaction_.is_streaming());
530-
auto ret = transaction_.total_order_bf_abort(lock, bf_seqno);
530+
auto ret = transaction_.total_order_bf_abort(lock, bf_seqno, client_service_);
531531
assert(lock.owns_lock());
532532
return ret;
533533
}
@@ -585,7 +585,7 @@ wsrep::client_state::poll_enter_toi(
585585
// Successfully entered TOI, but the provider reported failure.
586586
// This may happen for example if certification fails.
587587
// Leave TOI before proceeding.
588-
if (provider().leave_toi(id_, wsrep::mutable_buffer()))
588+
if (provider().leave_toi(id_, poll_meta, wsrep::mutable_buffer()))
589589
{
590590
wsrep::log_warning()
591591
<< "Failed to leave TOI after failure in "
@@ -689,10 +689,12 @@ int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err)
689689
{
690690
debug_log_state("leave_toi_local: enter");
691691
assert(toi_mode_ == m_local);
692-
leave_toi_common();
693692

693+
auto ret = (provider().leave_toi(id_, toi_meta_, err) == provider::success ? 0 : 1);
694+
leave_toi_common();
694695
debug_log_state("leave_toi_local: leave");
695-
return (provider().leave_toi(id_, err) == provider::success ? 0 : 1);
696+
697+
return ret;
696698
}
697699

698700
void wsrep::client_state::leave_toi_mode()
@@ -809,7 +811,7 @@ int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err)
809811
assert(mode_ == m_nbo);
810812
assert(in_toi());
811813

812-
enum wsrep::provider::status status(provider().leave_toi(id_, err));
814+
enum wsrep::provider::status status(provider().leave_toi(id_, toi_meta_, err));
813815
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
814816
int ret;
815817
switch (status)
@@ -910,7 +912,7 @@ int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err)
910912
assert(toi_mode_ == m_local);
911913
assert(in_toi());
912914
enum wsrep::provider::status status(
913-
provider().leave_toi(id_, err));
915+
provider().leave_toi(id_, toi_meta_, err));
914916
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
915917
int ret;
916918
switch (status)

src/config_service_v1.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ int wsrep::config_service_v1_fetch(wsrep::provider& provider,
151151
wsrep::provider_options* options)
152152
{
153153
struct wsrep_st* wsrep = (struct wsrep_st*)provider.native();
154+
if (wsrep == nullptr)
155+
{
156+
// Not a provider which was loaded via wsrep-API
157+
return 0;
158+
}
154159
if (config_service_v1_probe(wsrep->dlh))
155160
{
156161
wsrep::log_warning() << "Provider does not support config service v1";

src/id.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,23 @@ wsrep::id::id(const std::string& str)
5050
}
5151
}
5252

53+
std::string wsrep::id::to_string() const
54+
{
55+
std::ostringstream os;
56+
os << *this;
57+
return os.str();
58+
}
59+
5360
std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id)
5461
{
5562
const char* ptr(static_cast<const char*>(id.data()));
5663
size_t size(id.size());
57-
if (static_cast<size_t>(std::count_if(ptr, ptr + size, ::isalnum)) == size)
64+
if (static_cast<size_t>(
65+
std::count_if(ptr, ptr + size,
66+
[](char c) { return (::isalnum(c) || c == '\0'); }))
67+
== size)
5868
{
59-
return (os << std::string(ptr, size));
69+
return (os << std::string(ptr, ::strnlen(ptr, size)));
6070
}
6171
else
6272
{

src/provider.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@
2626
#include <cassert>
2727
#include <memory>
2828

29-
wsrep::provider* wsrep::provider::make_provider(
29+
std::unique_ptr<wsrep::provider> wsrep::provider::make_provider(
3030
wsrep::server_state& server_state,
3131
const std::string& provider_spec,
3232
const std::string& provider_options,
3333
const wsrep::provider::services& services)
3434
{
3535
try
3636
{
37-
return new wsrep::wsrep_provider_v26(
38-
server_state, provider_options, provider_spec, services);
37+
return std::unique_ptr<wsrep::provider>(new wsrep::wsrep_provider_v26(
38+
server_state, provider_options, provider_spec, services));
3939
}
4040
catch (const wsrep::runtime_error& e)
4141
{
@@ -120,7 +120,6 @@ std::string wsrep::provider::capability::str(int caps)
120120
WSREP_PRINT_CAPABILITY(streaming, "STREAMING");
121121
WSREP_PRINT_CAPABILITY(snapshot, "READ_VIEW");
122122
WSREP_PRINT_CAPABILITY(nbo, "NBO");
123-
124123
#undef WSREP_PRINT_CAPABILITY
125124

126125
if (caps)

0 commit comments

Comments
 (0)