diff --git a/doc/man5/flux-config-resource.rst b/doc/man5/flux-config-resource.rst index 0f5e8ee9dbe0..e792b67b79c2 100644 --- a/doc/man5/flux-config-resource.rst +++ b/doc/man5/flux-config-resource.rst @@ -79,8 +79,14 @@ rediscover (optional) If true, force rediscovery of resources using HWLOC, rather then using the R and HWLOC XML from the enclosing instance. -Note that updates to the resource table are ignored until the next Flux -restart. +journal-max + (optional) An integer containing the maximum number of resource eventlog + events held in the resource module for the ``resource.journal`` RPC. The + default is 100,000. This value takes immediate effect on a configuration + update. + +Note that, except where noted above, updates to the resource table are +ignored until the next Flux restart. EXAMPLE ======= diff --git a/src/modules/resource/reslog.c b/src/modules/resource/reslog.c index 513d54cc6ef6..5563028b724e 100644 --- a/src/modules/resource/reslog.c +++ b/src/modules/resource/reslog.c @@ -36,7 +36,8 @@ struct reslog { struct resource_ctx *ctx; zlist_t *pending; // list of pending futures zlist_t *watchers; - json_t *eventlog; + zlistx_t *eventlog; + int journal_max; struct flux_msglist *consumers; flux_msg_handler_t **handlers; }; @@ -196,6 +197,45 @@ int reslog_sync (struct reslog *reslog) return 0; } +/* Truncate resource journal if needed to reslog->journal_max + */ +static int reslog_truncate (struct reslog *reslog) +{ + int rc = -1; + int count; + double timestamp; + json_t *event = NULL; + + if ((count = zlistx_size (reslog->eventlog)) <= reslog->journal_max) + return 0; + + /* Detach events until count is decreased to max - 1. + * Save timestamps for the truncate event. + */ + while (count-- >= reslog->journal_max) { + event = zlistx_first (reslog->eventlog); + if (eventlog_entry_parse (event, ×tamp, NULL, NULL) < 0) { + /* Unlikely: failed to parse timestamp from first event, but + * timestamp of truncate event needs to come before any other + * event, so set it to a small value (not 0., because this will + * cause eventlog_entry_create() to use current timestamp.) + */ + timestamp = 0.1; + } + zlistx_delete (reslog->eventlog, zlistx_cursor (reslog->eventlog)); + } + + /* Push truncate event onto front of list + */ + if (!(event = eventlog_entry_create (timestamp, "truncate", NULL)) + || !(zlistx_add_start (reslog->eventlog, event))) + goto out; + rc = 0; +out: + json_decref (event); + return rc; +} + int reslog_post_pack (struct reslog *reslog, const flux_msg_t *request, double timestamp, @@ -217,11 +257,12 @@ int reslog_post_pack (struct reslog *reslog, va_end (ap); if (!event) return -1; - if (json_array_append (reslog->eventlog, event) < 0) { - json_decref (event); + if (!zlistx_add_end (reslog->eventlog, event)) { errno = ENOMEM; return -1; } + if (reslog_truncate (reslog) < 0) + flux_log_error (h, "failed to truncate eventlog"); if ((flags & EVENT_NO_COMMIT)) { if (!(f = flux_future_create (NULL, NULL))) goto error; @@ -299,11 +340,11 @@ int reslog_add_callback (struct reslog *reslog, reslog_cb_f cb, void *arg) static bool send_backlog (struct reslog *reslog, const flux_msg_t *msg) { flux_t *h = reslog->ctx->h; - size_t index; - json_t *entry; - json_array_foreach (reslog->eventlog, index, entry) { + json_t *entry = zlistx_first (reslog->eventlog); + while (entry) { if (notify_one_consumer (reslog, msg, entry) < 0) goto error; + entry = zlistx_next (reslog->eventlog); } if (flux_respond_pack (h, msg, "{s:[]}", "events") < 0) // delimiter goto error; @@ -398,25 +439,53 @@ void reslog_destroy (struct reslog *reslog) flux_msglist_destroy (reslog->consumers); } zlist_destroy (&reslog->watchers); - json_decref (reslog->eventlog); + zlistx_destroy (&reslog->eventlog); free (reslog); errno = saved_errno; } } -struct reslog *reslog_create (struct resource_ctx *ctx, json_t *eventlog) +static void entry_destructor (void **item) +{ + if (*item) { + json_decref (*item); + *item = NULL; + } +} + +static void *entry_duplicator (const void *item) +{ + return json_incref ((json_t *) item); +} + +void reslog_set_journal_max (struct reslog *reslog, int max) +{ + if (reslog) { + reslog->journal_max = max; + if (reslog_truncate (reslog) < 0) + flux_log_error (reslog->ctx->h, + "resource eventlog truncation failed"); + } +} + +struct reslog *reslog_create (struct resource_ctx *ctx, + json_t *eventlog, + int journal_max) { struct reslog *reslog; if (!(reslog = calloc (1, sizeof (*reslog)))) return NULL; reslog->ctx = ctx; + reslog->journal_max = journal_max; if (!(reslog->pending = zlist_new ()) || !(reslog->watchers = zlist_new ())) goto nomem; zlist_comparefn (reslog->watchers, watcher_compare); - if (!(reslog->eventlog = json_array ())) + if (!(reslog->eventlog = zlistx_new ())) goto nomem; + zlistx_set_destructor (reslog->eventlog, entry_destructor); + zlistx_set_duplicator (reslog->eventlog, entry_duplicator); if (eventlog) { size_t index; json_t *entry; @@ -424,8 +493,10 @@ struct reslog *reslog_create (struct resource_ctx *ctx, json_t *eventlog) // historical resource-define events are not helpful if (match_event (entry, "resource-define")) continue; - if (json_array_append (reslog->eventlog, entry) < 0) + if (!zlistx_add_end (reslog->eventlog, entry)) goto nomem; + if (reslog_truncate (reslog) < 0) + flux_log_error (ctx->h, "eventlog truncate failed"); } } if (!(reslog->consumers = flux_msglist_create ())) diff --git a/src/modules/resource/reslog.h b/src/modules/resource/reslog.h index 4814cf14ce6f..a582a9f909bf 100644 --- a/src/modules/resource/reslog.h +++ b/src/modules/resource/reslog.h @@ -22,9 +22,13 @@ typedef void (*reslog_cb_f)(struct reslog *reslog, json_t *context, void *arg); -struct reslog *reslog_create (struct resource_ctx *ctx, json_t *eventlog); +struct reslog *reslog_create (struct resource_ctx *ctx, + json_t *eventlog, + int journal_max); void reslog_destroy (struct reslog *reslog); +void reslog_set_journal_max (struct reslog *reslog, int max); + /* Post an event to the eventlog. This function returns immediately, * and the commit to the eventlog completes asynchronously. * If 'request' is non-NULL, a success/fail response is sent upon commit diff --git a/src/modules/resource/resource.c b/src/modules/resource/resource.c index 09b675c901b9..06b64adeac88 100644 --- a/src/modules/resource/resource.c +++ b/src/modules/resource/resource.c @@ -61,7 +61,11 @@ * rediscover = false * Force rediscovery of local resources via hwloc. Do not fetch R or hwloc * XML from the enclosing instance. + * + * journal-max = 100000 + * Maximum size allowed of the resource journal before it is truncated. */ + static int parse_config (struct resource_ctx *ctx, const flux_conf_t *conf, struct resource_config *rconfig, @@ -75,12 +79,13 @@ static int parse_config (struct resource_ctx *ctx, int norestrict = 0; int no_update_watch = 0; int rediscover = 0; + int journal_max = 100000; json_t *o = NULL; json_t *config = NULL; if (flux_conf_unpack (conf, &error, - "{s?{s?s s?s s?o s?s s?b s?b s?b s?b !}}", + "{s?{s?s s?s s?o s?s s?b s?b s?b s?b s?i !}}", "resource", "path", &path, "scheduling", &scheduling_path, @@ -89,7 +94,8 @@ static int parse_config (struct resource_ctx *ctx, "norestrict", &norestrict, "noverify", &noverify, "no-update-watch", &no_update_watch, - "rediscover", &rediscover) < 0) { + "rediscover", &rediscover, + "journal-max", &journal_max) < 0) { errprintf (errp, "error parsing [resource] configuration: %s", error.text); @@ -146,6 +152,7 @@ static int parse_config (struct resource_ctx *ctx, } } if (rconfig) { + rconfig->journal_max = journal_max; rconfig->exclude_idset = exclude; rconfig->noverify = noverify ? true : false; rconfig->norestrict = norestrict ? true : false; @@ -171,13 +178,15 @@ static void config_reload_cb (flux_t *h, const flux_conf_t *conf; flux_error_t error; const char *errstr = NULL; + struct resource_config config = {0}; if (flux_conf_reload_decode (msg, &conf) < 0) goto error; - if (parse_config (ctx, conf, NULL, &error) < 0) { + if (parse_config (ctx, conf, &config, &error) < 0) { errstr = error.text; goto error; } + reslog_set_journal_max (ctx->reslog, config.journal_max); if (flux_set_conf (h, flux_conf_incref (conf)) < 0) { errstr = "error updating cached configuration"; goto error; @@ -372,7 +381,7 @@ int mod_main (flux_t *h, int argc, char **argv) */ if (upgrade_eventlog (h, &eventlog) < 0) goto error; - if (!(ctx->reslog = reslog_create (ctx, eventlog))) + if (!(ctx->reslog = reslog_create (ctx, eventlog, config.journal_max))) goto error; if (!(ctx->acquire = acquire_create (ctx))) goto error; diff --git a/src/modules/resource/resource.h b/src/modules/resource/resource.h index 41e627a209a6..4e1693ed6e84 100644 --- a/src/modules/resource/resource.h +++ b/src/modules/resource/resource.h @@ -19,6 +19,7 @@ struct resource_config { bool norestrict; bool no_update_watch; bool monitor_force_up; + int journal_max; }; struct resource_ctx { diff --git a/t/t2355-resource-journal.t b/t/t2355-resource-journal.t index ccdc7c019c93..c81632d03f77 100755 --- a/t/t2355-resource-journal.t +++ b/t/t2355-resource-journal.t @@ -158,7 +158,55 @@ test_expect_success 'last event: resource-define method=kvs' ' jq -e ".name == \"resource-define\"" restartlog.3 && jq -e ".context.method == \"kvs\"" restartlog.3 ' - +test_expect_success 'ensure all ranks are undrained' ' + ranks=$(flux resource status -no {ranks} -s drain) && + if test -n "$ranks"; then + flux resource undrain $ranks + fi +' +test_expect_success 'capture eventlog before truncation' ' + flux resource eventlog -H && + flux resource eventlog -f json > eventlog.pre.out +' +test_expect_success 'set a journal size limit 1 less than current entries' ' + limit=$(($(flux resource eventlog | wc -l) - 1)) && + test_debug "echo limiting resource.journal to $limit entries" && + echo resource.journal-max=$limit | flux config load +' +test_expect_success 'eventlog is now truncated' ' + flux resource eventlog -H && + flux resource eventlog -f json > eventlog.trunc +' +test_expect_success 'truncated eventlog has expected number of entries' ' + test_debug "wc -l eventlog.trunc" && + test $(wc -l < eventlog.trunc) -eq $limit +' +test_expect_success '1st event is a truncate event' ' + head -1 eventlog.trunc > eventlog.trunc.1 && + jq -e ".name == \"truncate\"" eventlog.trunc.1 +' +test_expect_success 'cause another event to be posted to the eventlog' ' + flux resource drain 0 +' +test_expect_success '1st event is still a truncate event' ' + flux resource eventlog -H && + flux resource eventlog -f json > eventlog2.trunc && + head -1 eventlog2.trunc | jq -e ".name == \"truncate\"" +' +test_expect_success 'truncated eventlog has expected number of entries' ' + test $(wc -l < eventlog2.trunc) -eq $limit +' +test_expect_success 'cause another event to be posted to the eventlog' ' + flux resource undrain 0 +' +test_expect_success '1st event is still a truncate event' ' + flux resource eventlog -H && + flux resource eventlog -f json > eventlog3.trunc && + head -1 eventlog3.trunc | jq -e ".name == \"truncate\"" +' +test_expect_success 'truncated eventlog has expected number of entries' ' + test $(wc -l < eventlog3.trunc) -eq $limit +' test_expect_success 'reload the scheduler' ' flux module load sched-simple '