diff --git a/doc/man3/flux_kvs_lookup.rst b/doc/man3/flux_kvs_lookup.rst index 04b81f2d4ce4..2c42d9a5b877 100644 --- a/doc/man3/flux_kvs_lookup.rst +++ b/doc/man3/flux_kvs_lookup.rst @@ -151,10 +151,9 @@ FLUX_KVS_WATCH_UNIQ FLUX_KVS_WATCH_APPEND Specified along with FLUX_KVS_WATCH, this flag will alter watch behavior to only respond when :var:`key` is mentioned verbatim in a - committed transaction and the key has been appended to. The response - will only contain the additional appended data. Note that only data - length is considered for appends and no guarantee is made that prior - data hasn't been overwritten. + committed transaction and the key has been appended to. The + response will only contain the additional appended data. If the + value is overwritten, the lookup fails with EINVAL. FLUX_KVS_WATCH_FULL Specified along with FLUX_KVS_WATCH, this flag will alter watch diff --git a/src/cmd/job/eventlog.c b/src/cmd/job/eventlog.c index 9e2febbfcbbc..8e9fa8becd86 100644 --- a/src/cmd/job/eventlog.c +++ b/src/cmd/job/eventlog.c @@ -360,6 +360,7 @@ static void wait_event_continuation (flux_future_t *f, void *arg) if (!ctx->quiet) { if (eventlog_entry_dumpf (ctx->evf, stdout, &error, o) < 0) log_err ("failed to print eventlog entry: %s", error.text); + fflush (stdout); } if (flux_job_event_watch_cancel (f) < 0) log_err_exit ("flux_job_event_watch_cancel"); @@ -367,6 +368,7 @@ static void wait_event_continuation (flux_future_t *f, void *arg) if (!ctx->got_event) { if (eventlog_entry_dumpf (ctx->evf, stdout, &error, o) < 0) log_err ("failed to print eventlog entry: %s", error.text); + fflush (stdout); } } diff --git a/src/common/libkvs/test/treeobj.c b/src/common/libkvs/test/treeobj.c index c257317f1913..d94a45aad55c 100644 --- a/src/common/libkvs/test/treeobj.c +++ b/src/common/libkvs/test/treeobj.c @@ -807,6 +807,45 @@ void test_corner_cases (void) json_decref (symlink); } +void test_type_name (void) +{ + json_t *val, *valref, *dir, *dirref, *symlink, *notatreeobj; + const char *s; + + val = treeobj_create_val ("a", 1); + valref = treeobj_create_valref (NULL); + dir = treeobj_create_dir (); + dirref = treeobj_create_dirref (NULL); + symlink = treeobj_create_symlink (NULL, "some-string"); + notatreeobj = json_object (); + if (!val || !valref || !dir || !dirref || !symlink || !notatreeobj) + BAIL_OUT ("can't continue without test value"); + + s = treeobj_type_name (val); + ok (streq (s, "val"), "treeobj_type_name returns val correctly"); + s = treeobj_type_name (valref); + ok (streq (s, "valref"), "treeobj_type_name returns valref correctly"); + s = treeobj_type_name (dir); + ok (streq (s, "dir"), "treeobj_type_name returns dir correctly"); + s = treeobj_type_name (dirref); + ok (streq (s, "dirref"), "treeobj_type_name returns dirref correctly"); + s = treeobj_type_name (symlink); + ok (streq (s, "symlink"), "treeobj_type_name returns symlink correctly"); + s = treeobj_type_name (notatreeobj); + ok (streq (s, "unknown"), + "treeobj_type_name returns unknown for non-treeobj"); + s = treeobj_type_name (NULL); + ok (streq (s, "unknown"), + "treeobj_type_name returns unknown on invalid input"); + + json_decref (val); + json_decref (valref); + json_decref (dir); + json_decref (dirref); + json_decref (symlink); + json_decref (notatreeobj); +} + int main(int argc, char** argv) { plan (NO_PLAN); @@ -820,6 +859,7 @@ int main(int argc, char** argv) test_deep_copy (); test_symlink (); test_corner_cases (); + test_type_name (); test_codec (); diff --git a/src/common/libkvs/treeobj.c b/src/common/libkvs/treeobj.c index ba5201a0d6e4..e9e167b70979 100644 --- a/src/common/libkvs/treeobj.c +++ b/src/common/libkvs/treeobj.c @@ -648,6 +648,21 @@ char *treeobj_encode (const json_t *obj) return json_dumps (obj, JSON_COMPACT|JSON_SORT_KEYS); } +const char *treeobj_type_name (const json_t *obj) +{ + if (treeobj_is_symlink (obj)) + return "symlink"; + else if (treeobj_is_val (obj)) + return "val"; + else if (treeobj_is_valref (obj)) + return "valref"; + else if (treeobj_is_dir (obj)) + return "dir"; + else if (treeobj_is_dirref (obj)) + return "dirref"; + return "unknown"; +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/common/libkvs/treeobj.h b/src/common/libkvs/treeobj.h index 9df1dd392379..867aaf00302b 100644 --- a/src/common/libkvs/treeobj.h +++ b/src/common/libkvs/treeobj.h @@ -138,6 +138,12 @@ json_t *treeobj_decode (const char *buf); json_t *treeobj_decodeb (const char *buf, size_t buflen); char *treeobj_encode (const json_t *obj); +/* Get treeobj type name + * Returns "symlink", "val", "valref", "dir", "dirref" or NULL if + * invalid treeobj. + */ +const char *treeobj_type_name (const json_t *obj); + #endif /* !_FLUX_KVS_TREEOBJ_H */ /* diff --git a/src/modules/kvs-watch/kvs-watch.c b/src/modules/kvs-watch/kvs-watch.c index 2c19a3ac576c..9de18ccacde2 100644 --- a/src/modules/kvs-watch/kvs-watch.c +++ b/src/modules/kvs-watch/kvs-watch.c @@ -21,6 +21,8 @@ #include "src/common/libkvs/treeobj.h" #include "src/common/libkvs/kvs_util_private.h" #include "src/common/libutil/blobref.h" +#include "src/common/libcontent/content.h" +#include "src/common/libutil/errprintf.h" /* State for one watcher */ struct watcher { @@ -37,10 +39,13 @@ struct watcher { char *key; // lookup key int flags; // kvs_lookup flags zlist_t *lookups; // list of futures, in commit order + zlist_t *loads; // list of futures, content loads in ref order struct ns_monitor *nsm; // back pointer for removal json_t *prev; // previous watch value for KVS_WATCH_FULL/UNIQ - int append_offset; // offset for KVS_WATCH_APPEND + bool index_valid; // flag if prev_start_index/prev_end_index set + int prev_start_index; // previous start index loaded + int prev_end_index; // previous end index loaded void *handle; // zlistx_t handle }; @@ -90,13 +95,20 @@ static void watcher_destroy (struct watcher *w) flux_future_destroy (f); zlist_destroy (&w->lookups); } + if (w->loads) { + flux_future_t *f; + while ((f = zlist_pop (w->loads))) + flux_future_destroy (f); + zlist_destroy (&w->loads); + } json_decref (w->prev); free (w); errno = saved_errno; } } -static struct watcher *watcher_create (const flux_msg_t *msg, const char *key, +static struct watcher *watcher_create (const flux_msg_t *msg, + const char *key, int flags) { struct watcher *w; @@ -110,6 +122,8 @@ static struct watcher *watcher_create (const flux_msg_t *msg, const char *key, goto error; if (!(w->lookups = zlist_new ())) goto error_nomem; + if (!(w->loads = zlist_new ())) + goto error_nomem; w->flags = flags; w->rootseq = -1; return w; @@ -132,7 +146,8 @@ static void commit_destroy (struct commit *commit) } } -static struct commit *commit_create (const char *rootref, int rootseq, +static struct commit *commit_create (const char *rootref, + int rootseq, json_t *keys) { struct commit *commit = calloc (1, sizeof (*commit)); @@ -228,7 +243,7 @@ static bool key_match (json_t *o, const char *key) static void watcher_cleanup (struct ns_monitor *nsm, struct watcher *w) { /* wait for all in flight lookups to complete before destroying watcher */ - if (zlist_size (w->lookups) == 0) + if (zlist_size (w->lookups) == 0 && zlist_size (w->loads) == 0) zlistx_delete (nsm->watchers, w->handle); /* if nsm->getrootf, destroy when getroot_continuation completes */ if (zlistx_size (nsm->watchers) == 0 @@ -236,11 +251,114 @@ static void watcher_cleanup (struct ns_monitor *nsm, struct watcher *w) zhash_delete (nsm->ctx->namespaces, nsm->ns_name); } +static void handle_load_response (flux_future_t *f, struct watcher *w) +{ + flux_t *h = flux_future_get_flux (f); + const void *data; + int size; + flux_error_t err; + + if (content_load_get (f, &data, &size) < 0) { + errprintf (&err, "failed to load content data"); + goto error_respond; + } + + if (!w->mute) { + json_t *val = treeobj_create_val (data, size); + if (!val) { + errprintf (&err, "failed to create treeobj value"); + goto error_respond; + } + if (flux_respond_pack (h, w->request, "{ s:o }", "val", val) < 0) { + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); + json_decref (val); + goto finished; + } + w->responded = true; + } + + return; +error_respond: + if (!w->mute) { + if (flux_respond_error (h, w->request, errno, err.text) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); + } +finished: + w->finished = true; +} + +static void load_continuation (flux_future_t *f, void *arg) +{ + struct watcher *w = arg; + struct ns_monitor *nsm = w->nsm; + + while ((f = zlist_first (w->loads)) && flux_future_is_ready (f)) { + f = zlist_pop (w->loads); + if (!w->finished) + handle_load_response (f, w); + flux_future_destroy (f); + /* if WAITCREATE and !WATCH, then we only care about sending + * one response and being done. We can use the responded flag + * to indicate that condition. + */ + if (w->responded + && (w->flags & FLUX_KVS_WAITCREATE) + && !(w->flags & FLUX_KVS_WATCH)) + w->finished = true; + } + if (w->finished) + watcher_cleanup (nsm, w); +} + +static flux_future_t *load_ref (flux_t *h, struct watcher *w, const char *ref) +{ + flux_future_t *f = NULL; + + if (!(f = content_load_byblobref (h, ref, 0)) + || flux_future_then (f, -1., load_continuation, w) < 0) + goto error; + if (zlist_append (w->loads, f) < 0) { + errno = ENOMEM; + goto error; + } + + return f; + +error: + flux_future_destroy (f); + return NULL; +} + +static int load_range (flux_t *h, + struct watcher *w, + int start_index, + int end_index, + json_t *val) +{ + int i; + + for (i = start_index; i <= end_index; i++) { + flux_future_t *f; + const char *ref = treeobj_get_blobref (val, i); + if (!ref) + return -1; + if (!(f = load_ref (h, w, ref))) + return -1; + } + return 0; +} + static int handle_initial_response (flux_t *h, struct watcher *w, json_t *val, - int root_seq) + const char *root_ref, + int root_seq, + const char *namespace) { + flux_error_t err; + /* this is the first response case, store the first response * val */ if ((w->flags & FLUX_KVS_WATCH_FULL) @@ -248,14 +366,46 @@ static int handle_initial_response (flux_t *h, w->prev = json_incref (val); if ((w->flags & FLUX_KVS_WATCH_APPEND)) { - if (treeobj_decode_val (val, - NULL, - &w->append_offset) < 0) { - flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__); + /* The very first response may be a 'val' treeobj instead of + * 'valref', if there have been no appends yet. + */ + if (treeobj_is_val (val)) { + w->index_valid = true; + w->prev_start_index = 0; + w->prev_end_index = 0; + /* since this is a val object, we can just return it */ + goto out; + } + else if (treeobj_is_valref (val)) { + w->index_valid = true; + w->prev_start_index = 0; + w->prev_end_index = treeobj_get_count (val) - 1; + } + else { + errprintf (&err, + "%s cannot be watched with WATCH_APPEND", + treeobj_type_name (val)); + errno = EINVAL; goto error_respond; } + + if (load_range (h, + w, + w->prev_start_index, + w->prev_end_index, + val) < 0) { + errprintf (&err, + "error sending request for content blobs [%d:%d]", + w->prev_start_index, + w->prev_end_index); + goto error_respond; + } + + w->initial_rootseq = root_seq; + return 0; } +out: if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { flux_log_error (h, "%s: failed to respond to kvs-watch.lookup", @@ -269,7 +419,7 @@ static int handle_initial_response (flux_t *h, error_respond: if (!w->mute) { - if (flux_respond_error (h, w->request, errno, NULL) < 0) + if (flux_respond_error (h, w->request, errno, err.text) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); } return -1; @@ -316,78 +466,127 @@ static int handle_compare_response (flux_t *h, static int handle_append_response (flux_t *h, struct watcher *w, - json_t *val) + json_t *val, + const char *root_ref, + int root_seq, + const char *namespace) { + flux_error_t err; + if (!w->responded) { /* this is the first response case, store the first response * info. This is here b/c initial response could have been - * ENOENT case */ - if (treeobj_decode_val (val, - NULL, - &w->append_offset) < 0) { - flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__); - goto error_respond; + * ENOENT case. + * + * The very first response may be a 'val' treeobj instead of + * 'valref', if there have been no appends yet. + */ + if (treeobj_is_val (val)) { + w->index_valid = true; + w->prev_start_index = 0; + w->prev_end_index = 0; + /* since this is a val object, we can just return it */ + if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { + flux_log_error (h, + "%s: failed to respond to kvs-watch.lookup", + __FUNCTION__); + goto error_out; + } + w->responded = true; } + else if (treeobj_is_valref (val)) { + /* N.B. It may not be obvious why we have to check + * w->index_valid if we have not yet responded. It is + * possible we have received a setroot response and an + * updated valref before loads from the content store have + * returned to the caller. + */ + if (w->index_valid) { + int new_end_index = treeobj_get_count (val) - 1; + if (new_end_index > w->prev_end_index) { + w->prev_start_index = w->prev_end_index + 1; + w->prev_end_index = new_end_index; + } + else if (new_end_index < w->prev_end_index) { + errprintf (&err, "key watched with WATCH_APPEND truncated"); + errno = EINVAL; + goto error_respond; + } + else + goto out; + } + else { + w->index_valid = true; + w->prev_start_index = 0; + w->prev_end_index = treeobj_get_count (val) - 1; + } - if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) { - flux_log_error (h, - "%s: failed to respond to kvs-watch.lookup", - __FUNCTION__); - return -1; + if (load_range (h, + w, + w->prev_start_index, + w->prev_end_index, + val) < 0) { + errprintf (&err, + "error sending request for content blobs [%d:%d]", + w->prev_start_index, + w->prev_end_index); + goto error_respond; + } + } + else { + errprintf (&err, + "%s cannot be watched with WATCH_APPEND", + treeobj_type_name (val)); + errno = EINVAL; + goto error_respond; } - - w->responded = true; } else { - json_t *new_val = NULL; - void *new_data = NULL; - int new_offset; - - if (treeobj_decode_val (val, - &new_data, - &new_offset) < 0) { - flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__); - goto error_respond; + if (treeobj_is_valref (val)) { + int new_end_index; + if (!w->index_valid) { + errno = EPROTO; + goto error_respond; + } + new_end_index = treeobj_get_count (val) - 1; + if (new_end_index > w->prev_end_index) { + w->prev_start_index = w->prev_end_index + 1; + w->prev_end_index = new_end_index; + } + else if (new_end_index < w->prev_end_index) { + errprintf (&err, "key watched with WATCH_APPEND shortened"); + errno = EINVAL; + goto error_respond; + } + else + goto out; + + if (load_range (h, + w, + w->prev_start_index, + w->prev_end_index, + val) < 0) { + errprintf (&err, "error loading reference"); + goto error_respond; + } } - - /* check length to determine if append actually happened, note - * that zero length append is legal - * - * Note that this check does not ensure that the key was not - * "fake" appended to. i.e. the key overwritten with data - * longer than the original. - */ - if (new_offset < w->append_offset) { - free (new_data); + else { + errprintf (&err, + "value of key watched with WATCH_APPEND overwritten"); errno = EINVAL; goto error_respond; } - - if (!(new_val = treeobj_create_val (new_data + w->append_offset, - new_offset - w->append_offset))) { - free (new_data); - goto error_respond; - } - - free (new_data); - w->append_offset = new_offset; - - if (flux_respond_pack (h, w->request, "{ s:o }", "val", new_val) < 0) { - json_decref (new_val); - flux_log_error (h, - "%s: failed to respond to kvs-watch.lookup", - __FUNCTION__); - return -1; - } } +out: return 0; error_respond: if (!w->mute) { - if (flux_respond_error (h, w->request, errno, NULL) < 0) + if (flux_respond_error (h, w->request, errno, err.text) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); } +error_out: return -1; } @@ -418,6 +617,7 @@ static void handle_lookup_response (flux_future_t *f, { flux_t *h = flux_future_get_flux (f); int errnum; + const char *root_ref; int root_seq; json_t *val; @@ -426,7 +626,8 @@ static void handle_lookup_response (flux_future_t *f, w->initial_rpc_received = true; /* First check for ENOENT */ - if (!flux_rpc_get_unpack (f, "{ s:i s:i }", + if (!flux_rpc_get_unpack (f, + "{ s:i s:i }", "errno", &errnum, "rootseq", &root_seq)) { assert (errnum == ENOENT); @@ -439,8 +640,10 @@ static void handle_lookup_response (flux_future_t *f, goto error; } - if (flux_rpc_get_unpack (f, "{ s:o s:i }", + if (flux_rpc_get_unpack (f, + "{ s:o s:s s:i }", "val", &val, + "rootref", &root_ref, "rootseq", &root_seq) < 0) { /* It is worth mentioning ENOTSUP error conditions here. * @@ -462,7 +665,12 @@ static void handle_lookup_response (flux_future_t *f, goto error; } - if (handle_initial_response (h, w, val, root_seq) < 0) + if (handle_initial_response (h, + w, + val, + root_ref, + root_seq, + w->nsm->ns_name) < 0) goto finished; } else { @@ -473,8 +681,10 @@ static void handle_lookup_response (flux_future_t *f, goto error; } - if (flux_rpc_get_unpack (f, "{ s:o s:i }", + if (flux_rpc_get_unpack (f, + "{ s:o s:s s:i }", "val", &val, + "rootref", &root_ref, "rootseq", &root_seq) < 0) goto error; @@ -490,7 +700,12 @@ static void handle_lookup_response (flux_future_t *f, goto finished; } else if (w->flags & FLUX_KVS_WATCH_APPEND) { - if (handle_append_response (h, w, val) < 0) + if (handle_append_response (h, + w, + val, + root_ref, + root_seq, + w->nsm->ns_name) < 0) goto finished; } else { @@ -554,22 +769,27 @@ static flux_future_t *lookupat (flux_t *h, json_t *o = NULL; flux_future_t *f; int saved_errno; + int flags = w->flags; if (!(msg = flux_request_encode ("kvs.lookup-plus", NULL))) return NULL; + if (flags & FLUX_KVS_WATCH_APPEND) + flags |= FLUX_KVS_TREEOBJ; if (!w->initial_rpc_sent) { - if (flux_msg_pack (msg, "{s:s s:s s:i}", + if (flux_msg_pack (msg, + "{s:s s:s s:i}", "key", w->key, "namespace", ns, - "flags", w->flags) < 0) + "flags", flags) < 0) goto error; } else { if (!(o = treeobj_create_dirref (blobref))) goto error; - if (flux_msg_pack (msg, "{s:s s:i s:i s:O}", + if (flux_msg_pack (msg, + "{s:s s:i s:i s:O}", "key", w->key, - "flags", w->flags, + "flags", flags, "rootseq", root_seq, "rootdir", o) < 0) goto error; @@ -817,8 +1037,10 @@ static void watcher_cancel_all (struct watch_ctx *ctx, /* kvs.namespace-removed-* event * A namespace has been removed. All watchers should receive ENOTSUP. */ -static void removed_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void removed_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct watch_ctx *ctx = arg; const char *ns; @@ -838,8 +1060,10 @@ static void removed_cb (flux_t *h, flux_msg_handler_t *mh, * Update namespace with new namespace info. * N.B. commit->keys is empty in this case, in contrast setroot_cb(). */ -static void namespace_created_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void namespace_created_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct watch_ctx *ctx = arg; struct ns_monitor *nsm; @@ -849,7 +1073,9 @@ static void namespace_created_cb (flux_t *h, flux_msg_handler_t *mh, int owner; struct commit *commit; - if (flux_event_unpack (msg, NULL, "{s:s s:i s:s s:i}", + if (flux_event_unpack (msg, + NULL, + "{s:s s:i s:s s:i}", "namespace", &ns, "rootseq", &rootseq, "rootref", &rootref, @@ -876,8 +1102,10 @@ static void namespace_created_cb (flux_t *h, flux_msg_handler_t *mh, * Update namespace with new commit info. * Subscribe/unsubscribe is tied to 'struct ns_monitor' create/destroy. */ -static void setroot_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void setroot_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct watch_ctx *ctx = arg; struct ns_monitor *nsm; @@ -888,7 +1116,9 @@ static void setroot_cb (flux_t *h, flux_msg_handler_t *mh, json_t *keys; struct commit *commit; - if (flux_event_unpack (msg, NULL, "{s:s s:i s:s s:i s:o}", + if (flux_event_unpack (msg, + NULL, + "{s:s s:i s:s s:i s:o}", "namespace", &ns, "rootseq", &rootseq, "rootref", &rootref, @@ -989,8 +1219,10 @@ struct ns_monitor *namespace_monitor (struct watch_ctx *ctx, return nsm; } -static void lookup_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void lookup_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct watch_ctx *ctx = arg; const char *ns; @@ -1000,7 +1232,9 @@ static void lookup_cb (flux_t *h, flux_msg_handler_t *mh, struct watcher *w; const char *errmsg = NULL; - if (flux_request_unpack (msg, NULL, "{s:s s:s s:i}", + if (flux_request_unpack (msg, + NULL, + "{s:s s:s s:i}", "namespace", &ns, "key", &key, "flags", &flags) < 0) @@ -1038,8 +1272,10 @@ static void lookup_cb (flux_t *h, flux_msg_handler_t *mh, * The user called flux_kvs_lookup_cancel() which expects no response. * The watcher will receive an ENODATA response message. */ -static void cancel_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void cancel_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct watch_ctx *ctx = arg; watcher_cancel_all (ctx, msg, true); @@ -1049,8 +1285,10 @@ static void cancel_cb (flux_t *h, flux_msg_handler_t *mh, * This is sent automatically upon local connector disconnect. * The disconnect sender is used to find any watchers to be canceled. */ -static void disconnect_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void disconnect_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct watch_ctx *ctx = arg; watcher_cancel_all (ctx, msg, false); @@ -1058,8 +1296,10 @@ static void disconnect_cb (flux_t *h, flux_msg_handler_t *mh, /* kvs-watch.stats-get request */ -static void stats_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void stats_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct watch_ctx *ctx = arg; struct ns_monitor *nsm; @@ -1086,7 +1326,9 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh, watchers += zlistx_size (nsm->watchers); nsm = zhash_next (ctx->namespaces); } - if (flux_respond_pack (h, msg, "{s:i s:i s:O}", + if (flux_respond_pack (h, + msg, + "{s:i s:i s:O}", "watchers", watchers, "namespace-count", (int)zhash_size (ctx->namespaces), "namespaces", stats) < 0) diff --git a/t/t1007-kvs-lookup-watch.t b/t/t1007-kvs-lookup-watch.t index 992239101c99..a273f62b5b97 100755 --- a/t/t1007-kvs-lookup-watch.t +++ b/t/t1007-kvs-lookup-watch.t @@ -264,6 +264,31 @@ f test_cmp expected append1.out ' +# N.B. When the data is small `flux kvs put foo=...` create a "val" treeobj. +# when the value is larger, it creates a "valref" treeobj +largeval="abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + +test_expect_success NO_CHAIN_LINT 'flux kvs get: basic --watch & --append works (initial valref)' ' + flux kvs unlink -Rf test && + echo -n ${largeval} | flux kvs put --raw test.append.test=- && + flux kvs get --treeobj test.append.test | grep valref && + flux kvs get --watch --append --count=4 \ + test.append.test > append1.out 2>&1 & + pid=$! && + wait_watcherscount_nonzero primary && + flux kvs put --append test.append.test="1" && + flux kvs put --append test.append.test="2" && + flux kvs put --append test.append.test="3" && + wait $pid && + cat >expected <<-EOF && +abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz +1 +2 +3 + EOF + test_cmp expected append1.out +' + test_expect_success NO_CHAIN_LINT 'flux kvs get: --append works with empty string' ' flux kvs unlink -Rf test && flux kvs put test.append.test="abc" && @@ -305,7 +330,7 @@ f test_expect_success NO_CHAIN_LINT 'flux kvs get: --append works with multiple appends in a transaction' ' flux kvs unlink -Rf test && - flux kvs get --watch --waitcreate --append --count=4 \ + flux kvs get --watch --waitcreate --append --count=7 \ test.append.test > append4.out 2>&1 & pid=$! && wait_watcherscount_nonzero primary && @@ -316,9 +341,12 @@ test_expect_success NO_CHAIN_LINT 'flux kvs get: --append works with multiple ap wait $pid && cat >expected <<-EOF && abc -de -fg -hi +d +e +f +g +h +i EOF test_cmp expected append4.out ' @@ -358,6 +386,7 @@ flux-kvs: test.append.test: No such file or directory test_cmp expected append5.out ' +# N.B. valref treeobj expected, but treeobj is now a dirref test_expect_success NO_CHAIN_LINT 'flux kvs get: --append fails on change to non-value' ' flux kvs unlink -Rf test && flux kvs put test.append.test="abc" && @@ -373,12 +402,13 @@ test_expect_success NO_CHAIN_LINT 'flux kvs get: --append fails on change to non abc d e -flux-kvs: test.append.test: Is a directory +flux-kvs: test.append.test: Invalid argument EOF test_cmp expected append6.out ' -test_expect_success NO_CHAIN_LINT 'flux kvs get: --append works on fake append' ' +# N.B. valref treeobj expected, but treeobj is now a val +test_expect_success NO_CHAIN_LINT 'flux kvs get: --append fails on fake append' ' flux kvs unlink -Rf test && flux kvs put test.append.test="abc" && flux kvs get --watch --append --count=4 \ @@ -388,73 +418,22 @@ test_expect_success NO_CHAIN_LINT 'flux kvs get: --append works on fake append' flux kvs put --append test.append.test="d" && flux kvs put --append test.append.test="e" && flux kvs put test.append.test="abcdef" && - wait $pid && - cat >expected <<-EOF && -abc -d -e -f - EOF - test_cmp expected append7.out + test_must_fail wait $pid ' -test_expect_success NO_CHAIN_LINT 'flux kvs get: --append works on fake append wiping data' ' +# N.B. valref treeobj now has fewer entries +test_expect_success NO_CHAIN_LINT 'flux kvs get: --append fails on fake append (valref)' ' flux kvs unlink -Rf test && flux kvs put test.append.test="abc" && flux kvs get --watch --append --count=4 \ - test.append.test > append8.out 2>&1 & - pid=$! && - wait_watcherscount_nonzero primary && - flux kvs put --append test.append.test="d" && - flux kvs put --append test.append.test="e" && - flux kvs put test.append.test="foobar" && - wait $pid && - cat >expected <<-EOF && -abc -d -e -r - EOF - test_cmp expected append8.out -' - -test_expect_success NO_CHAIN_LINT 'flux kvs get: --append works on fake zero length append' ' - flux kvs unlink -Rf test && - flux kvs put test.append.test="abc" && - flux kvs get --watch --append --count=4 \ - test.append.test > append9.out 2>&1 & - pid=$! && - wait_watcherscount_nonzero primary && - flux kvs put --append test.append.test="d" && - flux kvs put --append test.append.test="e" && - flux kvs put test.append.test="abcde" && - wait $pid && - cat >expected <<-EOF && -abc -d -e - EOF - test_cmp expected append9.out -' - -test_expect_success NO_CHAIN_LINT 'flux kvs get: --append fails on shortened write' ' - flux kvs unlink -Rf test && - flux kvs put test.append.test="abc" && - flux kvs get --watch --append --count=4 \ - test.append.test > append10.out 2>&1 & + test.append.test > append7.out 2>&1 & pid=$! && wait_watcherscount_nonzero primary && flux kvs put --append test.append.test="d" && flux kvs put --append test.append.test="e" && - flux kvs put test.append.test="foo" && - ! wait $pid && - cat >expected <<-EOF && -abc -d -e -flux-kvs: test.append.test: Invalid argument - EOF - test_cmp expected append10.out + echo -n ${largeval} | flux kvs put --raw test.append.test=- && + flux kvs get --treeobj test.append.test | grep valref && + test_must_fail wait $pid ' # full checks