Skip to content

Commit 3de594b

Browse files
committed
Add application defined sequential consistency for certification
Client state methods before_prepare() and before_commit() accept a callback which is called by the provider after it can guarantee sequential consistency. This allows application threads which wish to maintain sequential consistency to enter before_prepare() and before_commit() calls concurrently without waiting the prior call to finish.
1 parent 1c61b80 commit 3de594b

9 files changed

+113
-31
lines changed

include/wsrep/client_state.hpp

+40-2
Original file line numberDiff line numberDiff line change
@@ -413,11 +413,49 @@ namespace wsrep
413413

414414
/** @name Commit ordering interface */
415415
/** @{ */
416-
int before_prepare();
416+
417+
/**
418+
* This method should be called before the transaction
419+
* is prepared. This call certifies the transaction and
420+
* assigns write set meta data.
421+
*
422+
* @param seq_cb Callback which is passed to underlying
423+
* certify() call. See wsrep::provider::certify().
424+
*
425+
* @return Zero on success, non-zero on failure.
426+
*/
427+
int before_prepare(const wsrep::provider::seq_cb_t* seq_cb);
428+
429+
/** Same as before_prepare() above, but nullptr is passed
430+
* to seq_cb. */
431+
int before_prepare()
432+
{
433+
return before_prepare(nullptr);
434+
}
417435

418436
int after_prepare();
419437

420-
int before_commit();
438+
/**
439+
* This method should be called before transaction is committed.
440+
* This call makes the transaction to enter commit time
441+
* critical section. The critical section is left by calling
442+
* ordered_commit().
443+
*
444+
* If before_prepare() is not called before this call, the
445+
* before_prepare() is called internally.
446+
*
447+
* @param seq_cb Callback which is passed to underlying
448+
* before_prepare() call.
449+
*
450+
* @return Zero on success, non-zero on failure.
451+
*/
452+
int before_commit(const wsrep::provider::seq_cb_t* seq_cb);
453+
454+
/** Same as before_commit(), but nullptr is passed to seq_cb. */
455+
int before_commit()
456+
{
457+
return before_commit(nullptr);
458+
}
421459

422460
int ordered_commit();
423461

include/wsrep/provider.hpp

+30-3
Original file line numberDiff line numberDiff line change
@@ -332,10 +332,37 @@ namespace wsrep
332332
virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0;
333333
virtual enum status append_data(
334334
wsrep::ws_handle&, const wsrep::const_buffer&) = 0;
335+
336+
/**
337+
* Callback for application defined sequential consistency.
338+
* The provider will call
339+
* the callback once it can guarantee sequential consistency. */
340+
typedef struct seq_cb {
341+
/** Opaque caller context */
342+
void *ctx;
343+
/** Function to be called by the provider when sequential
344+
* consistency is guaranteed. */
345+
void (*fn)(void *ctx);
346+
} seq_cb_t;
347+
348+
/**
349+
* Certify the write set.
350+
*
351+
* @param client_id[in] Id of the client session.
352+
* @param ws_handle[in,out] Write set handle associated to the current
353+
* transaction.
354+
* @param flags[in] Flags associated to the write set (see struct flag).
355+
* @param ws_meta[out] Write set meta data associated to the
356+
* replicated write set.
357+
* @param seq_cb[in] Optional callback for application defined
358+
* sequential consistency.
359+
*
360+
* @return Status code defined in struct status.
361+
*/
335362
virtual enum status
336-
certify(wsrep::client_id, wsrep::ws_handle&,
337-
int,
338-
wsrep::ws_meta&) = 0;
363+
certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle,
364+
int flags, wsrep::ws_meta& ws_meta, const seq_cb_t* seq_cb)
365+
= 0;
339366
/**
340367
* BF abort a transaction inside provider.
341368
*

include/wsrep/transaction.hpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,12 @@ namespace wsrep
175175

176176
int after_row();
177177

178-
int before_prepare(wsrep::unique_lock<wsrep::mutex>&);
178+
int before_prepare(wsrep::unique_lock<wsrep::mutex>&,
179+
const wsrep::provider::seq_cb_t*);
179180

180181
int after_prepare(wsrep::unique_lock<wsrep::mutex>&);
181182

182-
int before_commit();
183+
int before_commit(const wsrep::provider::seq_cb_t*);
183184

184185
int ordered_commit();
185186

@@ -248,7 +249,8 @@ namespace wsrep
248249
bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
249250
int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false);
250251
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
251-
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
252+
int certify_commit(wsrep::unique_lock<wsrep::mutex>&,
253+
const wsrep::provider::seq_cb_t*);
252254
int append_sr_keys_for_commit();
253255
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
254256
void remove_fragments_in_storage_service_scope(

src/client_state.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -365,12 +365,12 @@ int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta)
365365
return transaction_.next_fragment(meta);
366366
}
367367

368-
int wsrep::client_state::before_prepare()
368+
int wsrep::client_state::before_prepare(const wsrep::provider::seq_cb_t* seq_cb)
369369
{
370370
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
371371
assert(owning_thread_id_ == wsrep::this_thread::get_id());
372372
assert(state_ == s_exec);
373-
return transaction_.before_prepare(lock);
373+
return transaction_.before_prepare(lock, seq_cb);
374374
}
375375

376376
int wsrep::client_state::after_prepare()
@@ -381,11 +381,11 @@ int wsrep::client_state::after_prepare()
381381
return transaction_.after_prepare(lock);
382382
}
383383

384-
int wsrep::client_state::before_commit()
384+
int wsrep::client_state::before_commit(const wsrep::provider::seq_cb_t* seq_cb)
385385
{
386386
assert(owning_thread_id_ == wsrep::this_thread::get_id());
387387
assert(state_ == s_exec || mode_ == m_local);
388-
return transaction_.before_commit();
388+
return transaction_.before_commit(seq_cb);
389389
}
390390

391391
int wsrep::client_state::ordered_commit()

src/transaction.cpp

+11-11
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ int wsrep::transaction::after_row()
273273
return ret;
274274
}
275275

276-
int wsrep::transaction::before_prepare(
277-
wsrep::unique_lock<wsrep::mutex>& lock)
276+
int wsrep::transaction::before_prepare(wsrep::unique_lock<wsrep::mutex>& lock,
277+
const wsrep::provider::seq_cb_t* seq_cb)
278278
{
279279
assert(lock.owns_lock());
280280
int ret(0);
@@ -349,7 +349,7 @@ int wsrep::transaction::before_prepare(
349349
}
350350
else
351351
{
352-
ret = certify_commit(lock);
352+
ret = certify_commit(lock, seq_cb);
353353
}
354354

355355
assert((ret == 0 && state() == s_preparing) ||
@@ -445,7 +445,7 @@ int wsrep::transaction::after_prepare(
445445
return ret;
446446
}
447447

448-
int wsrep::transaction::before_commit()
448+
int wsrep::transaction::before_commit(const wsrep::provider::seq_cb* seq_cb)
449449
{
450450
int ret(1);
451451

@@ -465,7 +465,7 @@ int wsrep::transaction::before_commit()
465465
case wsrep::client_state::m_local:
466466
if (state() == s_executing)
467467
{
468-
ret = before_prepare(lock) || after_prepare(lock);
468+
ret = before_prepare(lock, seq_cb) || after_prepare(lock);
469469
assert((ret == 0 &&
470470
(state() == s_committing || state() == s_prepared))
471471
||
@@ -495,7 +495,7 @@ int wsrep::transaction::before_commit()
495495

496496
if (ret == 0 && state() == s_prepared)
497497
{
498-
ret = certify_commit(lock);
498+
ret = certify_commit(lock, nullptr);
499499
assert((ret == 0 && state() == s_committing) ||
500500
(state() == s_must_abort ||
501501
state() == s_must_replay ||
@@ -543,7 +543,7 @@ int wsrep::transaction::before_commit()
543543
}
544544
else if (state() == s_executing || state() == s_replaying)
545545
{
546-
ret = before_prepare(lock) || after_prepare(lock);
546+
ret = before_prepare(lock, nullptr) || after_prepare(lock);
547547
}
548548
else
549549
{
@@ -1195,7 +1195,7 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
11951195
provider().certify(client_state_.id(),
11961196
ws_handle_,
11971197
flags(),
1198-
meta));
1198+
meta, nullptr));
11991199

12001200
int ret;
12011201
if (cert_ret == wsrep::provider::success)
@@ -1622,7 +1622,7 @@ int wsrep::transaction::certify_fragment(
16221622
cert_ret = provider().certify(client_state_.id(),
16231623
ws_handle_,
16241624
flags(),
1625-
sr_ws_meta);
1625+
sr_ws_meta, nullptr);
16261626
client_service_.debug_crash(
16271627
"crash_replicate_fragment_after_certify");
16281628

@@ -1744,7 +1744,7 @@ int wsrep::transaction::certify_fragment(
17441744
}
17451745

17461746
int wsrep::transaction::certify_commit(
1747-
wsrep::unique_lock<wsrep::mutex>& lock)
1747+
wsrep::unique_lock<wsrep::mutex>& lock, const provider::seq_cb_t* seq_cb)
17481748
{
17491749
assert(lock.owns_lock());
17501750
assert(active());
@@ -1828,7 +1828,7 @@ int wsrep::transaction::certify_commit(
18281828
cert_ret(provider().certify(client_state_.id(),
18291829
ws_handle_,
18301830
flags(),
1831-
ws_meta_));
1831+
ws_meta_, seq_cb));
18321832
client_service_.debug_sync("wsrep_after_certification");
18331833

18341834
lock.lock();

src/wsrep_provider_v26.cpp

+19-5
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,7 @@ namespace
674674
}
675675

676676
wsrep_node_isolation_mode_set_fn_v1 node_isolation_mode_set;
677+
wsrep_certify_fn_v1 certify_v1;
677678
}
678679

679680

@@ -721,6 +722,9 @@ void wsrep::wsrep_provider_v26::init_services(
721722
node_isolation_mode_set
722723
= wsrep_impl::resolve_function<wsrep_node_isolation_mode_set_fn_v1>(
723724
wsrep_->dlh, WSREP_NODE_ISOLATION_MODE_SET_V1);
725+
726+
certify_v1 = wsrep_impl::resolve_function<wsrep_certify_fn_v1>(
727+
wsrep_->dlh, WSREP_CERTIFY_V1);
724728
}
725729

726730
void wsrep::wsrep_provider_v26::deinit_services()
@@ -922,14 +926,24 @@ enum wsrep::provider::status
922926
wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
923927
wsrep::ws_handle& ws_handle,
924928
int flags,
925-
wsrep::ws_meta& ws_meta)
929+
wsrep::ws_meta& ws_meta,
930+
const seq_cb_t* seq_cb)
926931
{
927932
mutable_ws_handle mwsh(ws_handle);
928933
mutable_ws_meta mmeta(ws_meta, flags);
929-
return map_return_value(
930-
wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
931-
mmeta.native_flags(),
932-
mmeta.native()));
934+
if (seq_cb && certify_v1)
935+
{
936+
wsrep_seq_cb_t wseq_cb{seq_cb->ctx, seq_cb->fn};
937+
return map_return_value(certify_v1(wsrep_, client_id.get(),
938+
mwsh.native(), mmeta.native_flags(),
939+
mmeta.native(), &wseq_cb));
940+
}
941+
else
942+
{
943+
return map_return_value(
944+
wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
945+
mmeta.native_flags(), mmeta.native()));
946+
}
933947
}
934948

935949
enum wsrep::provider::status

src/wsrep_provider_v26.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ namespace wsrep
5959
enum wsrep::provider::status
6060
certify(wsrep::client_id, wsrep::ws_handle&,
6161
int,
62-
wsrep::ws_meta&) WSREP_OVERRIDE;
62+
wsrep::ws_meta&, const seq_cb_t*) WSREP_OVERRIDE;
6363
enum wsrep::provider::status
6464
bf_abort(wsrep::seqno,
6565
wsrep::transaction_id,

test/mock_provider.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ namespace wsrep
7979
certify(wsrep::client_id client_id,
8080
wsrep::ws_handle& ws_handle,
8181
int flags,
82-
wsrep::ws_meta& ws_meta)
82+
wsrep::ws_meta& ws_meta,
83+
const seq_cb* /* Ignored in unit tests. */)
8384
WSREP_OVERRIDE
8485
{
8586
ws_handle = wsrep::ws_handle(ws_handle.transaction_id(), (void*)1);

wsrep-API/v26

Submodule v26 updated 1 file

0 commit comments

Comments
 (0)