Skip to content

Commit

Permalink
kvs: remove fence support
Browse files Browse the repository at this point in the history
Problem: The KVS fence implementation is virtually unused.  It also
has the potential for denial-of-service attacks.  Its implementation
is also quite a "one-off" within the KVS, which limits the ability to
refactor the code.  For all these reasons, it should be removed.  If
fence-like behavior is ever needed again in the future, it should be
implemented as a standalone module/service on top of the KVS.

Solution: Remove all fence support in the KVS.

Fixes #6587
  • Loading branch information
chu11 committed Jan 30, 2025
1 parent 0ac3813 commit 044238c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 263 deletions.
264 changes: 3 additions & 261 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ struct kvs_ctx {
unsigned int seq; /* for commit transactions */
kvs_checkpoint_t *kcp;
tstat_t txn_commit_stats;
tstat_t txn_fence_stats;
zhashx_t *requests; /* track unfinished requests */
struct list_head work_queue;
};
Expand Down Expand Up @@ -966,7 +965,7 @@ static void kvstxn_apply_cb (flux_future_t *f, void *arg)
kvstxn_apply (kt);
}

/* Write all the ops for a particular commit/fence request (rank 0
/* Write all the ops for a particular commit request (rank 0
* only). The setroot event will cause responses to be sent to the
* transaction requests and clean up the treq_t state. This
* function is idempotent.
Expand Down Expand Up @@ -1848,243 +1847,6 @@ static void commit_request_cb (flux_t *h,
request_tracking_remove (ctx, msg);
}


/* kvs.relayfence request handler (rank 0 only, no response).
 *
 * Decodes the relayed fence request, finds or creates the treq_t
 * registered under the fence name in the namespace's transaction
 * request manager, and appends the request's ops to it.  When
 * treq_count_reached() reports true, the accumulated ops are handed to
 * the namespace's kvstxn manager and the root is offered to the work
 * queue.
 *
 * N.B. there is no request tracking for the relay.  The relay does not
 * get a response; only the original request does, via
 * finalize_transaction_bynames().
 *
 * A request that cannot be decoded is logged and dropped.  Any later
 * failure is reported with error_event_send_to_name(), using the errno
 * value in effect when the failure was detected.
 */
static void relayfence_request_cb (flux_t *h,
                                   flux_msg_handler_t *mh,
                                   const flux_msg_t *msg,
                                   void *arg)
{
    struct kvs_ctx *ctx = arg;
    struct kvsroot *nsroot;
    treq_t *fence;
    json_t *ops = NULL;
    const char *name;
    const char *ns;
    int flags;
    int nprocs;
    int saved_errno;
    int rc;

    rc = flux_request_unpack (msg,
                              NULL,
                              "{ s:o s:s s:s s:i s:i }",
                              "ops", &ops,
                              "name", &name,
                              "namespace", &ns,
                              "flags", &flags,
                              "nprocs", &nprocs);
    if (rc < 0) {
        /* ns and name are not available yet, so no error event can
         * be addressed; just log and drop the request. */
        flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
        return;
    }

    /* namespace must exist given we are on rank 0 */
    nsroot = kvsroot_mgr_lookup_root_safe (ctx->krm, ns);
    if (nsroot == NULL) {
        errno = ENOTSUP;
        goto error;
    }

    /* Reuse the treq already registered under this name, or create
     * and register a new one. */
    fence = treq_mgr_lookup_transaction (nsroot->trm, name);
    if (fence == NULL) {
        fence = treq_create (name, nprocs, flags);
        if (fence == NULL) {
            flux_log_error (h, "%s: treq_create", __FUNCTION__);
            goto error;
        }
        if (treq_mgr_add_transaction (nsroot->trm, fence) < 0) {
            /* keep the registration errno across logging/cleanup */
            saved_errno = errno;
            flux_log_error (h, "%s: treq_mgr_add_transaction", __FUNCTION__);
            treq_destroy (fence);
            errno = saved_errno;
            goto error;
        }
    }

    /* The request must agree with the flags and nprocs stored in the
     * treq. */
    if (!(treq_get_flags (fence) == flags
          && treq_get_nprocs (fence) == nprocs)) {
        errno = EINVAL;
        goto error;
    }

    if (treq_add_request_ops (fence, ops) < 0) {
        flux_log_error (h, "%s: treq_add_request_ops", __FUNCTION__);
        goto error;
    }

    /* Nothing more to do until the count has been reached. */
    if (!treq_count_reached (fence))
        return;

    /* If user called fence > nprocs times, it should have been caught
     * earlier */
    assert (!treq_get_processed (fence));

    /* the processed flag indicates that a treq has been added to the
     * ready queue */
    treq_mark_processed (fence);

    rc = kvstxn_mgr_add_transaction (nsroot->ktm,
                                     treq_get_name (fence),
                                     treq_get_ops (fence),
                                     treq_get_flags (fence),
                                     0);
    if (rc < 0) {
        flux_log_error (h, "%s: kvstxn_mgr_add_transaction", __FUNCTION__);
        goto error;
    }

    /* record the op count of this fence transaction */
    tstat_push (&ctx->txn_fence_stats,
                (double)json_array_size (treq_get_ops (fence)));

    work_queue_check_append (ctx, nsroot);
    return;

error:
    /* An error has occurred, so we will return an error similarly to
     * how an error would be returned via a transaction error in
     * kvstxn_apply().
     */
    if (error_event_send_to_name (ctx, ns, name, errno) < 0)
        flux_log_error (h, "%s: error_event_send_to_name", __FUNCTION__);
}

/* kvs.fence request handler.
 * Sent from users to the local kvs module.
 *
 * Decodes and authorizes the request, resolves the namespace root with
 * getroot(), and finds or creates the treq_t registered under the
 * fence name.  A copy of the request is saved on the treq so the
 * original sender can be answered later via
 * finalize_transaction_bynames().
 *
 * On rank 0 the ops are accumulated locally (the equivalent of
 * relayfence_request_cb()).  On any other rank they are forwarded to
 * rank 0 as a kvs.relayfence RPC with no response expected.
 *
 * On failure the sender gets flux_respond_error() with the current
 * errno and optional error text, and request_tracking_remove() is
 * called for the message.  If getroot() reports a stall, the request
 * is added to request tracking and the handler returns without
 * responding.
 */
static void fence_request_cb (flux_t *h,
                              flux_msg_handler_t *mh,
                              const flux_msg_t *msg,
                              void *arg)
{
    struct kvs_ctx *ctx = arg;
    struct kvsroot *nsroot;
    treq_t *fence;
    json_t *ops = NULL;
    const char *name;
    const char *ns;
    const char *errmsg = NULL;
    flux_error_t error;
    bool stall = false;
    int flags;
    int nprocs;
    int saved_errno;
    int rc;

    rc = flux_request_unpack (msg,
                              NULL,
                              "{ s:o s:s s:s s:i s:i }",
                              "ops", &ops,
                              "name", &name,
                              "namespace", &ns,
                              "flags", &flags,
                              "nprocs", &nprocs);
    if (rc < 0) {
        flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
        goto error;
    }

    /* The ops are only checked with guest_commit_authorize() when
     * flux_msg_authorize() fails for the message. */
    if (flux_msg_authorize (msg, FLUX_USERID_UNKNOWN) < 0) {
        if (guest_commit_authorize (ops, &error) < 0) {
            errno = EPERM;
            errmsg = error.text;
            goto error;
        }
    }

    nsroot = getroot (ctx, ns, mh, msg, NULL, &stall);
    if (nsroot == NULL) {
        if (!stall)
            goto error;
        /* Stalled: track the request and return without responding. */
        request_tracking_add (ctx, msg);
        return;
    }

    /* Reuse the treq already registered under this name, or create
     * and register a new one. */
    fence = treq_mgr_lookup_transaction (nsroot->trm, name);
    if (fence == NULL) {
        fence = treq_create (name, nprocs, flags);
        if (fence == NULL) {
            flux_log_error (h, "%s: treq_create", __FUNCTION__);
            goto error;
        }
        if (treq_mgr_add_transaction (nsroot->trm, fence) < 0) {
            /* keep the registration errno across logging/cleanup */
            saved_errno = errno;
            flux_log_error (h, "%s: treq_mgr_add_transaction", __FUNCTION__);
            treq_destroy (fence);
            errno = saved_errno;
            goto error;
        }
    }

    /* The request must agree with the flags and nprocs stored in the
     * treq. */
    if (!(treq_get_flags (fence) == flags
          && treq_get_nprocs (fence) == nprocs)) {
        errno = EINVAL;
        goto error;
    }

    /* save copy of request, will be used later via
     * finalize_transaction_bynames() to send error code to original
     * sender.
     */
    if (treq_add_request_copy (fence, msg) < 0)
        goto error;

    if (ctx->rank != 0) {
        flux_future_t *relay;

        /* route to rank 0 as instance owner */
        relay = flux_rpc_pack (h,
                               "kvs.relayfence",
                               0,
                               FLUX_RPC_NORESPONSE,
                               "{ s:O s:s s:s s:i s:i }",
                               "ops", ops,
                               "name", name,
                               "namespace", ns,
                               "flags", flags,
                               "nprocs", nprocs);
        if (relay == NULL) {
            flux_log_error (h, "%s: flux_rpc_pack", __FUNCTION__);
            goto error;
        }
        flux_future_destroy (relay);
    }
    else {
        /* We happen to be on rank 0, so perform the equivalent of
         * relayfence_request_cb() here instead of sending an RPC.
         */
        if (treq_add_request_ops (fence, ops) < 0) {
            flux_log_error (h, "%s: treq_add_request_ops", __FUNCTION__);
            goto error;
        }

        if (treq_count_reached (fence)) {

            /* If user called fence > nprocs times, it should have
             * been caught earlier */
            assert (!treq_get_processed (fence));

            /* the processed flag indicates that a treq has been
             * added to the ready queue */
            treq_mark_processed (fence);

            rc = kvstxn_mgr_add_transaction (nsroot->ktm,
                                             treq_get_name (fence),
                                             treq_get_ops (fence),
                                             treq_get_flags (fence),
                                             0);
            if (rc < 0) {
                flux_log_error (h,
                                "%s: kvstxn_mgr_add_transaction",
                                __FUNCTION__);
                goto error;
            }

            /* record the op count of this fence transaction */
            tstat_push (&ctx->txn_fence_stats,
                        (double)json_array_size (treq_get_ops (fence)));

            work_queue_check_append (ctx, nsroot);
        }
    }

    request_tracking_add (ctx, msg);
    return;

error:
    if (flux_respond_error (h, msg, errno, errmsg) < 0)
        flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
    request_tracking_remove (ctx, msg);
}

static void wait_version_request_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand Down Expand Up @@ -2283,7 +2045,7 @@ static void setroot_event_cb (flux_t *h,
/* if root not initialized, nothing to do
* - small chance we could receive setroot event on namespace that
* is being removed. Would require events to be received out of
* order (commit/fence completes before namespace removed, but
* order (commit completes before namespace removed, but
* namespace remove event received before setroot).
*/
if (!(root = kvsroot_mgr_lookup_root (ctx->krm, ns)))
Expand Down Expand Up @@ -2385,7 +2147,6 @@ static void stats_get_cb (flux_t *h,
json_t *tstats = NULL;
json_t *cstats = NULL;
json_t *txncstats = NULL;
json_t *txnfstats = NULL;
json_t *nsstats = NULL;
tstat_t ts = { 0 };
int size = 0, incomplete = 0, dirty = 0;
Expand Down Expand Up @@ -2414,9 +2175,6 @@ static void stats_get_cb (flux_t *h,
if (!(txncstats = get_tstat_obj (&ctx->txn_commit_stats, 1.0)))
goto nomem;

if (!(txnfstats = get_tstat_obj (&ctx->txn_fence_stats, 1.0)))
goto nomem;

if (!(nsstats = json_object ()))
goto nomem;

Expand Down Expand Up @@ -2445,18 +2203,16 @@ static void stats_get_cb (flux_t *h,

if (flux_respond_pack (h,
msg,
"{ s:O s:O s:{s:O s:O} s:i }",
"{ s:O s:O s:{s:O} s:i }",
"cache", cstats,
"namespace", nsstats,
"transaction-opcount",
"commit", txncstats,
"fence", txnfstats,
"pending_requests", zhashx_size (ctx->requests)) < 0)
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
json_decref (tstats);
json_decref (cstats);
json_decref (txncstats);
json_decref (txnfstats);
json_decref (nsstats);
return;
nomem:
Expand All @@ -2467,7 +2223,6 @@ static void stats_get_cb (flux_t *h,
json_decref (tstats);
json_decref (cstats);
json_decref (txncstats);
json_decref (txnfstats);
json_decref (nsstats);
}

Expand All @@ -2481,7 +2236,6 @@ static void stats_clear (struct kvs_ctx *ctx)
{
ctx->faults = 0;
memset (&ctx->txn_commit_stats, '\0', sizeof (ctx->txn_commit_stats));
memset (&ctx->txn_fence_stats, '\0', sizeof (ctx->txn_fence_stats));

if (kvsroot_mgr_iter_roots (ctx->krm, stats_clear_root_cb, NULL) < 0)
flux_log_error (ctx->h, "%s: kvsroot_mgr_iter_roots", __FUNCTION__);
Expand Down Expand Up @@ -3045,18 +2799,6 @@ static const struct flux_msg_handler_spec htab[] = {
relaycommit_request_cb,
0
},
{
FLUX_MSGTYPE_REQUEST,
"kvs.fence",
fence_request_cb,
FLUX_ROLE_USER
},
{
FLUX_MSGTYPE_REQUEST,
"kvs.relayfence",
relayfence_request_cb,
0
},
{
FLUX_MSGTYPE_REQUEST,
"kvs.namespace-create",
Expand Down
3 changes: 1 addition & 2 deletions src/modules/kvs/kvstxn.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ void kvstxn_mgr_destroy (kvstxn_mgr_t *ktm);
/* kvstxn_mgr_add_transaction() will internally create a kvstxn_t and
* store it in the queue of ready to process transactions.
*
* This should be called once per transaction (commit or fence)
* request.
* This should be called once per transaction request.
*/
int kvstxn_mgr_add_transaction (kvstxn_mgr_t *ktm,
const char *name,
Expand Down

0 comments on commit 044238c

Please sign in to comment.