Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flux-exec: use subprocess credits to avoid overflowing stdin buffers #6370

Merged
merged 5 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 148 additions & 25 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "src/common/libsubprocess/fbuf_watcher.h"
#include "ccan/str/str.h"

/* Three-way comparison of two numeric values: evaluates to 0 when
 * a == b, -1 when a < b, and 1 otherwise.
 * N.B. each argument may be evaluated more than once, so do not pass
 * expressions that have side effects.
 */
#define NUMCMP(a,b) ((a)==(b)?0:((a)<(b)?-1:1))

static struct optparse_option cmdopts[] = {
{ .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "IDSET",
.usage = "Specify target ranks. Default is \"all\"" },
Expand All @@ -53,6 +55,9 @@
{ .name = "setopt", .has_arg = 1, .arginfo = "NAME=VALUE",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Set subprocess option NAME to VALUE (multiple use ok)" },
{ .name = "stdin-flow", .has_arg = 1, .arginfo = "on|off",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Forcibly enable or disable stdin flow control" },
{ .name = "with-imp", .has_arg = 0,
.usage = "Run args under 'flux-imp run'" },
{ .name = "jobid", .key = 'j', .has_arg = 1, .arginfo = "JOBID",
Expand All @@ -72,12 +77,22 @@
zhashx_t *exitsets;
struct idset *hanging;

zlist_t *subprocesses;
zlistx_t *subprocesses;
/* subprocess credits ordered low to high. Exited and failed
* subprocesses are removed from the list.
*/
zlistx_t *subprocess_credits;

/* Per-subprocess stdin credit bookkeeping.  One instance is attached to
 * each subprocess under the aux key "credits".
 */
struct subproc_credit {
void *handle; /* handle returned when the subprocess was added to the
               * subprocess_credits list; used to reorder or delete
               * that list entry */
int credits;  /* running credit balance: increased by the byte count
               * passed to the credit callback, decreased by the
               * number of bytes written to the subprocess stdin */
};

optparse_t *opts = NULL;

int stdin_flags;
flux_watcher_t *stdin_w;
bool stdin_enable_flow_control = true;

/* time to wait in between SIGINTs */
#define INTERRUPT_MILLISECS 1000.0
Expand Down Expand Up @@ -146,15 +161,46 @@
log_err_exit ("idset_clear");
}

/* Return the smallest credit balance among the subprocesses that are
 * still tracked in subprocess_credits, or 0 if none are tracked.
 */
int subprocess_min_credits (void)
{
    /* The credits list is kept ordered low to high, so the head entry
     * always carries the minimum.
     */
    flux_subprocess_t *lowest = zlistx_head (subprocess_credits);

    if (lowest) {
        struct subproc_credit *cred;

        cred = flux_subprocess_aux_get (lowest, "credits");
        return cred->credits;
    }
    /* The list can be empty if every subprocess failed.  Report zero
     * credits in that case so callers will not start the stdin watcher.
     */
    return 0;
}

/* Adjust the credit balance of subprocess p by bytes (which may be
 * negative to consume credits).  When reorder is true, the entry for p
 * is also moved to its sorted position in subprocess_credits; callers
 * that apply the same delta to every subprocess can pass false since
 * the relative order is unaffected.
 */
void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
{
    struct subproc_credit *cred = flux_subprocess_aux_get (p, "credits");

    cred->credits += bytes;

    if (!reorder)
        return;
    zlistx_reorder (subprocess_credits, cred->handle, false);
}

/* Drop subprocess p from the subprocess_credits list, using the list
 * handle saved in its "credits" aux data, so that it no longer
 * contributes to the minimum credit calculation.  Exits the program
 * with an error message if zlistx_delete() fails.
 */
void subprocess_remove_credits (flux_subprocess_t *p)
{
struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
log_err_exit ("zlistx_delete");

Check warning on line 190 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L190

Added line #L190 was not covered by tests
}

void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
{
if (state == FLUX_SUBPROCESS_RUNNING) {
started++;
/* see FLUX_SUBPROCESS_FAILED case below */
(void)flux_subprocess_aux_set (p, "started", p, NULL);
}
else if (state == FLUX_SUBPROCESS_EXITED)
else if (state == FLUX_SUBPROCESS_EXITED) {
exited++;
subprocess_remove_credits (p);
}
else if (state == FLUX_SUBPROCESS_FAILED) {
/* FLUX_SUBPROCESS_FAILED is a catch all error case, no way to
* know if process started or not. So we cheat with a
Expand All @@ -163,11 +209,21 @@
if (flux_subprocess_aux_get (p, "started") == NULL)
started++;
exited++;
subprocess_remove_credits (p);
}

if (stdin_w) {
if (started == rank_count)
flux_watcher_start (stdin_w);
if (started == rank_count) {
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (stdin_enable_flow_control) {
int min_credits = subprocess_min_credits ();
if (min_credits)
flux_watcher_start (stdin_w);
}
else
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
flux_watcher_stop (stdin_w);
}
Expand Down Expand Up @@ -218,6 +274,16 @@
}
}

/* Subprocess on_credit callback: add bytes to the credit balance of p
 * and re-sort it in the credits list.  Once every targeted rank has
 * started, (re)start the stdin watcher if the minimum balance across
 * all tracked subprocesses is nonzero.  The stream argument is not
 * used.
 */
void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
{
    subprocess_update_credits (p, bytes, true);

    /* stdin is not read until all subprocesses have started */
    if (started != rank_count)
        return;

    if (subprocess_min_credits () != 0)
        flux_watcher_start (stdin_w);
}

static void stdin_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
Expand All @@ -226,28 +292,44 @@
struct fbuf *fb = fbuf_read_watcher_get_buffer (w);
flux_subprocess_t *p;
const char *ptr;
int lenp;
int len, lenp;
int min_credits = -1;

if (stdin_enable_flow_control)
min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, -1, &lenp)))
if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
Comment on lines 300 to +301
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a yes but are we certain min_credits is always > 0 here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. It should be impossible, discounting unknown or future bugs. Do you think we should add an assert check just in case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe nothing is needed, but I suppose it could just stop the watcher and return if credits is zero.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that sounds like a decent idea.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait! I think it is possible for it to be 0, although extremely unlikely, when the stream is closed. I should add some smarts in there for that case.

Copy link
Member Author

@chu11 chu11 Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I had to think about it a bit. I think it is possible that min_credits could be zero if the remote subprocess has a fatal error (e.g. a libsubprocess ENOMEM), so we're at 0 credits locally and never get credits back. But since this happens only in an error scenario, I don't think it matters. And since stdin_w is already stopped in that case, it doesn't matter for that reason either :-)

Although, this did make me think of a corner case to fix. If I call subprocess_remove_credits(), it's possible that min_credits may no longer be zero. So I should re-check and start the stdin_w if necessary.

Edit: actually, I don't need to make a change; the code already does this by chance.

log_err_exit ("fbuf_read");

if (lenp) {
p = zlist_first (subprocesses);
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
if (stdin_enable_flow_control) {
/* N.B. since we are subtracting the same number
* of credits from all subprocesses, the sorted
* order in the credits list should not change
*/
subprocess_update_credits (p, -1*len, false);
}
}
p = zlist_next (subprocesses);
p = zlistx_next (subprocesses);
}
if (stdin_enable_flow_control) {
min_credits = subprocess_min_credits ();
if (min_credits == 0)
flux_watcher_stop (stdin_w);
}
}
else {
p = zlist_first (subprocesses);
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_close (p, "stdin") < 0)
log_err_exit ("flux_subprocess_close");
p = zlist_next (subprocesses);
p = zlistx_next (subprocesses);
}
flux_watcher_stop (stdin_w);
}
Expand Down Expand Up @@ -291,9 +373,9 @@
&ops);
}

static void killall (zlist_t *l, int signum)
static void killall (zlistx_t *l, int signum)
{
flux_subprocess_t *p = zlist_first (l);
flux_subprocess_t *p = zlistx_first (l);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (use_imp) {
Expand All @@ -316,7 +398,7 @@
flux_future_destroy (f);
}
}
p = zlist_next (l);
p = zlistx_next (l);
}
}

Expand Down Expand Up @@ -361,12 +443,21 @@
}
}

void subprocess_destroy (void *arg)
void subprocess_destroy (void **arg)
{
flux_subprocess_t *p = arg;
flux_subprocess_t *p = *arg;
flux_subprocess_destroy (p);
}
Comment on lines -364 to 450
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set *arg = NULL after free.


/* Comparator for the subprocess_credits list.  Both items are
 * subprocess handles; they are ordered by the credit balance stored in
 * their "credits" aux data.  Returns -1, 0, or 1 when item1 has fewer,
 * equal, or more credits than item2.
 */
int subprocess_credits_compare (const void *item1, const void *item2)
{
    struct subproc_credit *lhs;
    struct subproc_credit *rhs;

    lhs = flux_subprocess_aux_get ((flux_subprocess_t *)item1, "credits");
    rhs = flux_subprocess_aux_get ((flux_subprocess_t *)item2, "credits");

    if (lhs->credits == rhs->credits)
        return 0;
    return lhs->credits < rhs->credits ? -1 : 1;
}

/* atexit handler
* This is a good faith attempt to restore stdin flags to what they were
* before we set O_NONBLOCK per bug #1803.
Expand Down Expand Up @@ -575,6 +666,7 @@
.on_channel_out = NULL,
.on_stdout = output_cb,
.on_stderr = output_cb,
.on_credit = credit_cb,
};
struct timespec t0;
const char *service_name;
Expand Down Expand Up @@ -704,18 +796,41 @@
free (nodeset);
}

if (!(subprocesses = zlist_new ()))
log_err_exit ("zlist_new");
if (!(subprocesses = zlistx_new ()))
log_err_exit ("zlistx_new");

Check warning on line 800 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L800

Added line #L800 was not covered by tests
zlistx_set_destructor (subprocesses, subprocess_destroy);

if (!(subprocess_credits = zlistx_new ()))
log_err_exit ("zlistx_new");

Check warning on line 804 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L804

Added line #L804 was not covered by tests
zlistx_set_comparator (subprocess_credits, subprocess_credits_compare);

if (!(exitsets = zhashx_new ()))
log_err_exit ("zhashx_new()");

service_name = optparse_get_str (opts,
"service",
job_service ? job_service : "rexec");

// sdexec stdin flow is disabled by default
if (streq (service_name, "sdexec"))
stdin_enable_flow_control = false;

const char *stdin_flow = optparse_get_str (opts, "stdin-flow", NULL);
if (stdin_flow) {
if (streq (stdin_flow, "off"))
stdin_enable_flow_control = false;
else if (streq (stdin_flow, "on"))
stdin_enable_flow_control = true;

Check warning on line 823 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L820-L823

Added lines #L820 - L823 were not covered by tests
else
log_msg_exit ("Set --stdin-flow to on or off");

Check warning on line 825 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L825

Added line #L825 was not covered by tests
}
if (!stdin_enable_flow_control)
ops.on_credit = NULL;

rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
struct subproc_credit *spcred;
if (!(p = flux_rexec_ex (h,
service_name,
rank,
Expand All @@ -725,10 +840,17 @@
NULL,
NULL)))
log_err_exit ("flux_rexec");
if (zlist_append (subprocesses, p) < 0)
log_err_exit ("zlist_append");
if (!zlist_freefn (subprocesses, p, subprocess_destroy, true))
log_err_exit ("zlist_freefn");
if (!(spcred = calloc (1, sizeof (*spcred))))
log_err_exit ("calloc");

Check warning on line 844 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L844

Added line #L844 was not covered by tests
if (!zlistx_add_end (subprocesses, p))
log_err_exit ("zlistx_add_end");

Check warning on line 846 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L846

Added line #L846 was not covered by tests
if (!(spcred->handle = zlistx_add_end (subprocess_credits, p)))
log_err_exit ("zlistx_add_end");

Check warning on line 848 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L848

Added line #L848 was not covered by tests
if (flux_subprocess_aux_set (p,
"credits",
spcred,
(flux_free_f) free) < 0)
log_err_exit ("flux_subprocess_aux_set");

Check warning on line 853 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L853

Added line #L853 was not covered by tests
rank = idset_next (targets, rank);
}

Expand All @@ -739,11 +861,11 @@
*/
if (optparse_getopt (opts, "noinput", NULL) > 0) {
flux_subprocess_t *p;
p = zlist_first (subprocesses);
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_close (p, "stdin") < 0)
log_err_exit ("flux_subprocess_close");
p = zlist_next (subprocesses);
p = zlistx_next (subprocesses);
}
}
/* configure stdin watcher
Expand Down Expand Up @@ -800,7 +922,8 @@
log_fini ();

zhashx_destroy (&exitsets);
zlist_destroy (&subprocesses);
zlistx_destroy (&subprocesses);
zlistx_destroy (&subprocess_credits);

return exit_code;
}
Expand Down
52 changes: 51 additions & 1 deletion t/t0005-exec.t
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,57 @@ test_expect_success 'stdin redirect from /dev/null works without -n' '
'

test_expect_success 'stdin redirect from /dev/null works with -n' '
test_expect_code 0 run_timeout 10 flux exec -n -r0-3 cat
test_expect_code 0 run_timeout 10 flux exec -n -r0-3 cat
'

# Generate a file of random data (dd bs=5M count=1) that the stdin flow
# control tests below pipe into flux exec.
test_expect_success 'create large file for tests' '
dd if=/dev/urandom of=5Mfile bs=5M count=1
'

# Helper script run by flux exec: looks up the broker rank it is running
# on and copies its stdin to cpy.<rank>.
test_expect_success 'create test script to redirect stdin to a file' '
cat <<-EOT >stdin2file &&
#!/bin/bash
rank=\$(flux getattr rank)
dd of=cpy.\$rank
EOT
chmod +x stdin2file
'

# Piping a 5M file through a 4K stdin buffer should overflow the buffer
# if flow control is not functioning correctly.
test_expect_success 'stdin flow control works (1 rank)' '
cat 5Mfile | flux exec -r 0 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
cmp 5Mfile cpy.0 &&
rm cpy.0
'

# Same check with stdin broadcast to ranks 0-3: the copy written by each
# rank must be identical to the input file.
test_expect_success 'stdin flow control works (all ranks)' '
cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
cmp 5Mfile cpy.0 &&
cmp 5Mfile cpy.1 &&
cmp 5Mfile cpy.2 &&
cmp 5Mfile cpy.3 &&
rm cpy.*
'

# Replace the helper script with a variant in which rank 0 exits without
# reading stdin, while every other rank still copies stdin to cpy.<rank>.
test_expect_success 'create test script to redirect stdin to a file, one rank exits early' '
cat <<-EOT >stdin2file &&
#!/bin/bash
rank=\$(flux getattr rank)
if test \$rank -ne 0; then
dd of=cpy.\$rank
fi
EOT
chmod +x stdin2file
'

# The early exit on rank 0 must not prevent ranks 1-3 from receiving the
# complete input; rank 0 must not have produced an output file.
# NOTE(review): presumably this exercises removal of an exited
# subprocess from the credits list in flux-exec -- confirm.
test_expect_success 'stdin flow control works (all ranks, one rank will exit early)' '
cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
test_must_fail ls cpy.0 &&
cmp 5Mfile cpy.1 &&
cmp 5Mfile cpy.2 &&
cmp 5Mfile cpy.3 &&
rm cpy.*
'

test_expect_success 'stdin broadcast -- multiple lines' '
Expand Down
Loading