support shrink key added to the RFC 28 scheduler acquisition protocol #1344

Open

grondo opened this issue Feb 18, 2025 · 19 comments

@grondo (Contributor) commented Feb 18, 2025

flux-framework/rfc#447 adds a new shrink key in the RFC 28 resource acquisition protocol. The key contains an idset of execution targets which have been removed from the instance and should no longer be considered for scheduling, feasibility etc.
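
For illustration only (the idsets here are invented; RFC 28 remains the normative reference), a sequence of resource.acquire responses to the scheduler could look like:

{"resources": {...}, "up": "0-3"}   <- initial response: R object plus currently up targets
{"down": "3"}                       <- execution target 3 goes offline
{"shrink": "3"}                     <- target 3 has been removed and will not return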

This is one step in solving flux-framework/flux-core#6641, in which nodes lost during a resilient batch job are still considered schedulable at some point in the future, when in reality they will never come back online, so a job can block in pending state indefinitely.

What would it take to support the shrink key in Fluxion, @trws @milroy? If we can get this enabled by the next release, that would be ideal.

@grondo (Contributor, Author) commented Feb 19, 2025

A WIP PR is now posted to flux-core (flux-framework/flux-core#6652) which adds support to the resource acquisition protocol for the shrink key, along with corresponding functionality in sched-simple to shrink the resource set available for scheduling.

Before moving on to testing and merging that PR for the next release, I'd like to know whether it is possible to get similar functionality into Fluxion quickly. I'm willing to poke at it if someone could give me a pointer on where to start looking.

The functionality we'll need is to remove, or otherwise mark as permanently unavailable, the execution targets (ranks) provided in the shrink key of the acquisition protocol response.

@grondo (Contributor, Author) commented Feb 19, 2025

Here's my first stab at support for the shrink key. I definitely got something wrong here and could use some feedback. However, in testing this seems to "work" (for some definition thereof): there isn't even an error when a job running on ranks in the shrink key is finally freed:

diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp
index 9edda2d8..5e607b1e 100644
--- a/resource/modules/resource_match.cpp
+++ b/resource/modules/resource_match.cpp
@@ -1258,23 +1258,54 @@ done:
     return rc;
 }
 
+static int shrink_resources (std::shared_ptr<resource_ctx_t> &ctx,
+                             const char *ids)
+{
+    int rc = -1;
+    std::set<int64_t> ranks;
+
+    if (!ids) {
+        errno = EINVAL;
+        goto done;
+    }
+    if ((rc = decode_rankset (ctx, ids, ranks)) < 0)
+        goto done;
+    if ((rc = ctx->traverser->shrink (ranks))) {
+        flux_log (ctx->h,
+                  LOG_ERR,
+                  "shrink %s failed: %s",
+                  ids,
+                  ctx->traverser->err_message ().c_str ());
+        goto done;
+    }
+    flux_log (ctx->h,
+              LOG_DEBUG,
+              "removed ranks %s from resource set",
+              ids);
+done:
+    return rc;
+}
+
 static void update_resource (flux_future_t *f, void *arg)
 {
     int rc = -1;
     const char *up = NULL;
     const char *down = NULL;
+    const char *shrink = NULL;
     double expiration = -1.;
     json_t *resources = NULL;
     std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
 
     if ((rc = flux_rpc_get_unpack (f,
-                                   "{s?:o s?:s s?:s s?:F}",
+                                   "{s?:o s?:s s?:s s?:s s?:F}",
                                    "resources",
                                    &resources,
                                    "up",
                                    &up,
                                    "down",
                                    &down,
+                                   "shrink",
+                                   &shrink,
                                    "expiration",
                                    &expiration))
         < 0) {
@@ -1286,6 +1317,8 @@ static void update_resource (flux_future_t *f, void *arg)
         flux_log_error (ctx->h, "%s: update_resource_db", __FUNCTION__);
         goto done;
     }
+    if (shrink && shrink_resources (ctx, shrink) < 0)
+        goto done;
     if (expiration >= 0.) {
         /*  Update graph duration:
          */
diff --git a/resource/traversers/dfu.cpp b/resource/traversers/dfu.cpp
index 9241a3f1..8a60f1f4 100644
--- a/resource/traversers/dfu.cpp
+++ b/resource/traversers/dfu.cpp
@@ -509,6 +509,12 @@ int dfu_traverser_t::mark (std::set<int64_t> &ranks, resource_pool_t::status_t s
     return detail::dfu_impl_t::mark (ranks, status);
 }
 
+int dfu_traverser_t::shrink (std::set<int64_t> &ranks)
+{
+    clear_err_message ();
+    return detail::dfu_impl_t::shrink (ranks);
+}
+
 /*
  * vi:tabstop=4 shiftwidth=4 expandtab
  */
diff --git a/resource/traversers/dfu.hpp b/resource/traversers/dfu.hpp
index 1db95343..4cca16b0 100644
--- a/resource/traversers/dfu.hpp
+++ b/resource/traversers/dfu.hpp
@@ -199,6 +199,9 @@ class dfu_traverser_t : protected detail::dfu_impl_t {
      */
     int mark (std::set<int64_t> &ranks, resource_pool_t::status_t status);
 
+
+    int shrink (std::set<int64_t> &ranks);
+
    private:
     int is_satisfiable (Jobspec::Jobspec &jobspec,
                         detail::jobmeta_t &meta,
diff --git a/resource/traversers/dfu_impl.hpp b/resource/traversers/dfu_impl.hpp
index 8369fa09..aff94e8e 100644
--- a/resource/traversers/dfu_impl.hpp
+++ b/resource/traversers/dfu_impl.hpp
@@ -324,6 +324,8 @@ class dfu_impl_t {
      */
     int mark (std::set<int64_t> &ranks, resource_pool_t::status_t status);
 
+    int shrink (std::set<int64_t> &ranks);
+
    private:
     /************************************************************************
      *                                                                      *
diff --git a/resource/traversers/dfu_impl_update.cpp b/resource/traversers/dfu_impl_update.cpp
index 9cbf9466..5bffb023 100644
--- a/resource/traversers/dfu_impl_update.cpp
+++ b/resource/traversers/dfu_impl_update.cpp
@@ -15,6 +15,7 @@ extern "C" {
 }
 
 #include "resource/traversers/dfu_impl.hpp"
+#include <readers/resource_reader_factory.hpp>
 
 using namespace Flux::Jobspec;
 using namespace Flux::resource_model;
@@ -908,6 +909,47 @@ int dfu_impl_t::mark (std::set<int64_t> &ranks, resource_pool_t::status_t status
     return 0;
 }
 
+int dfu_impl_t::shrink (std::set<int64_t> &ranks)
+{
+    std::shared_ptr<resource_reader_base_t> rd;
+    if ((rd = create_resource_reader ("jgf")) == nullptr)
+        return -1;
+
+    try {
+        std::map<int64_t, std::vector<vtx_t>>::iterator vit;
+        std::string subtree_path = "", tmp_path = "";
+        subsystem_t dom = m_match->dom_subsystem ();
+        vtx_t subtree_root;
+
+        int total = 0;
+        for (auto &rank : ranks) {
+            // Now iterate through subgraphs keyed by rank and
+            // set status appropriately
+            vit = m_graph_db->metadata.by_rank.find (rank);
+            if (vit == m_graph_db->metadata.by_rank.end ())
+                continue;
+
+            subtree_root = vit->second.front ();
+            subtree_path = (*m_graph)[subtree_root].paths.at (dom);
+            for (vtx_t v : vit->second) {
+                // The shortest path string is the subtree root.
+                tmp_path = (*m_graph)[v].paths.at (dom);
+                if (tmp_path.length () < subtree_path.length ()) {
+                    subtree_path = tmp_path;
+                    subtree_root = v;
+                }
+            }
+            rd->remove_subgraph (*m_graph, m_graph_db->metadata, subtree_path);
+            // TODO reinit traverser?
+            ++total;
+        }
+    } catch (std::out_of_range &) {
+        errno = ENOENT;
+        return -1;
+    }
+    return 0;
+}
+
 /*
  * vi:tabstop=4 shiftwidth=4 expandtab
  */

@trws (Member) commented Feb 19, 2025

Discussed in the Fluxion meeting: make a new status for nodes that are never going to become usable. Some starting points in the code:

Prune:
https://github.com/flux-framework/flux-sched/blob/master/resource/traversers/dfu_impl.cpp#L219
Request feasible:
https://github.com/flux-framework/flux-sched/blob/master/resource/traversers/dfu.cpp#L109
Status string:
https://github.com/flux-framework/flux-sched/blob/master/resource/schema/resource_data.cpp#L47

@grondo (Contributor, Author) commented Feb 20, 2025

Thanks @trws and @milroy: I added a LOST member of the resource_pool_t::status_t enum, then added a handler to mark ranks as LOST when they are in the shrink key of an acquire response.
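
In rough outline it looks like the sketch below (not the actual patch; in the real code the enum lives in resource_pool_t, and mark () and decode_rankset () are the existing interfaces shown in the diffs above):

// resource status enum with the new member appended (sketch)
enum class status_t : int { UP = 0, DOWN, LOST };

// resource_match.cpp (sketch): mark shrink ranks LOST instead of
// removing their subgraphs
static int shrink_resources (std::shared_ptr<resource_ctx_t> &ctx, const char *ids)
{
    std::set<int64_t> ranks;

    if (!ids) {
        errno = EINVAL;
        return -1;
    }
    if (decode_rankset (ctx, ids, ranks) < 0)
        return -1;
    return ctx->traverser->mark (ranks, resource_pool_t::status_t::LOST);
}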

However, the bit in dfu_traverser_t::request_feasible() is not working. I was confused so I started adding some print debugging and found that this function is short-circuiting here:

if ((!meta.constraint) && (target_nodes <= get_graph_db ()->metadata.nodes_up))
    return 0;

This seems to be because target_nodes in this function is being set to 0, even for flux run -N4.

The test case I'm using is this simple script:

#!/bin/sh
flux module remove sched-simple
flux module load resource/modules/sched-fluxion-resource.so
flux module load qmanager/modules/sched-fluxion-qmanager.so
flux run -N4 hostname
flux overlay disconnect 3
flux run -N4 hostname

This script is run with flux run -N4 -o exit-timeout=none flux start -Stbon.topo=kary:0 ./test.sh. The test ensures that 1) the instance survives the forced disconnect of rank 3, and 2) the second request for 4 nodes gets an unsatisfiable exception instead of hanging in the pending state. A subsequent flux run -N3 hostname should still work, however.

Here's the result of running this test with my printf debugging enabled:

$ flux run -o exit-timeout=none -N4 flux start -Stbon.topo=kary:0 ./test.sh
request_feasible target_nodes=0 nodes_up=4
{node: 0} {core: 4} 

corona212
corona212
corona212
corona212
flux-overlay: asking corona212 (rank 0) to disconnect child corona212 (rank 3)
Feb 19 20:16:41.510085 PST 2025 broker.err[0]: corona212 (rank 3) disconnected by request
Feb 19 20:16:41.510337 PST 2025 broker.crit[3]: corona212 (rank 0) sent disconnect control message
Feb 19 20:16:41.611441 PST 2025 broker.err[0]: dead to Flux: corona212 (rank 3)
request_feasible target_nodes=0 nodes_up=2
{node: 0} {core: 4} 

And the second job hangs instead of getting an exception.

Here's the diff including the debugging:

diff --git a/resource/traversers/dfu.cpp b/resource/traversers/dfu.cpp
index 9241a3f1..25658478 100644
--- a/resource/traversers/dfu.cpp
+++ b/resource/traversers/dfu.cpp
@@ -71,6 +71,16 @@ int dfu_traverser_t::request_feasible (detail::jobmeta_t const &meta,
     const bool checking_satisfiability =
         op == match_op_t::MATCH_ALLOCATE_W_SATISFIABILITY || op == match_op_t::MATCH_SATISFIABILITY;
 
+    std::cerr << "request_feasible"
+        << " target_nodes=" << target_nodes
+        << " nodes_up=" <<  get_graph_db ()->metadata.nodes_up
+        << std::endl;
+
+    for (const auto& pair : dfv) {
+        std::cerr << "{" << pair.first << ": " << pair.second << "} ";
+    }
+    std::cout << std::endl;
+
     if ((!meta.constraint) && (target_nodes <= get_graph_db ()->metadata.nodes_up))
         return 0;
 
@@ -88,6 +98,8 @@ int dfu_traverser_t::request_feasible (detail::jobmeta_t const

I admit I'm a bit confused, and did not expect flux run -N4 to result in dfv["node"] = 0, so perhaps I'm doing something completely wrong here. I'm also a bit confused why nodes_up = 2 after one rank goes down.

@grondo (Contributor, Author) commented Feb 20, 2025

I'm also a bit confused why nodes_up = 2 after one rank goes down.

I did figure this part out. It seems traverser->mark () is not idempotent: marking ranks DOWN and then LOST double-decrements the nodes_up counter.
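
To illustrate (this is a paraphrase, not the actual mark () code): if the counter update is unconditional for any non-UP status, e.g.

if (status != resource_pool_t::status_t::UP)
    m_graph_db->metadata.nodes_up--;

then a rank marked DOWN and later LOST is subtracted twice. A guard on the vertex's previous status, something like

if ((*m_graph)[v].status == resource_pool_t::status_t::UP
    && status != resource_pool_t::status_t::UP)
    m_graph_db->metadata.nodes_up--;

would make the counter update idempotent.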

@grondo (Contributor, Author) commented Feb 20, 2025

OK, this was my fault: somehow I missed modifying prune (). With this addition, the test works as expected:

diff --git a/resource/traversers/dfu_impl.cpp b/resource/traversers/dfu_impl.cpp
index 4103a2fb..5f6f69e8 100644
--- a/resource/traversers/dfu_impl.cpp
+++ b/resource/traversers/dfu_impl.cpp
@@ -213,6 +213,12 @@ int dfu_impl_t::prune (const jobmeta_t &meta,
                        const std::vector<Jobspec::Resource> &resources)
 {
     int rc = 0;
+
+    // Prune LOST resources
+    if ((*m_graph)[u].status == resource_pool_t::status_t::LOST) {
+        rc = -1;
+        goto done;
+    }
     // Prune by the visiting resource vertex's availability
     // If resource is not UP, no reason to descend further.
     if (meta.alloc_type != jobmeta_t::alloc_type_t::AT_SATISFIABILITY

I'm still a bit confused about why request_feasible () always sees target_nodes == 0, but I'll ignore that for now.

grondo added a commit to grondo/flux-sched that referenced this issue Feb 20, 2025
Problem: RFC 28 specifies an optional `shrink` key in the resource
acquisition protocol response, but this is not handled by Fluxion.

Mark resources in any `shrink` key in a resource acquisition response
as LOST. This treats the resources as permanently unavailable for
scheduling without removing them from the resource graph.

Fixes flux-framework#1344
@trws (Member) commented Feb 20, 2025

The target_nodes == 0 distresses me a bit; that should be the number of nodes in the aggregated resource request. Maybe satisfiability doesn't populate that the same way, but if it's working with the appropriate prune change, I wouldn't block this on that issue.

@grondo (Contributor, Author) commented Feb 20, 2025

Thanks @trws. Once I propose a PR, I'll open a separate issue about what I observed with request_feasible ().

grondo added a commit to grondo/flux-sched that referenced this issue Feb 21, 2025
Problem: RFC 28 specifies an optional `shrink` key in the resource
acquisition protocol response, but this is not handled by Fluxion.

Mark resources in any `shrink` key in a resource acquisition response
as LOST. This treats the resources as permanently unavailable for
scheduling without removing them from the resource graph.

Fixes flux-framework#1344
@milroy (Member) commented Feb 21, 2025

If you want to remove the subgraph, you'll need to partial cancel the ranks in the shrink key first. The reason is that partial cancel updates the pruning/aggregate filter counts. Deleting the subgraph first will reduce the total counts of resources but will not update the counts of allocated resources. It won't be possible to update the counts of allocated resources after the subgraph is deleted.

After partial cancel and shrink, you can update the total counts by reinitializing the traverser (e.g., in resource_match.cpp):

ctx->traverser->initialize ();

You could also do the reinitialization in the traverser itself.

Partial cancel currently only supports rv1exec and JGF, but it would be straightforward to add support for an idset. Let me know if that's needed and I'll get to work on it.
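
In rough order, that would look something like the sketch below in the resource module's shrink handler (the two helpers are placeholders, not existing Fluxion functions; only traverser->initialize () is the real call mentioned above):

static int shrink_by_removal (std::shared_ptr<resource_ctx_t> &ctx, const char *ids)
{
    // 1. partial-cancel the shrink ranks first, while allocations are
    //    still known, so pruning/aggregate filter counts get updated
    if (partial_cancel_shrink_ranks (ctx, ids) < 0)   // placeholder
        return -1;
    // 2. remove the subgraphs rooted at those ranks
    if (remove_shrink_subgraphs (ctx, ids) < 0)       // placeholder
        return -1;
    // 3. reinitialize the traverser to rebuild total counts
    return ctx->traverser->initialize ();
}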

@grondo (Contributor, Author) commented Feb 21, 2025

Thanks @milroy! After the meeting I dropped the code above in favor of the suggested approach of a new vertex state (implemented in #1345). Let me know if you think we should lean towards subgraph removal instead. Addition of the LOST state was trivial, but it does seem like true subgraph removal would be superior.

What will happen though, when the job manager sends a free response which contains removed resources? (We do not shrink the resources assigned to jobs when a node fails, so when the job releases resources, this will include the resources corresponding to the shrink idset) Does Fluxion already ignore missing resources in partial and full cancel?

@grondo (Contributor, Author) commented Feb 21, 2025

The target_nodes == 0 distresses me a bit; that should be the number of nodes in the aggregated resource request. Maybe satisfiability doesn't populate that the same way, but if it's working with the appropriate prune change, I wouldn't block this on that issue.

target_nodes == 0 seems to be the case in my testing because dfu_impl_t::prime_jobspec() only appears to aggregate counts of resource types that return true for m_match->is_pruning_type(), which doesn't appear to be the case for type "node" in my test environment, whereas it is true for type "core" 🤷. I don't really understand the pruners, so maybe working as designed.
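
In other words, my (possibly wrong) paraphrase of what prime_jobspec () is doing is roughly:

// sketch, not the actual code: aggregate counts are only primed for
// configured pruning types, so with the default "ALL:core" filter
// dfv["node"] stays 0 even for a -N4 request
for (const auto &resource : jobspec_resources) {   // placeholder name
    if (m_match->is_pruning_type (dom, resource.type))
        dfv[resource.type] += resource.count.min;
}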

@milroy (Member) commented Feb 21, 2025

What will happen though, when the job manager sends a free response which contains removed resources? ... Does Fluxion already ignore missing resources in partial and full cancel?

Currently that would result in an error here:

if (rank_vector == m.by_rank.end ()) {
    m_err_msg += __FUNCTION__;
    m_err_msg += ": rank not found in by_rank map.\n";
    return -1;
}

It would be easy to change the behavior to ignore the missing ranks. Implementing shrink via subgraph removal isn't trivial, but it is straightforward.
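
For example, assuming that check sits in the per-rank loop, something along these lines (untested sketch):

if (rank_vector == m.by_rank.end ()) {
    // rank was already removed by shrink; nothing to cancel here
    continue;
}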

@milroy (Member) commented Feb 21, 2025

types that return true for m_match->is_pruning_type(), which doesn't appear to be the case for type "node" in my test environment, whereas it is true for type "core" 🤷. I don't really understand the pruners, so maybe working as designed.

It sounds like it's working as designed. You can try setting the pruning filter configuration to be "ALL:core,ALL:node" and see if target_nodes has the value you expect.
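
For example, with the test script above, the filter can be passed as a module load option (prune-filters is the option name documented in the Fluxion README):

flux module load resource/modules/sched-fluxion-resource.so prune-filters=ALL:core,ALL:node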

@grondo (Contributor, Author) commented Feb 21, 2025

It sounds like it's working as designed. You can try setting the pruning filter configuration to be "ALL:core,ALL:node" and see if target_nodes has the value you expect.

Yes, that worked. For my own edification, can you explain why "ALL:core" is the default and not "ALL:core,ALL:node"?

@milroy (Member) commented Feb 21, 2025

For my own edification, can you explain why "ALL:core" is the default and not "ALL:core,ALL:node"?

That's a good question! After searching a bit, it looks like the "ALL:core" default was set in #401 for expediency. The intention may have been to revisit the default settings later.

I don't see why we can't make the default "ALL:core,ALL:node". The disadvantage is increased memory footprint, but it's a small increase relative to the current default.

@trws (Member) commented Feb 21, 2025

We probably should; there are certain things that don't work as expected otherwise. @milroy, would you be willing to put together a PR for that?

@milroy (Member) commented Feb 21, 2025

Yeah, I can do that this afternoon.

@grondo (Contributor, Author) commented Feb 22, 2025

Partial cancel currently only supports rv1exec and JGF, but it would be straightforward to add support for an idset. Let me know if that's needed and I'll get to work on it.

Sounds like we want to at least try this approach. I can try to implement the partial cancel by idset, but if you have time to throw it together @milroy feel free! Thanks.

@milroy (Member) commented Feb 22, 2025

I can try to implement the partial cancel by idset, but if you have time to throw it together @milroy feel free! Thanks.

On second thought, we don't need a separate implementation. You could pack the idset into the rank key of a skeleton R_lite (as part of a skeleton rv1exec) and pass that to partial cancel (remove) with the rv1exec reader. The JSON just needs this structure to unpack successfully:

if (json_unpack (rv1, "{s:i s:{s:o}}", "version", &version, "execution", "R_lite", &rlite)
    < 0) {
    errno = EINVAL;
    goto error;
}
if (version != 1) {
    errno = EINVAL;
    goto error;
}
if (!(r_ids = idset_create (0, IDSET_FLAG_AUTOGROW)))
    goto error;
json_array_foreach (rlite, index, entry) {
    if (json_unpack (entry, "{s:s%}", "rank", &ranks, &len) < 0) {
        errno = EINVAL;
        goto error;
    }
    if (idset_decode_add (r_ids, ranks, len, NULL) < 0)
        goto error;
}
rank = idset_first (r_ids);

Not sure how elegant that is, but it will work.
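
Concretely, the skeleton could be built from the shrink idset with something like this (a sketch; build_shrink_rv1exec is just an illustrative name, and it's worth double-checking whether the reader expects any additional R_lite keys):

#include <jansson.h>

/* {"version": 1, "execution": {"R_lite": [{"rank": "<idset>"}]}} */
static json_t *build_shrink_rv1exec (const char *shrink_idset)
{
    return json_pack ("{s:i s:{s:[{s:s}]}}",
                      "version", 1,
                      "execution", "R_lite", "rank", shrink_idset);
}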
