Skip to content

Commit

Permalink
Merge pull request #6615 from chu11/kvs_cleanup3
Browse files Browse the repository at this point in the history
kvs: misc cleanups
  • Loading branch information
mergify[bot] authored Feb 8, 2025
2 parents ef29673 + a1b2c06 commit c205af7
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 268 deletions.
147 changes: 53 additions & 94 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "src/common/libutil/tstat.h"
#include "src/common/libutil/timestamp.h"
#include "src/common/libutil/errprintf.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libkvs/treeobj.h"
#include "src/common/libkvs/kvs_checkpoint.h"
#include "src/common/libkvs/kvs_txn_private.h"
Expand Down Expand Up @@ -328,7 +329,6 @@ static void getroot_completion (flux_future_t *f, void *arg)
uint32_t owner;
const char *ref;
struct kvsroot *root;
int save_errno;

msg = flux_future_aux_get (f, "msg");
assert (msg);
Expand Down Expand Up @@ -361,7 +361,7 @@ static void getroot_completion (flux_future_t *f, void *arg)
}

if (event_subscribe (ctx, ns) < 0) {
save_errno = errno;
int save_errno = errno;
kvsroot_mgr_remove_root (ctx->krm, ns);
errno = save_errno;
flux_log_error (ctx->h, "%s: event_subscribe", __FUNCTION__);
Expand All @@ -380,7 +380,6 @@ static void getroot_completion (flux_future_t *f, void *arg)
goto error;
}

flux_msg_destroy (msg);
flux_future_destroy (f);
return;

Expand All @@ -394,18 +393,15 @@ static void getroot_completion (flux_future_t *f, void *arg)
* will deal with the success case.
*/
request_tracking_remove (ctx, msg);
flux_msg_destroy (msg);
flux_future_destroy (f);
}

static int getroot_request_send (struct kvs_ctx *ctx,
const char *ns,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
lookup_t *lh)
const flux_msg_t *msg)
{
flux_future_t *f = NULL;
flux_msg_t *msgcpy = NULL;
int saved_errno;

if (!(f = flux_rpc_pack (ctx->h,
Expand All @@ -416,20 +412,12 @@ static int getroot_request_send (struct kvs_ctx *ctx,
"namespace", ns)))
goto error;

if (!(msgcpy = flux_msg_copy (msg, true))) {
flux_log_error (ctx->h, "%s: flux_msg_copy", __FUNCTION__);
goto error;
}

if (lh
&& flux_msg_aux_set (msgcpy, "lookup_handle", lh, NULL) < 0) {
flux_log_error (ctx->h, "%s: flux_msg_aux_set", __FUNCTION__);
goto error;
}

/* we will manage destruction of the 'msg' on errors */
if (flux_future_aux_set (f, "msg", msgcpy, NULL) < 0) {
if (flux_future_aux_set (f,
"msg",
(void *)flux_msg_incref (msg),
(flux_free_f)flux_msg_decref) < 0) {
flux_log_error (ctx->h, "%s: flux_future_aux_set", __FUNCTION__);
flux_msg_decref (msg);
goto error;
}

Expand All @@ -439,7 +427,6 @@ static int getroot_request_send (struct kvs_ctx *ctx,
return 0;
error:
saved_errno = errno;
flux_msg_destroy (msgcpy);
flux_future_destroy (f);
errno = saved_errno;
return -1;
Expand All @@ -449,7 +436,6 @@ static struct kvsroot *getroot (struct kvs_ctx *ctx,
const char *ns,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
lookup_t *lh,
bool *stall)
{
struct kvsroot *root;
Expand All @@ -462,7 +448,7 @@ static struct kvsroot *getroot (struct kvs_ctx *ctx,
return NULL;
}
else {
if (getroot_request_send (ctx, ns, mh, msg, lh) < 0) {
if (getroot_request_send (ctx, ns, mh, msg) < 0) {
flux_log_error (ctx->h, "getroot_request_send");
return NULL;
}
Expand Down Expand Up @@ -738,26 +724,19 @@ static int content_store_request_send (struct kvs_ctx *ctx,
int len)
{
flux_future_t *f;
int saved_errno, rc = -1;

if (!(f = content_store (ctx->h, data, len, 0)))
return -1;
if (flux_future_aux_set (f, "cache_blobref", (void *)blobref, NULL) < 0)
goto error;
if (flux_future_aux_set (f, "cache_blobref", (void *)blobref, NULL) < 0) {
saved_errno = errno;
flux_future_destroy (f);
errno = saved_errno;
goto error;
}
if (flux_future_then (f, -1., content_store_completion, ctx) < 0) {
saved_errno = errno;
flux_future_destroy (f);
errno = saved_errno;
if (flux_future_then (f, -1., content_store_completion, ctx) < 0)
goto error;
}

rc = 0;
return 0;

error:
return rc;
flux_future_destroy (f);
return -1;
}

static int kvstxn_load_cb (kvstxn_t *kt, const char *ref, void *data)
Expand Down Expand Up @@ -826,14 +805,13 @@ static int setroot_event_send (struct kvs_ctx *ctx,
{
flux_msg_t *msg = NULL;
char *setroot_topic = NULL;
int saved_errno, rc = -1;
int rc = -1;

assert (ctx->rank == 0);

if (asprintf (&setroot_topic,
"kvs.namespace-%s-setroot",
root->ns_name) < 0) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: asprintf", __FUNCTION__);
goto done;
}
Expand All @@ -846,24 +824,17 @@ static int setroot_event_send (struct kvs_ctx *ctx,
"names", names,
"keys", keys,
"owner", root->owner))) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: flux_event_pack", __FUNCTION__);
goto done;
}
if (flux_msg_set_private (msg) < 0) {
saved_errno = errno;
if (flux_msg_set_private (msg) < 0)
goto done;
}
if (flux_send (ctx->h, msg, 0) < 0) {
saved_errno = errno;
if (flux_send (ctx->h, msg, 0) < 0)
goto done;
}
rc = 0;
done:
free (setroot_topic);
ERRNO_SAFE_WRAP (free, setroot_topic);
flux_msg_destroy (msg);
if (rc < 0)
errno = saved_errno;
return rc;
}

Expand All @@ -874,10 +845,9 @@ static int error_event_send (struct kvs_ctx *ctx,
{
flux_msg_t *msg = NULL;
char *error_topic = NULL;
int saved_errno, rc = -1;
int rc = -1;

if (asprintf (&error_topic, "kvs.namespace-%s-error", ns) < 0) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: asprintf", __FUNCTION__);
goto done;
}
Expand All @@ -887,24 +857,17 @@ static int error_event_send (struct kvs_ctx *ctx,
"namespace", ns,
"names", names,
"errnum", errnum))) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: flux_event_pack", __FUNCTION__);
goto done;
}
if (flux_msg_set_private (msg) < 0) {
saved_errno = errno;
if (flux_msg_set_private (msg) < 0)
goto done;
}
if (flux_send (ctx->h, msg, 0) < 0) {
saved_errno = errno;
if (flux_send (ctx->h, msg, 0) < 0)
goto done;
}
rc = 0;
done:
free (error_topic);
ERRNO_SAFE_WRAP (free, error_topic);
flux_msg_destroy (msg);
if (rc < 0)
errno = saved_errno;
return rc;
}

Expand All @@ -914,16 +877,15 @@ static int error_event_send_to_name (struct kvs_ctx *ctx,
int errnum)
{
json_t *names = NULL;
int rc = -1;
int rc;

if (!(names = json_pack ("[ s ]", name))) {
errno = ENOMEM;
flux_log_error (ctx->h, "%s: json_pack", __FUNCTION__);
goto done;
return -1;
}

rc = error_event_send (ctx, ns, names, errnum);
done:
json_decref (names);
return rc;
}
Expand Down Expand Up @@ -1438,25 +1400,32 @@ static lookup_t *lookup_common (flux_t *h,
ns = lookup_missing_namespace (lh);
assert (ns);

root = getroot (ctx, ns, mh, msg, lh, &stall);
root = getroot (ctx, ns, mh, msg, &stall);
assert (!root);

if (stall)
if (stall) {
if (flux_msg_aux_set (msg, "lookup_handle", lh, NULL) < 0) {
flux_log_error (ctx->h, "%s: flux_msg_aux_set", __FUNCTION__);
goto done;
}
goto stall;
}
goto done;
}
else if (lret == LOOKUP_PROCESS_LOAD_MISSING_REFS) {
struct kvs_cb_data cbd;

if (!(wait = wait_create_msg_handler (h, mh, msg, ctx, replay_cb)))
/* do not destroy lookup_handle on message destruction, we
* manage it in here */
if (flux_msg_aux_set (msg, "lookup_handle", lh, NULL) < 0) {
flux_log_error (ctx->h, "%s: flux_msg_aux_set", __FUNCTION__);
goto done;
}

if (wait_set_error_cb (wait, lookup_wait_error_cb, lh) < 0)
if (!(wait = wait_create_msg_handler (h, mh, msg, ctx, replay_cb)))
goto done;

/* do not destroy lookup_handle on message destruction, we
* manage it in here */
if (wait_msg_aux_set (wait, "lookup_handle", lh, NULL) < 0)
if (wait_set_error_cb (wait, lookup_wait_error_cb, lh) < 0)
goto done;

cbd.ctx = ctx;
Expand Down Expand Up @@ -1764,7 +1733,7 @@ static void commit_request_cb (flux_t *h,
goto error;
}

if (!(root = getroot (ctx, ns, mh, msg, NULL, &stall))) {
if (!(root = getroot (ctx, ns, mh, msg, &stall))) {
if (stall) {
request_tracking_add (ctx, msg);
return;
Expand Down Expand Up @@ -1852,7 +1821,7 @@ static void wait_version_request_cb (flux_t *h,
goto error;
}

if (!(root = getroot (ctx, ns, mh, msg, NULL, &stall))) {
if (!(root = getroot (ctx, ns, mh, msg, &stall))) {
if (stall) {
request_tracking_add (ctx, msg);
return;
Expand Down Expand Up @@ -1918,7 +1887,7 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
* first.
*/
bool stall = false;
if (!(root = getroot (ctx, ns, mh, msg, NULL, &stall))) {
if (!(root = getroot (ctx, ns, mh, msg, &stall))) {
if (stall) {
request_tracking_add (ctx, msg);
return;
Expand Down Expand Up @@ -2382,7 +2351,7 @@ static void start_root_remove (struct kvs_ctx *ctx, const char *ns)
static int namespace_remove (struct kvs_ctx *ctx, const char *ns)
{
flux_msg_t *msg = NULL;
int saved_errno, rc = -1;
int rc = -1;
char *topic = NULL;

/* Namespace doesn't exist or is already in process of being
Expand All @@ -2392,32 +2361,23 @@ static int namespace_remove (struct kvs_ctx *ctx, const char *ns)
goto done;
}

if (asprintf (&topic, "kvs.namespace-%s-removed", ns) < 0) {
saved_errno = errno;
if (asprintf (&topic, "kvs.namespace-%s-removed", ns) < 0)
goto cleanup;
}
if (!(msg = flux_event_pack (topic, "{ s:s }", "namespace", ns))) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: flux_event_pack", __FUNCTION__);
goto cleanup;
}
if (flux_msg_set_private (msg) < 0) {
saved_errno = errno;
if (flux_msg_set_private (msg) < 0)
goto cleanup;
}
if (flux_send (ctx->h, msg, 0) < 0) {
saved_errno = errno;
if (flux_send (ctx->h, msg, 0) < 0)
goto cleanup;
}

start_root_remove (ctx, ns);
done:
rc = 0;
cleanup:
flux_msg_destroy (msg);
free (topic);
if (rc < 0)
errno = saved_errno;
ERRNO_SAFE_WRAP (free, topic);
return rc;
}

Expand Down Expand Up @@ -2543,7 +2503,7 @@ static void setroot_pause_request_cb (flux_t *h,
goto error;
}

if (!(root = getroot (ctx, ns, mh, msg, NULL, &stall))) {
if (!(root = getroot (ctx, ns, mh, msg, &stall))) {
if (stall)
return;
goto error;
Expand Down Expand Up @@ -2608,7 +2568,7 @@ static void setroot_unpause_request_cb (flux_t *h,
goto error;
}

if (!(root = getroot (ctx, ns, mh, msg, NULL, &stall))) {
if (!(root = getroot (ctx, ns, mh, msg, &stall))) {
if (stall)
return;
goto error;
Expand Down Expand Up @@ -2947,12 +2907,11 @@ static int store_initial_rootdir (struct kvs_ctx *ctx, char *ref, int ref_len)
saved_errno = errno;
ret = cache_remove_entry (ctx->cache, ref);
assert (ret == 1);
errno = saved_errno;
error:
saved_errno = errno;
free (data);
ERRNO_SAFE_WRAP (free, data);
flux_future_destroy (f);
json_decref (rootdir);
errno = saved_errno;
ERRNO_SAFE_WRAP (json_decref, rootdir);
return -1;
}

Expand Down
Loading

0 comments on commit c205af7

Please sign in to comment.