Skip to content

Commit

Permalink
Merge pull request #6633 from grondo/reslog-truncate-simple
Browse files Browse the repository at this point in the history
truncate resource journal at configurable size
  • Loading branch information
mergify[bot] authored Feb 14, 2025
2 parents 04c71be + 6f79323 commit f6bef37
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 18 deletions.
10 changes: 8 additions & 2 deletions doc/man5/flux-config-resource.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,14 @@ rediscover
(optional) If true, force rediscovery of resources using HWLOC, rather
then using the R and HWLOC XML from the enclosing instance.

Note that updates to the resource table are ignored until the next Flux
restart.
journal-max
(optional) An integer containing the maximum number of resource eventlog
events held in the resource module for the ``resource.journal`` RPC. The
default is 100,000. This value takes immediate effect on a configuration
update.

Note that, except where noted above, updates to the resource table are
ignored until the next Flux restart.

EXAMPLE
=======
Expand Down
91 changes: 81 additions & 10 deletions src/modules/resource/reslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ struct reslog {
struct resource_ctx *ctx;
zlist_t *pending; // list of pending futures
zlist_t *watchers;
json_t *eventlog;
zlistx_t *eventlog;
int journal_max;
struct flux_msglist *consumers;
flux_msg_handler_t **handlers;
};
Expand Down Expand Up @@ -196,6 +197,45 @@ int reslog_sync (struct reslog *reslog)
return 0;
}

/* Truncate resource journal if needed to reslog->journal_max
*/
static int reslog_truncate (struct reslog *reslog)
{
int rc = -1;
int count;
double timestamp;
json_t *event = NULL;

if ((count = zlistx_size (reslog->eventlog)) <= reslog->journal_max)
return 0;

/* Detach events until count is decreased to max - 1.
* Save timestamps for the truncate event.
*/
while (count-- >= reslog->journal_max) {
event = zlistx_first (reslog->eventlog);
if (eventlog_entry_parse (event, &timestamp, NULL, NULL) < 0) {
/* Unlikely: failed to parse timestamp from first event, but
* timestamp of truncate event needs to come before any other
* event, so set it to a small value (not 0., because this will
* cause eventlog_entry_create() to use current timestamp.)
*/
timestamp = 0.1;
}
zlistx_delete (reslog->eventlog, zlistx_cursor (reslog->eventlog));
}

/* Push truncate event onto front of list
*/
if (!(event = eventlog_entry_create (timestamp, "truncate", NULL))
|| !(zlistx_add_start (reslog->eventlog, event)))
goto out;
rc = 0;
out:
json_decref (event);
return rc;
}

int reslog_post_pack (struct reslog *reslog,
const flux_msg_t *request,
double timestamp,
Expand All @@ -217,11 +257,12 @@ int reslog_post_pack (struct reslog *reslog,
va_end (ap);
if (!event)
return -1;
if (json_array_append (reslog->eventlog, event) < 0) {
json_decref (event);
if (!zlistx_add_end (reslog->eventlog, event)) {
errno = ENOMEM;
return -1;
}
if (reslog_truncate (reslog) < 0)
flux_log_error (h, "failed to truncate eventlog");
if ((flags & EVENT_NO_COMMIT)) {
if (!(f = flux_future_create (NULL, NULL)))
goto error;
Expand Down Expand Up @@ -299,11 +340,11 @@ int reslog_add_callback (struct reslog *reslog, reslog_cb_f cb, void *arg)
static bool send_backlog (struct reslog *reslog, const flux_msg_t *msg)
{
flux_t *h = reslog->ctx->h;
size_t index;
json_t *entry;
json_array_foreach (reslog->eventlog, index, entry) {
json_t *entry = zlistx_first (reslog->eventlog);
while (entry) {
if (notify_one_consumer (reslog, msg, entry) < 0)
goto error;
entry = zlistx_next (reslog->eventlog);
}
if (flux_respond_pack (h, msg, "{s:[]}", "events") < 0) // delimiter
goto error;
Expand Down Expand Up @@ -398,34 +439,64 @@ void reslog_destroy (struct reslog *reslog)
flux_msglist_destroy (reslog->consumers);
}
zlist_destroy (&reslog->watchers);
json_decref (reslog->eventlog);
zlistx_destroy (&reslog->eventlog);
free (reslog);
errno = saved_errno;
}
}

struct reslog *reslog_create (struct resource_ctx *ctx, json_t *eventlog)
static void entry_destructor (void **item)
{
if (*item) {
json_decref (*item);
*item = NULL;
}
}

static void *entry_duplicator (const void *item)
{
return json_incref ((json_t *) item);
}

void reslog_set_journal_max (struct reslog *reslog, int max)
{
if (reslog) {
reslog->journal_max = max;
if (reslog_truncate (reslog) < 0)
flux_log_error (reslog->ctx->h,
"resource eventlog truncation failed");
}
}

struct reslog *reslog_create (struct resource_ctx *ctx,
json_t *eventlog,
int journal_max)
{
struct reslog *reslog;

if (!(reslog = calloc (1, sizeof (*reslog))))
return NULL;
reslog->ctx = ctx;
reslog->journal_max = journal_max;
if (!(reslog->pending = zlist_new ())
|| !(reslog->watchers = zlist_new ()))
goto nomem;
zlist_comparefn (reslog->watchers, watcher_compare);
if (!(reslog->eventlog = json_array ()))
if (!(reslog->eventlog = zlistx_new ()))
goto nomem;
zlistx_set_destructor (reslog->eventlog, entry_destructor);
zlistx_set_duplicator (reslog->eventlog, entry_duplicator);
if (eventlog) {
size_t index;
json_t *entry;
json_array_foreach (eventlog, index, entry) {
// historical resource-define events are not helpful
if (match_event (entry, "resource-define"))
continue;
if (json_array_append (reslog->eventlog, entry) < 0)
if (!zlistx_add_end (reslog->eventlog, entry))
goto nomem;
if (reslog_truncate (reslog) < 0)
flux_log_error (ctx->h, "eventlog truncate failed");
}
}
if (!(reslog->consumers = flux_msglist_create ()))
Expand Down
6 changes: 5 additions & 1 deletion src/modules/resource/reslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ typedef void (*reslog_cb_f)(struct reslog *reslog,
json_t *context,
void *arg);

struct reslog *reslog_create (struct resource_ctx *ctx, json_t *eventlog);
struct reslog *reslog_create (struct resource_ctx *ctx,
json_t *eventlog,
int journal_max);
void reslog_destroy (struct reslog *reslog);

void reslog_set_journal_max (struct reslog *reslog, int max);

/* Post an event to the eventlog. This function returns immediately,
* and the commit to the eventlog completes asynchronously.
* If 'request' is non-NULL, a success/fail response is sent upon commit
Expand Down
17 changes: 13 additions & 4 deletions src/modules/resource/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@
* rediscover = false
* Force rediscovery of local resources via hwloc. Do not fetch R or hwloc
* XML from the enclosing instance.
*
* journal-max = 100000
* Maximum size allowed of the resource journal before it is truncated.
*/

static int parse_config (struct resource_ctx *ctx,
const flux_conf_t *conf,
struct resource_config *rconfig,
Expand All @@ -75,12 +79,13 @@ static int parse_config (struct resource_ctx *ctx,
int norestrict = 0;
int no_update_watch = 0;
int rediscover = 0;
int journal_max = 100000;
json_t *o = NULL;
json_t *config = NULL;

if (flux_conf_unpack (conf,
&error,
"{s?{s?s s?s s?o s?s s?b s?b s?b s?b !}}",
"{s?{s?s s?s s?o s?s s?b s?b s?b s?b s?i !}}",
"resource",
"path", &path,
"scheduling", &scheduling_path,
Expand All @@ -89,7 +94,8 @@ static int parse_config (struct resource_ctx *ctx,
"norestrict", &norestrict,
"noverify", &noverify,
"no-update-watch", &no_update_watch,
"rediscover", &rediscover) < 0) {
"rediscover", &rediscover,
"journal-max", &journal_max) < 0) {
errprintf (errp,
"error parsing [resource] configuration: %s",
error.text);
Expand Down Expand Up @@ -146,6 +152,7 @@ static int parse_config (struct resource_ctx *ctx,
}
}
if (rconfig) {
rconfig->journal_max = journal_max;
rconfig->exclude_idset = exclude;
rconfig->noverify = noverify ? true : false;
rconfig->norestrict = norestrict ? true : false;
Expand All @@ -171,13 +178,15 @@ static void config_reload_cb (flux_t *h,
const flux_conf_t *conf;
flux_error_t error;
const char *errstr = NULL;
struct resource_config config = {0};

if (flux_conf_reload_decode (msg, &conf) < 0)
goto error;
if (parse_config (ctx, conf, NULL, &error) < 0) {
if (parse_config (ctx, conf, &config, &error) < 0) {
errstr = error.text;
goto error;
}
reslog_set_journal_max (ctx->reslog, config.journal_max);
if (flux_set_conf (h, flux_conf_incref (conf)) < 0) {
errstr = "error updating cached configuration";
goto error;
Expand Down Expand Up @@ -372,7 +381,7 @@ int mod_main (flux_t *h, int argc, char **argv)
*/
if (upgrade_eventlog (h, &eventlog) < 0)
goto error;
if (!(ctx->reslog = reslog_create (ctx, eventlog)))
if (!(ctx->reslog = reslog_create (ctx, eventlog, config.journal_max)))
goto error;
if (!(ctx->acquire = acquire_create (ctx)))
goto error;
Expand Down
1 change: 1 addition & 0 deletions src/modules/resource/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct resource_config {
bool norestrict;
bool no_update_watch;
bool monitor_force_up;
int journal_max;
};

struct resource_ctx {
Expand Down
50 changes: 49 additions & 1 deletion t/t2355-resource-journal.t
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,55 @@ test_expect_success 'last event: resource-define method=kvs' '
jq -e ".name == \"resource-define\"" restartlog.3 &&
jq -e ".context.method == \"kvs\"" restartlog.3
'

test_expect_success 'ensure all ranks are undrained' '
ranks=$(flux resource status -no {ranks} -s drain) &&
if test -n "$ranks"; then
flux resource undrain $ranks
fi
'
test_expect_success 'capture eventlog before truncation' '
flux resource eventlog -H &&
flux resource eventlog -f json > eventlog.pre.out
'
test_expect_success 'set a journal size limit 1 less than current entries' '
limit=$(($(flux resource eventlog | wc -l) - 1)) &&
test_debug "echo limiting resource.journal to $limit entries" &&
echo resource.journal-max=$limit | flux config load
'
test_expect_success 'eventlog is now truncated' '
flux resource eventlog -H &&
flux resource eventlog -f json > eventlog.trunc
'
test_expect_success 'truncated eventlog has expected number of entries' '
test_debug "wc -l eventlog.trunc" &&
test $(wc -l < eventlog.trunc) -eq $limit
'
test_expect_success '1st event is a truncate event' '
head -1 eventlog.trunc > eventlog.trunc.1 &&
jq -e ".name == \"truncate\"" eventlog.trunc.1
'
test_expect_success 'cause another event to be posted to the eventlog' '
flux resource drain 0
'
test_expect_success '1st event is still a truncate event' '
flux resource eventlog -H &&
flux resource eventlog -f json > eventlog2.trunc &&
head -1 eventlog2.trunc | jq -e ".name == \"truncate\""
'
test_expect_success 'truncated eventlog has expected number of entries' '
test $(wc -l < eventlog2.trunc) -eq $limit
'
test_expect_success 'cause another event to be posted to the eventlog' '
flux resource undrain 0
'
test_expect_success '1st event is still a truncate event' '
flux resource eventlog -H &&
flux resource eventlog -f json > eventlog3.trunc &&
head -1 eventlog3.trunc | jq -e ".name == \"truncate\""
'
test_expect_success 'truncated eventlog has expected number of entries' '
test $(wc -l < eventlog3.trunc) -eq $limit
'
test_expect_success 'reload the scheduler' '
flux module load sched-simple
'
Expand Down

0 comments on commit f6bef37

Please sign in to comment.