diff --git a/src/modules/kvs-watch/kvs-watch.c b/src/modules/kvs-watch/kvs-watch.c
index 0329fa7dd5fa..6c4129c5492d 100644
--- a/src/modules/kvs-watch/kvs-watch.c
+++ b/src/modules/kvs-watch/kvs-watch.c
@@ -21,6 +21,7 @@
 #include "src/common/libkvs/treeobj.h"
 #include "src/common/libkvs/kvs_util_private.h"
 #include "src/common/libutil/blobref.h"
+#include "src/common/libcontent/content.h"
 
 /* State for one watcher */
 struct watcher {
@@ -37,10 +38,13 @@ struct watcher {
     char *key;              // lookup key
     int flags;              // kvs_lookup flags
     zlist_t *lookups;       // list of futures, in commit order
+    zlist_t *loads;         // list of futures, content loads in ref order
     struct ns_monitor *nsm; // back pointer for removal
     json_t *prev;           // previous watch value for KVS_WATCH_FULL/UNIQ
-    int append_offset;      // offset for KVS_WATCH_APPEND
+    bool index_set;         // flag if prev_start_index/prev_end_index set
+    int prev_start_index;   // previous start index loaded
+    int prev_end_index;     // previous end index loaded
     void *handle;           // zlistx_t handle
 };
 
@@ -90,6 +94,12 @@ static void watcher_destroy (struct watcher *w)
             flux_future_destroy (f);
             zlist_destroy (&w->lookups);
         }
+        if (w->loads) {
+            flux_future_t *f;
+            while ((f = zlist_pop (w->loads)))
+                flux_future_destroy (f);
+            zlist_destroy (&w->loads);
+        }
         json_decref (w->prev);
         free (w);
         errno = saved_errno;
@@ -110,6 +120,8 @@ static struct watcher *watcher_create (const flux_msg_t *msg, const char *key,
         goto error;
     if (!(w->lookups = zlist_new ()))
         goto error_nomem;
+    if (!(w->loads = zlist_new ()))
+        goto error_nomem;
     w->flags = flags;
     w->rootseq = -1;
     return w;
@@ -228,7 +240,7 @@ static bool key_match (json_t *o, const char *key)
 static void watcher_cleanup (struct ns_monitor *nsm, struct watcher *w)
 {
     /* wait for all in flight lookups to complete before destroying watcher */
-    if (zlist_size (w->lookups) == 0)
+    if (zlist_size (w->lookups) == 0 && zlist_size (w->loads) == 0)
         zlistx_delete (nsm->watchers, w->handle);
     /* if nsm->getrootf, destroy when getroot_continuation completes */
     if (zlistx_size (nsm->watchers) == 0
@@ -236,10 +248,120 @@ static void watcher_cleanup (struct ns_monitor *nsm, struct watcher *w)
         zhash_delete (nsm->ctx->namespaces, nsm->ns_name);
 }
 
+static void handle_load_response (flux_future_t *f, struct watcher *w)
+{
+    flux_t *h = flux_future_get_flux (f);
+    const void *data;
+    int size;
+
+    if (content_load_get (f, &data, &size) < 0) {
+        flux_log_error (h, "failed to load content data");
+        goto error;
+    }
+
+    if (!w->mute) {
+        json_t *val = treeobj_create_val (data, size);
+        if (!val) {
+            flux_log_error (h, "%s: treeobj_create_val", __FUNCTION__);
+            goto error;
+        }
+        if (flux_respond_pack (h, w->request, "{ s:o }", "val", val) < 0) {
+            flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
+            json_decref (val);
+            goto error;
+        }
+        w->responded = true;
+    }
+
+    return;
+error:
+    if (!w->mute) {
+        if (flux_respond_error (h, w->request, errno, NULL) < 0)
+            flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
+    }
+    w->finished = true;
+}
+
+static void load_continuation (flux_future_t *f, void *arg)
+{
+    struct watcher *w = arg;
+    struct ns_monitor *nsm = w->nsm;
+
+    while ((f = zlist_first (w->loads)) && flux_future_is_ready (f)) {
+        f = zlist_pop (w->loads);
+        if (!w->finished)
+            handle_load_response (f, w);
+        flux_future_destroy (f);
+        /* if WAITCREATE and !WATCH, then we only care about sending
+         * one response and being done. We can use the responded flag
+         * to indicate that condition.
+         */
+        if (w->responded
+            && (w->flags & FLUX_KVS_WAITCREATE)
+            && !(w->flags & FLUX_KVS_WATCH))
+            w->finished = true;
+    }
+    if (w->finished)
+        watcher_cleanup (nsm, w);
+}
+
+static flux_future_t *load_ref (flux_t *h,
+                                struct watcher *w,
+                                const char *ref)
+{
+    flux_future_t *f = NULL;
+    int saved_errno;
+
+    if (!(f = content_load_byblobref (h, ref, 0))) {
+        flux_log_error (h, "%s: content_load_byblobref", __FUNCTION__);
+        goto error;
+    }
+    if (flux_future_then (f, -1., load_continuation, w) < 0) {
+        flux_log_error (h, "%s: flux_future_then", __FUNCTION__);
+        goto error;
+    }
+    if (zlist_append (w->loads, f) < 0) {
+        errno = ENOMEM;
+        goto error;
+    }
+
+    return f;
+
+error:
+    saved_errno = errno;
+    flux_future_destroy (f);
+    errno = saved_errno;
+    return NULL;
+}
+
+static int load_range (flux_t *h,
+                       struct watcher *w,
+                       int start_index,
+                       int end_index,
+                       json_t *val)
+{
+    int i;
+
+    for (i = start_index; i <= end_index; i++) {
+        flux_future_t *f;
+        const char *ref = treeobj_get_blobref (val, i);
+        if (!ref) {
+            flux_log_error (h, "%s: treeobj_get_blobref", __FUNCTION__);
+            return -1;
+        }
+        if (!(f = load_ref (h, w, ref)))
+            return -1;
+    }
+    return 0;
+}
+
 static int handle_initial_response (flux_t *h,
                                     struct watcher *w,
                                     json_t *val,
-                                    int root_seq)
+                                    const char *root_ref,
+                                    int root_seq,
+                                    const char *namespace,
+                                    const char **errmsg)
 {
     /* this is the first response case, store the first response
      * val */
@@ -248,14 +370,39 @@ static int handle_initial_response (flux_t *h,
     w->prev = json_incref (val);
 
     if ((w->flags & FLUX_KVS_WATCH_APPEND)) {
-        if (treeobj_decode_val (val,
-                                NULL,
-                                &w->append_offset) < 0) {
-            flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__);
+        /* The very first response may be a 'val' treeobj instead of
+         * 'valref', if there have been no appends yet.
+         */
+        if (treeobj_is_val (val)) {
+            w->index_set = true;
+            w->prev_start_index = 0;
+            w->prev_end_index = 0;
+            /* since this is a val object, we can just return it */
+            goto out;
+        }
+        else if (treeobj_is_valref (val)) {
+            w->index_set = true;
+            w->prev_start_index = 0;
+            w->prev_end_index = treeobj_get_count (val) - 1;
+        }
+        else {
+            (*errmsg) = "key watched with WATCH_APPEND is not a value";
+            errno = EINVAL;
             return -1;
         }
+
+        if (load_range (h,
+                        w,
+                        w->prev_start_index,
+                        w->prev_end_index,
+                        val) < 0)
+            return -1;
+
+        w->initial_rootseq = root_seq;
+        return 0;
     }
+out:
     if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
         flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
         return -1;
     }
@@ -303,67 +450,105 @@ static int handle_compare_response (flux_t *h,
 
 static int handle_append_response (flux_t *h,
                                    struct watcher *w,
-                                   json_t *val)
+                                   json_t *val,
+                                   const char *root_ref,
+                                   int root_seq,
+                                   const char *namespace,
+                                   const char **errmsg)
 {
     if (!w->responded) {
         /* this is the first response case, store the first response
          * info. This is here b/c initial response could have been
-         * ENOENT case */
-        if (treeobj_decode_val (val,
-                                NULL,
-                                &w->append_offset) < 0) {
-            flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__);
-            return -1;
+         * ENOENT case
+         *
+         * The very first response may be a 'val' treeobj instead of
+         * 'valref', if there have been no appends yet.
+         */
+        if (treeobj_is_val (val)) {
+            w->index_set = true;
+            w->prev_start_index = 0;
+            w->prev_end_index = 0;
+            /* since this is a val object, we can just return it */
+            if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
+                flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
+                return -1;
+            }
+            w->responded = true;
         }
+        else if (treeobj_is_valref (val)) {
+            /* N.B. It may not be obvious why we have to check w->index_set if we
+             * have not yet responded. It is possible we have received a setroot
+             * response and an updated valref before loads from the content store
+             * have returned to the caller.
+             */
+            if (w->index_set) {
+                int count = treeobj_get_count (val);
+                if ((count - 1) > w->prev_end_index) {
+                    w->prev_start_index = w->prev_end_index + 1;
+                    w->prev_end_index = count - 1;
+                }
+                else if ((count - 1) < w->prev_end_index) {
+                    (*errmsg) = "key watched with WATCH_APPEND shortened";
+                    errno = EINVAL;
+                    return -1;
+                }
+                else
+                    goto out;
+            }
+            else {
+                w->index_set = true;
+                w->prev_start_index = 0;
+                w->prev_end_index = treeobj_get_count (val) - 1;
+            }
 
-        if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
-            flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
+            if (load_range (h,
+                            w,
+                            w->prev_start_index,
+                            w->prev_end_index,
+                            val) < 0)
+                return -1;
+        }
+        else {
+            (*errmsg) = "key watched with WATCH_APPEND is not a value";
+            errno = EINVAL;
             return -1;
         }
-
-        w->responded = true;
     }
     else {
-        json_t *new_val = NULL;
-        void *new_data = NULL;
-        int new_offset;
-
-        if (treeobj_decode_val (val,
-                                &new_data,
-                                &new_offset) < 0) {
-            flux_log_error (h, "%s: treeobj_decode_val", __FUNCTION__);
-            return -1;
+        if (treeobj_is_valref (val)) {
+            int count;
+            if (!w->index_set) {
+                errno = EPROTO;
+                return -1;
+            }
+            count = treeobj_get_count (val);
+            if ((count - 1) > w->prev_end_index) {
+                w->prev_start_index = w->prev_end_index + 1;
+                w->prev_end_index = count - 1;
+            }
+            else if ((count - 1) < w->prev_end_index) {
+                (*errmsg) = "key watched with WATCH_APPEND shortened";
+                errno = EINVAL;
+                return -1;
+            }
+            else
+                goto out;
+
+            if (load_range (h,
+                            w,
+                            w->prev_start_index,
+                            w->prev_end_index,
+                            val) < 0)
+                return -1;
        }
-
-        /* check length to determine if append actually happened, note
-         * that zero length append is legal
-         *
-         * Note that this check does not ensure that the key was not
-         * "fake" appended to. i.e. the key overwritten with data
-         * longer than the original.
-         */
-        if (new_offset < w->append_offset) {
-            free (new_data);
+        else {
+            (*errmsg) = "value of key watched with WATCH_APPEND overwritten";
             errno = EINVAL;
             return -1;
         }
-
-        if (!(new_val = treeobj_create_val (new_data + w->append_offset,
-                                            new_offset - w->append_offset))) {
-            free (new_data);
-            return -1;
-        }
-
-        free (new_data);
-        w->append_offset = new_offset;
-
-        if (flux_respond_pack (h, w->request, "{ s:o }", "val", new_val) < 0) {
-            json_decref (new_val);
-            flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
-            return -1;
-        }
     }
 
+out:
     return 0;
 }
 
@@ -392,8 +577,10 @@ static void handle_lookup_response (flux_future_t *f,
 {
     flux_t *h = flux_future_get_flux (f);
     int errnum;
+    const char *root_ref;
     int root_seq;
     json_t *val;
+    const char *errmsg = NULL;
 
     if (flux_future_aux_get (f, "initial")) {
@@ -413,8 +600,9 @@
             goto error;
         }
 
-        if (flux_rpc_get_unpack (f, "{ s:o s:i }",
+        if (flux_rpc_get_unpack (f, "{ s:o s:s s:i }",
                                  "val", &val,
+                                 "rootref", &root_ref,
                                  "rootseq", &root_seq) < 0) {
             /* It is worth mentioning ENOTSUP error conditions here.
              *
@@ -436,7 +624,13 @@
             goto error;
         }
 
-        if (handle_initial_response (h, w, val, root_seq) < 0)
+        if (handle_initial_response (h,
+                                     w,
+                                     val,
+                                     root_ref,
+                                     root_seq,
+                                     w->nsm->ns_name,
+                                     &errmsg) < 0)
             goto error;
     }
     else {
@@ -447,8 +641,9 @@
            goto error;
        }

-        if (flux_rpc_get_unpack (f, "{ s:o s:i }",
+        if (flux_rpc_get_unpack (f, "{ s:o s:s s:i }",
                                 "val", &val,
+                                 "rootref", &root_ref,
                                 "rootseq", &root_seq) < 0)
            goto error;

@@ -464,7 +659,13 @@
             goto error;
         }
         else if (w->flags & FLUX_KVS_WATCH_APPEND) {
-            if (handle_append_response (h, w, val) < 0)
+            if (handle_append_response (h,
+                                        w,
+                                        val,
+                                        root_ref,
+                                        root_seq,
+                                        w->nsm->ns_name,
+                                        &errmsg) < 0)
                 goto error;
         }
         else {
@@ -476,7 +677,7 @@
     return;
 error:
     if (!w->mute) {
-        if (flux_respond_error (h, w->request, errno, NULL) < 0)
+        if (flux_respond_error (h, w->request, errno, errmsg) < 0)
             flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
     }
     w->finished = true;
@@ -527,14 +728,17 @@ static flux_future_t *lookupat (flux_t *h,
     json_t *o = NULL;
     flux_future_t *f;
     int saved_errno;
+    int flags = w->flags;
 
     if (!(msg = flux_request_encode ("kvs.lookup-plus", NULL)))
         return NULL;
+    if (flags & FLUX_KVS_WATCH_APPEND)
+        flags |= FLUX_KVS_TREEOBJ;
     if (!w->initial_rpc_sent) {
         if (flux_msg_pack (msg, "{s:s s:s s:i}",
                            "key", w->key,
                            "namespace", ns,
-                           "flags", w->flags) < 0)
            goto error;
     }
     else {
@@ -542,7 +746,7 @@
             goto error;
         if (flux_msg_pack (msg, "{s:s s:i s:i s:O}",
                            "key", w->key,
-                           "flags", w->flags,
+                           "flags", flags,
                            "rootseq", root_seq,
                            "rootdir", o) < 0)
             goto error;
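
Usage note (not part of the patch above): a minimal consumer-side sketch of what WATCH_APPEND provides, assuming the standard libkvs watch API (flux_kvs_lookup with FLUX_KVS_WATCH | FLUX_KVS_WATCH_APPEND, flux_kvs_lookup_get, flux_future_reset); the key name "sketch-key" and the bare-bones error handling are illustrative placeholders only. After this change, kvs-watch satisfies appends by loading only the newly referenced valref blobrefs from the content store (content_load_byblobref) instead of decoding the full value and slicing it at append_offset, but the way a client consumes the stream, shown below, is the same either way.

/* Illustrative sketch only: watch a key with FLUX_KVS_WATCH_APPEND and
 * print data as it is appended.  "sketch-key" is a placeholder key name.
 */
#include <flux/core.h>
#include <stdio.h>
#include <stdlib.h>

int main (void)
{
    flux_t *h;
    flux_future_t *f;
    const char *data;

    if (!(h = flux_open (NULL, 0)))
        exit (1);
    if (!(f = flux_kvs_lookup (h,
                               NULL,
                               FLUX_KVS_WATCH | FLUX_KVS_WATCH_APPEND,
                               "sketch-key")))
        exit (1);
    /* The initial response(s) carry the key's current value; subsequent
     * responses carry only newly appended data.  Reset the future to wait
     * for the next response; the loop ends (e.g. with ENODATA) once the
     * watch is cancelled via flux_kvs_lookup_cancel().
     */
    while (flux_kvs_lookup_get (f, &data) == 0) {
        printf ("%s", data);
        flux_future_reset (f);
    }
    flux_future_destroy (f);
    flux_close (h);
    return 0;
}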