Skip to content

Commit c44d300

Browse files
committed
feat(nscale): implement topology provider
Signed-off-by: Dmitry Shmulevich <dshmulevich@nvidia.com>
1 parent 97c0aa3 commit c44d300

7 files changed

Lines changed: 630 additions & 1 deletion

File tree

internal/httpreq/httpreq.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ func DoRequestWithRetries(f RequestFunc, insecureSkipVerify bool) ([]byte, *http
125125
for {
126126
attempt++
127127
resp, body, err := DoRequest(f, insecureSkipVerify)
128+
// TODO: remove the line below after troubleshooting is completed
129+
klog.Infof("BODY: %s", string(body))
128130
if err == nil || attempt == maxRetries || !ShouldRetry(err.Code()) {
129131
return body, err
130132
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2026 NVIDIA CORPORATION
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package nscale
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"net/http"
12+
13+
"k8s.io/klog/v2"
14+
15+
"github.com/NVIDIA/topograph/internal/httperr"
16+
"github.com/NVIDIA/topograph/pkg/topology"
17+
)
18+
19+
// defaultPageSize is the number of instances requested per topology page when
// the caller does not supply its own page size.
const defaultPageSize = 100
22+
23+
func (p *baseProvider) generateInstanceTopology(ctx context.Context, pSize *int, cis []topology.ComputeInstances) (*topology.ClusterTopology, *httperr.Error) {
24+
topo := topology.NewClusterTopology()
25+
26+
pageSize := defaultPageSize
27+
if pSize != nil {
28+
pageSize = *pSize
29+
}
30+
31+
for _, ci := range cis {
32+
if err := p.generateRegionInstanceTopology(ctx, topo, pageSize, &ci); err != nil {
33+
return nil, err
34+
}
35+
}
36+
37+
return topo, nil
38+
}
39+
40+
func (p *baseProvider) generateRegionInstanceTopology(ctx context.Context, topo *topology.ClusterTopology, pageSize int, ci *topology.ComputeInstances) *httperr.Error {
41+
if len(ci.Region) == 0 {
42+
return httperr.NewError(http.StatusBadRequest, "must specify region")
43+
}
44+
klog.InfoS("Getting instance topology", "region", ci.Region)
45+
46+
offset := 0
47+
for {
48+
resp, err := p.client.Topology(ctx, ci.Region, pageSize, offset)
49+
if err != nil {
50+
return httperr.NewError(http.StatusBadGateway, fmt.Sprintf("failed to get topology: %v", err))
51+
}
52+
53+
n := len(resp)
54+
if n == 0 {
55+
klog.V(4).Infof("Total processed nodes: %d", topo.Len())
56+
return nil
57+
}
58+
offset += n
59+
60+
for _, inst := range resp {
61+
t := &topology.InstanceTopology{
62+
InstanceID: inst.ID,
63+
}
64+
65+
for indx := range minPathSize(inst.NetworkPath) {
66+
switch indx {
67+
case 0:
68+
t.CoreID = inst.NetworkPath[indx]
69+
case 1:
70+
t.SpineID = inst.NetworkPath[indx]
71+
case 2:
72+
t.LeafID = inst.NetworkPath[indx]
73+
default:
74+
klog.Warningf("unsupported size %d of topology path for instance %q", len(inst.NetworkPath), inst.ID)
75+
}
76+
}
77+
78+
if inst.BlockID != nil {
79+
t.AcceleratorID = *inst.BlockID
80+
}
81+
82+
klog.Infof("Adding topology: %s", t.String())
83+
topo.Append(t)
84+
}
85+
}
86+
}
87+
88+
// minPathSize reports how many leading indices of path the caller should
// visit: len(path) for paths of up to three entries, and 4 for anything
// longer. The fourth index is never mapped to a tier; it is returned only so
// that the caller reaches its "unsupported size" warning.
func minPathSize(path []string) int {
	if size := len(path); size <= 3 {
		return size
	}
	return 4
}

pkg/providers/nscale/provider.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2026 NVIDIA CORPORATION
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package nscale
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
"net/http"
13+
"strconv"
14+
15+
"github.com/NVIDIA/topograph/internal/config"
16+
"github.com/NVIDIA/topograph/internal/httperr"
17+
"github.com/NVIDIA/topograph/internal/httpreq"
18+
"github.com/NVIDIA/topograph/pkg/providers"
19+
"github.com/NVIDIA/topograph/pkg/topology"
20+
)
21+
22+
const (
	// NAME is the provider name returned by NamedLoader and passed to the
	// graph builder in GenerateTopologyConfig.
	NAME = "nscale"

	// urlTopologyPath is the request path handed to the HTTP helper when
	// querying instance topology.
	urlTopologyPath = "/v1/topology"
)
27+
28+
type baseProvider struct {
29+
params *ProviderParams
30+
client Client
31+
}
32+
33+
// ProviderParams is the set of provider parameters decoded from the
// configuration's params map.
type ProviderParams struct {
	// BaseURL is the base URL of the topology API; getParams rejects an empty value.
	BaseURL string `mapstructure:"baseUrl"`
	// Region is the region reported for every node by GetInstancesRegions.
	Region string `mapstructure:"region"`
	// TrimTiers is forwarded to the three-tier graph builder.
	TrimTiers int `mapstructure:"trimTiers"`
}
38+
39+
// Credentials is the set of API credentials decoded from the configuration's
// creds map. Both fields are required by getCreds.
type Credentials struct {
	// Org is sent as the X-Organization request header.
	Org string `mapstructure:"org"`
	// Token is sent as the bearer token in the Authorization request header.
	Token string `mapstructure:"token"`
}
43+
44+
type Client interface {
45+
Topology(context.Context, string, int, int) ([]InstanceTopology, error)
46+
}
47+
48+
// nscaleClient is a Topology API client. It keeps the base URL of the API
// together with the organization and token sent with every request.
type nscaleClient struct {
	baseURL string // base URL passed to the HTTP request helper
	org     string // value of the X-Organization header
	token   string // bearer token for the Authorization header
}
54+
55+
// InstanceTopology represents the topology of a single instance as decoded
// from the topology API response.
type InstanceTopology struct {
	// ID is the instance identifier.
	ID string `json:"instance_id"`
	// NetworkPath lists the network nodes of the instance; consumers read the
	// first three entries as core, spine and leaf, in that order.
	NetworkPath []string `json:"network_node_path"`
	// BlockID is optional; when set it is used as the accelerator ID.
	BlockID *string `json:"block_id,omitempty"`
}
61+
62+
func (c *nscaleClient) Topology(ctx context.Context, region string, pageSize, offset int) ([]InstanceTopology, error) {
63+
headers := map[string]string{
64+
"Authorization": "Bearer " + c.token,
65+
"X-Organization": c.org,
66+
"X-Region": region,
67+
}
68+
query := map[string]string{
69+
"limit": strconv.Itoa(pageSize),
70+
"offset": strconv.Itoa(offset),
71+
}
72+
f := httpreq.GetRequestFunc(ctx, http.MethodGet, headers, query, nil, c.baseURL, urlTopologyPath)
73+
74+
body, httpErr := httpreq.DoRequestWithRetries(f, false)
75+
if httpErr != nil {
76+
return nil, httpErr
77+
}
78+
79+
resp := []InstanceTopology{}
80+
if err := json.Unmarshal(body, &resp); err != nil {
81+
return nil, httperr.NewError(http.StatusBadGateway, err.Error())
82+
}
83+
84+
return resp, nil
85+
}
86+
87+
type Provider struct {
88+
baseProvider
89+
}
90+
91+
func NamedLoader() (string, providers.Loader) {
92+
return NAME, Loader
93+
}
94+
95+
func Loader(ctx context.Context, config providers.Config) (providers.Provider, *httperr.Error) {
96+
params, err := getParams(config.Params)
97+
if err != nil {
98+
return nil, httperr.NewError(http.StatusBadRequest, err.Error())
99+
}
100+
101+
creds, err := getCreds(config.Creds)
102+
if err != nil {
103+
return nil, httperr.NewError(http.StatusBadRequest, err.Error())
104+
}
105+
106+
return &Provider{
107+
baseProvider: baseProvider{
108+
client: &nscaleClient{
109+
baseURL: params.BaseURL,
110+
org: creds.Org,
111+
token: creds.Token,
112+
},
113+
params: params,
114+
},
115+
}, nil
116+
}
117+
118+
func getParams(params map[string]any) (*ProviderParams, error) {
119+
p := &ProviderParams{}
120+
if err := config.Decode(params, p); err != nil {
121+
return nil, fmt.Errorf("failed to decode params: %v", err)
122+
}
123+
if len(p.BaseURL) == 0 {
124+
return nil, fmt.Errorf("missing 'baseUrl'")
125+
}
126+
127+
return p, nil
128+
}
129+
130+
func getCreds(creds map[string]any) (*Credentials, error) {
131+
c := &Credentials{}
132+
if err := config.Decode(creds, c); err != nil {
133+
return nil, fmt.Errorf("failed to decode creds: %v", err)
134+
}
135+
if len(c.Org) == 0 {
136+
return nil, fmt.Errorf("missing 'org'")
137+
}
138+
if len(c.Token) == 0 {
139+
return nil, fmt.Errorf("missing 'token'")
140+
}
141+
142+
return c, nil
143+
}
144+
145+
func (p *baseProvider) GenerateTopologyConfig(ctx context.Context, pageSize *int, instances []topology.ComputeInstances) (*topology.Graph, *httperr.Error) {
146+
topo, err := p.generateInstanceTopology(ctx, pageSize, instances)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
return topo.ToThreeTierGraph(NAME, instances, p.params.TrimTiers, false), nil
152+
}
153+
154+
// Instances2NodeMap implements slurm.instanceMapper
155+
func (p *Provider) Instances2NodeMap(ctx context.Context, nodes []string) (map[string]string, error) {
156+
i2n := make(map[string]string)
157+
for _, node := range nodes {
158+
i2n[node] = node
159+
}
160+
161+
return i2n, nil
162+
}
163+
164+
// GetInstancesRegions implements slurm.instanceMapper
165+
func (p *Provider) GetInstancesRegions(ctx context.Context, nodes []string) (map[string]string, error) {
166+
res := make(map[string]string)
167+
for _, node := range nodes {
168+
res[node] = p.params.Region
169+
}
170+
171+
return res, nil
172+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2026 NVIDIA CORPORATION
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package nscale
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"net/http"
12+
13+
"github.com/NVIDIA/topograph/internal/httperr"
14+
"github.com/NVIDIA/topograph/pkg/models"
15+
"github.com/NVIDIA/topograph/pkg/providers"
16+
"github.com/NVIDIA/topograph/pkg/topology"
17+
)
18+
19+
// NAME_SIM is the provider name returned by NamedLoaderSim.
const NAME_SIM = "nscale-sim"

// Simulated API failure modes, compared against the APIError simulation
// parameter. Numbering starts at 1 (errNone == 1, errTopology == 2); keep
// these values stable, since they are matched against externally supplied
// integers.
const (
	errNone = iota + 1
	errTopology
)
25+
26+
type simClient struct {
27+
model *models.Model
28+
instanceIDs []string
29+
apiErr int
30+
}
31+
32+
type simProvider struct {
33+
baseProvider
34+
}
35+
36+
func (c *simClient) Topology(ctx context.Context, _ string, pageSize, offset int) ([]InstanceTopology, error) {
37+
if c.apiErr == errTopology {
38+
return nil, providers.ErrAPIError
39+
}
40+
41+
resp := make([]InstanceTopology, 0, pageSize)
42+
43+
var indx int
44+
for indx = offset; indx < offset+pageSize && indx < len(c.instanceIDs); indx++ {
45+
node, exists := c.model.Nodes[c.instanceIDs[indx]]
46+
if !exists {
47+
continue
48+
}
49+
nl := node.NetLayers
50+
path := make([]string, len(nl))
51+
for i, v := range nl {
52+
path[len(nl)-1-i] = v
53+
}
54+
instance := InstanceTopology{
55+
ID: node.Name,
56+
NetworkPath: path,
57+
}
58+
if len(node.Attributes.NVLink) != 0 {
59+
instance.BlockID = &node.Attributes.NVLink
60+
}
61+
62+
resp = append(resp, instance)
63+
}
64+
65+
return resp, nil
66+
}
67+
68+
func NamedLoaderSim() (string, providers.Loader) {
69+
return NAME_SIM, LoaderSim
70+
}
71+
72+
func LoaderSim(_ context.Context, cfg providers.Config) (providers.Provider, *httperr.Error) {
73+
p, err := providers.GetSimulationParams(cfg.Params)
74+
if err != nil {
75+
return nil, httperr.NewError(http.StatusBadRequest, err.Error())
76+
}
77+
78+
model, err := models.NewModelFromFile(p.ModelFileName)
79+
if err != nil {
80+
return nil, httperr.NewError(http.StatusBadRequest, fmt.Sprintf("failed to load model file: %v", err))
81+
}
82+
83+
instanceIDs := make([]string, 0, len(model.Nodes))
84+
for _, node := range model.Nodes {
85+
instanceIDs = append(instanceIDs, node.Name)
86+
}
87+
88+
return &simProvider{
89+
baseProvider: baseProvider{
90+
client: &simClient{
91+
model: model,
92+
instanceIDs: instanceIDs,
93+
apiErr: p.APIError,
94+
},
95+
params: &ProviderParams{},
96+
},
97+
}, nil
98+
}
99+
100+
// Engine support
101+
102+
func (p *simProvider) GetComputeInstances(ctx context.Context) ([]topology.ComputeInstances, *httperr.Error) {
103+
return p.client.(*simClient).model.Instances, nil
104+
}

0 commit comments

Comments
 (0)