Skip to content

Commit

Permalink
Quotas (#170)
Browse files Browse the repository at this point in the history
First cut at resource allocations for clusters, servers and GPUs.
  • Loading branch information
spjmurray authored Jan 31, 2025
1 parent 39b46e4 commit 7e16892
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 54 deletions.
4 changes: 2 additions & 2 deletions charts/kubernetes/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ description: A Helm chart for deploying Unikorn Kubernetes Service

type: application

version: v0.2.56-rc1
appVersion: v0.2.56-rc1
version: v0.2.56-rc2
appVersion: v0.2.56-rc2

icon: https://raw.githubusercontent.com/unikorn-cloud/assets/main/images/logos/dark-on-light/icon.png

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ require (
github.com/oapi-codegen/runtime v1.1.1
github.com/prometheus/client_golang v1.20.5
github.com/spf13/pflag v1.0.5
github.com/unikorn-cloud/core v0.1.89-rc3
github.com/unikorn-cloud/identity v0.2.53-rc2
github.com/unikorn-cloud/core v0.1.89-rc5
github.com/unikorn-cloud/identity v0.2.53-rc4
github.com/unikorn-cloud/region v0.1.48-rc1
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/sdk v1.34.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/unikorn-cloud/core v0.1.89-rc3 h1:SFsNCTfjQd9sawO7/6HFmNRacspfYG6SlVH8vF3Wy2U=
github.com/unikorn-cloud/core v0.1.89-rc3/go.mod h1:UW7g0AFLjY6r3KVPv9SPu/POttZx6Tl6UZ30+s4da+M=
github.com/unikorn-cloud/identity v0.2.53-rc2 h1:rkK0jsN5kOzKuq2vK4KdYezKMC7qwfq/0ahBm3jiEFs=
github.com/unikorn-cloud/identity v0.2.53-rc2/go.mod h1:DqOMDX52AYimYK1j/NkR9wu0kPqX4EvZDa5qn8EIIE0=
github.com/unikorn-cloud/core v0.1.89-rc5 h1:DYT9DOO9gdqzN3U1cirxWDgvECN9vfPuZda7KQE86KI=
github.com/unikorn-cloud/core v0.1.89-rc5/go.mod h1:UW7g0AFLjY6r3KVPv9SPu/POttZx6Tl6UZ30+s4da+M=
github.com/unikorn-cloud/identity v0.2.53-rc4 h1:TbVMI6JftmuF1EI3mWFL9Prr6Xtw1jsoJAsbzeaj8bY=
github.com/unikorn-cloud/identity v0.2.53-rc4/go.mod h1:z1Y9lXyjEovrjOp7FylZN/k6wGU3/e6i6s7YWmbaRrE=
github.com/unikorn-cloud/region v0.1.48-rc1 h1:+AMe3AqUSQcPQ7qAwudrzwoVYe6RGUCpZnbbYqZm1bg=
github.com/unikorn-cloud/region v0.1.48-rc1/go.mod h1:lRRDZnbJXCgpN37ylzfGnRWzsNJaR3WTpNyeh96SBpE=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/provisioners/managers/cluster/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (p *Provisioner) Provision(ctx context.Context) error {
}

// Likewise identity creation is provisioned asynchronously as it too takes a
// long time, epspectially if a physical network is being provisioned and that
// long time, especially if a physical network is being provisioned and that
// needs to go out and talk to switches.
clientContext, client, err := p.getRegionClient(ctx, "provision")
if err != nil {
Expand Down
201 changes: 189 additions & 12 deletions pkg/server/handler/cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package cluster

import (
"context"
goerrors "errors"
"fmt"
"net"
"net/http"
"slices"
Expand All @@ -30,24 +32,31 @@ import (
coreapi "github.com/unikorn-cloud/core/pkg/openapi"
"github.com/unikorn-cloud/core/pkg/server/conversion"
"github.com/unikorn-cloud/core/pkg/server/errors"
identityapi "github.com/unikorn-cloud/identity/pkg/openapi"
unikornv1 "github.com/unikorn-cloud/kubernetes/pkg/apis/unikorn/v1alpha1"
"github.com/unikorn-cloud/kubernetes/pkg/openapi"
"github.com/unikorn-cloud/kubernetes/pkg/provisioners/helmapplications/clusteropenstack"
"github.com/unikorn-cloud/kubernetes/pkg/provisioners/helmapplications/vcluster"
"github.com/unikorn-cloud/kubernetes/pkg/server/handler/clustermanager"
"github.com/unikorn-cloud/kubernetes/pkg/server/handler/common"
"github.com/unikorn-cloud/kubernetes/pkg/server/handler/region"
regionapi "github.com/unikorn-cloud/region/pkg/openapi"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
	// ErrConsistency is returned when referenced state is missing or
	// inconsistent, e.g. a pool's flavor ID is unknown to the region
	// service, or an expected annotation is absent.
	ErrConsistency = goerrors.New("consistency error")

	// ErrAPI is returned when a remote service call succeeds at the
	// transport level but yields an unexpected HTTP status code.
	ErrAPI = goerrors.New("remote api error")
)

type Options struct {
ControlPlaneCPUsMax int
ControlPlaneMemoryMaxGiB int
Expand Down Expand Up @@ -83,16 +92,20 @@ type Client struct {
// options control various defaults and the like.
options *Options

// identity is a client to access the identity service.
identity identityapi.ClientWithResponsesInterface

// region is a client to access regions.
region regionapi.ClientWithResponsesInterface
}

// NewClient returns a new client with required parameters.
func NewClient(client client.Client, namespace string, options *Options, region regionapi.ClientWithResponsesInterface) *Client {
// NewClient returns a new client with required parameters.
func NewClient(client client.Client, namespace string, options *Options, identity identityapi.ClientWithResponsesInterface, region regionapi.ClientWithResponsesInterface) *Client {
	c := &Client{
		client:    client,
		namespace: namespace,
		options:   options,
		identity:  identity,
		region:    region,
	}

	return c
}
Expand Down Expand Up @@ -187,6 +200,129 @@ func (c *Client) GetKubeconfig(ctx context.Context, organizationID, projectID, c
return secret.Data["value"], nil
}

// generateAllocations computes the quota allocation this cluster consumes:
// one cluster, the servers across all workload pools, and any GPUs provided
// by those servers' flavors.  Committed resources are always in use;
// reserved resources are the autoscaling headroom between the minimum and
// maximum replica counts.
func (c *Client) generateAllocations(ctx context.Context, organizationID string, resource *unikornv1.KubernetesCluster) (*identityapi.AllocationWrite, error) {
	flavors, err := region.Flavors(ctx, c.region, organizationID, resource.Spec.RegionID)
	if err != nil {
		return nil, err
	}

	var serversCommitted int

	var serversReserved int

	var gpusCommitted int

	var gpusReserved int

	// NOTE: the control plane is "free".
	for _, pool := range resource.Spec.WorkloadPools.Pools {
		// With autoscaling enabled the pool sizes between the configured
		// bounds, otherwise it is a fixed number of replicas.  Only read
		// pool.Replicas in the non-autoscaling case as it may be unset
		// when autoscaling is in use.
		var serversMinimum, serversMaximum int

		if pool.Autoscaling != nil {
			serversMinimum = *pool.Autoscaling.MinimumReplicas
			serversMaximum = *pool.Autoscaling.MaximumReplicas
		} else {
			serversMinimum = *pool.Replicas
			serversMaximum = *pool.Replicas
		}

		serversCommitted += serversMinimum
		serversReserved += serversMaximum - serversMinimum

		flavorByID := func(f regionapi.Flavor) bool {
			return f.Metadata.Id == *pool.FlavorID
		}

		index := slices.IndexFunc(flavors, flavorByID)
		if index < 0 {
			return nil, fmt.Errorf("%w: flavorID does not exist", ErrConsistency)
		}

		flavor := flavors[index]

		if flavor.Spec.Gpu != nil {
			// Mirror the server accounting: GPUs attached to committed
			// servers are committed, and only the scale-up headroom is
			// reserved.  Counting the maximum here would double count
			// the committed servers' GPUs.
			gpusCommitted += serversMinimum * flavor.Spec.Gpu.PhysicalCount
			gpusReserved += (serversMaximum - serversMinimum) * flavor.Spec.Gpu.PhysicalCount
		}
	}

	request := &identityapi.AllocationWrite{
		Metadata: coreapi.ResourceWriteMetadata{
			Name: "unused",
		},
		Spec: identityapi.AllocationSpec{
			Kind: "kubernetescluster",
			Id:   resource.Name,
			Allocations: identityapi.QuotaListDetailed{
				{
					Kind:      "clusters",
					Committed: 1,
					Reserved:  0,
				},
				{
					Kind:      "servers",
					Committed: serversCommitted,
					Reserved:  serversReserved,
				},
				{
					Kind:      "gpus",
					Committed: gpusCommitted,
					Reserved:  gpusReserved,
				},
			},
		},
	}

	return request, nil
}

// createAllocation computes the cluster's resource requirements and registers
// them with the identity service, returning the newly created allocation.
func (c *Client) createAllocation(ctx context.Context, organizationID, projectID string, resource *unikornv1.KubernetesCluster) (*identityapi.AllocationRead, error) {
	request, err := c.generateAllocations(ctx, organizationID, resource)
	if err != nil {
		return nil, err
	}

	response, err := c.identity.PostApiV1OrganizationsOrganizationIDProjectsProjectIDAllocationsWithResponse(ctx, organizationID, projectID, *request)
	if err != nil {
		return nil, err
	}

	if status := response.StatusCode(); status != http.StatusCreated {
		return nil, fmt.Errorf("%w: unexpected status code %d", ErrAPI, status)
	}

	return response.JSON201, nil
}

// updateAllocation recomputes the cluster's resource requirements and updates
// the existing allocation recorded against it in the identity service.
func (c *Client) updateAllocation(ctx context.Context, organizationID, projectID string, resource *unikornv1.KubernetesCluster) error {
	// Fail fast when the allocation ID annotation is absent, otherwise we
	// would PUT to an empty path segment and get a confusing remote error.
	allocationID, ok := resource.Annotations[constants.AllocationAnnotation]
	if !ok {
		return fmt.Errorf("%w: allocation annotation missing", ErrConsistency)
	}

	allocations, err := c.generateAllocations(ctx, organizationID, resource)
	if err != nil {
		return err
	}

	resp, err := c.identity.PutApiV1OrganizationsOrganizationIDProjectsProjectIDAllocationsAllocationIDWithResponse(ctx, organizationID, projectID, allocationID, *allocations)
	if err != nil {
		return err
	}

	if resp.StatusCode() != http.StatusOK {
		return fmt.Errorf("%w: unexpected status code %d", ErrAPI, resp.StatusCode())
	}

	return nil
}

// deleteAllocation releases the quota allocation identified by allocationID
// from the identity service.
func (c *Client) deleteAllocation(ctx context.Context, organizationID, projectID, allocationID string) error {
	response, err := c.identity.DeleteApiV1OrganizationsOrganizationIDProjectsProjectIDAllocationsAllocationIDWithResponse(ctx, organizationID, projectID, allocationID)
	if err != nil {
		return err
	}

	if status := response.StatusCode(); status != http.StatusAccepted {
		return fmt.Errorf("%w: unexpected status code %d", ErrAPI, status)
	}

	return nil
}

func (c *Client) createIdentity(ctx context.Context, organizationID, projectID, regionID, clusterID string) (*regionapi.IdentityRead, error) {
tags := coreapi.TagList{
coreapi.Tag{
Expand Down Expand Up @@ -280,12 +416,13 @@ func (c *Client) getRegion(ctx context.Context, organizationID, regionID string)
return &results[index], nil
}

func (c *Client) applyCloudSpecificConfiguration(ctx context.Context, organizationID, projectID, regionID string, identity *regionapi.IdentityRead, cluster *unikornv1.KubernetesCluster) error {
func (c *Client) applyCloudSpecificConfiguration(ctx context.Context, organizationID, projectID, regionID string, allocation *identityapi.AllocationRead, identity *regionapi.IdentityRead, cluster *unikornv1.KubernetesCluster) error {
// Save the identity ID for later cleanup.
if cluster.Annotations == nil {
cluster.Annotations = map[string]string{}
}

cluster.Annotations[constants.AllocationAnnotation] = allocation.Metadata.Id
cluster.Annotations[constants.IdentityAnnotation] = identity.Metadata.Id

// Apply any region specific configuration based on feature flags.
Expand All @@ -311,6 +448,31 @@ func (c *Client) applyCloudSpecificConfiguration(ctx context.Context, organizati
return nil
}

// preserveAnnotations copies the system-managed annotations (identity,
// allocation, and the optional physical network) from the current cluster
// onto the requested update, so a client PUT cannot drop them.  It is an
// error for either mandatory annotation to be absent on the current resource.
func preserveAnnotations(requested, current *unikornv1.KubernetesCluster) error {
	identityID, hasIdentity := current.Annotations[constants.IdentityAnnotation]
	if !hasIdentity {
		return fmt.Errorf("%w: identity annotation missing", ErrConsistency)
	}

	allocationID, hasAllocation := current.Annotations[constants.AllocationAnnotation]
	if !hasAllocation {
		return fmt.Errorf("%w: allocation annotation missing", ErrConsistency)
	}

	if requested.Annotations == nil {
		requested.Annotations = make(map[string]string)
	}

	requested.Annotations[constants.IdentityAnnotation] = identityID
	requested.Annotations[constants.AllocationAnnotation] = allocationID

	if network, hasNetwork := current.Annotations[constants.PhysicalNetworkAnnotation]; hasNetwork {
		requested.Annotations[constants.PhysicalNetworkAnnotation] = network
	}

	return nil
}

// Create creates the implicit cluster identified by the JWT claims.
func (c *Client) Create(ctx context.Context, organizationID, projectID string, request *openapi.KubernetesClusterWrite) (*openapi.KubernetesClusterRead, error) {
namespace, err := common.New(c.client).ProjectNamespace(ctx, organizationID, projectID)
Expand All @@ -333,12 +495,17 @@ func (c *Client) Create(ctx context.Context, organizationID, projectID string, r
return nil, err
}

allocation, err := c.createAllocation(ctx, organizationID, projectID, cluster)
if err != nil {
return nil, errors.OAuth2ServerError("failed to create quota allocation").WithError(err)
}

identity, err := c.createIdentity(ctx, organizationID, projectID, request.Spec.RegionId, cluster.Name)
if err != nil {
return nil, err
}

if err := c.applyCloudSpecificConfiguration(ctx, organizationID, projectID, request.Spec.RegionId, identity, cluster); err != nil {
if err := c.applyCloudSpecificConfiguration(ctx, organizationID, projectID, request.Spec.RegionId, allocation, identity, cluster); err != nil {
return nil, err
}

Expand All @@ -356,15 +523,13 @@ func (c *Client) Delete(ctx context.Context, organizationID, projectID, clusterI
return err
}

if namespace.DeletionTimestamp != nil {
return errors.OAuth2InvalidRequest("control plane is being deleted")
}
cluster, err := c.get(ctx, namespace.Name, clusterID)
if err != nil {
if kerrors.IsNotFound(err) {
return errors.HTTPNotFound().WithError(err)
}

cluster := &unikornv1.KubernetesCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterID,
Namespace: namespace.Name,
},
return errors.OAuth2ServerError("failed to get cluster")
}

if err := c.client.Delete(ctx, cluster); err != nil {
Expand All @@ -375,6 +540,10 @@ func (c *Client) Delete(ctx context.Context, organizationID, projectID, clusterI
return errors.OAuth2ServerError("failed to delete cluster").WithError(err)
}

if err := c.deleteAllocation(ctx, organizationID, projectID, cluster.Annotations[constants.AllocationAnnotation]); err != nil {
return errors.OAuth2ServerError("failed to delete quota allocation").WithError(err)
}

return nil
}

Expand Down Expand Up @@ -403,6 +572,14 @@ func (c *Client) Update(ctx context.Context, organizationID, projectID, clusterI
return errors.OAuth2ServerError("failed to merge metadata").WithError(err)
}

if err := preserveAnnotations(required, current); err != nil {
return errors.OAuth2ServerError("failed to merge annotations").WithError(err)
}

if err := c.updateAllocation(ctx, organizationID, projectID, required); err != nil {
return errors.OAuth2ServerError("failed to update quota allocation").WithError(err)
}

// Experience has taught me that modifying caches by accident is a bad thing
// so be extra safe and deep copy the existing resource.
updated := current.DeepCopy()
Expand Down
Loading

0 comments on commit 7e16892

Please sign in to comment.