Skip to content

Commit

Permalink
Merge upstream & fix existing test
Browse files (browse the repository at this point in the history)
  • Loading branch information
Mecoli1219 committed Feb 25, 2025
1 parent ea51a4b commit 253fadb
Show file tree
Hide file tree
Showing 19 changed files with 192 additions and 32 deletions.
20 changes: 10 additions & 10 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,6 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
Value: memory.Limit.String(),
})

// TODO: adjust the value based on the platform memory.
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: taskResourceRequirements.Defaults.OOMReservedMemory.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: taskResourceRequirements.Limits.OOMReservedMemory.String(),
})

// Only assign ephemeral storage when it is either requested or limited in the task definition, or a platform
// default exists.
if !taskResourceRequirements.Defaults.EphemeralStorage.IsZero() ||
Expand Down Expand Up @@ -274,6 +264,16 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
})
}

// TODO: adjust the value based on the platform memory.
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: taskResourceRequirements.Defaults.OOMReservedMemory.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: taskResourceRequirements.Limits.OOMReservedMemory.String(),
})

task.Template.GetContainer().Resources = &core.Resources{
Requests: finalizedResourceRequests,
Limits: finalizedResourceLimits,
Expand Down
72 changes: 58 additions & 14 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func TestCreateExecution(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "200Gi",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -343,6 +347,10 @@ func TestCreateExecution(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "500Gi",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
}
mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(data workflowengineInterfaces.ExecutionData) bool {
Expand Down Expand Up @@ -4113,16 +4121,18 @@ func TestSetDefaults(t *testing.T) {
execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{
Defaults: runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("200m"),
GPU: resource.MustParse("4"),
Memory: resource.MustParse("200Gi"),
EphemeralStorage: resource.MustParse("500Mi"),
CPU: resource.MustParse("200m"),
GPU: resource.MustParse("4"),
Memory: resource.MustParse("200Gi"),
EphemeralStorage: resource.MustParse("500Mi"),
OOMReservedMemory: resource.MustParse("100Mi"),
},
Limits: runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("300m"),
GPU: resource.MustParse("8"),
Memory: resource.MustParse("500Gi"),
EphemeralStorage: resource.MustParse("501Mi"),
CPU: resource.MustParse("300m"),
GPU: resource.MustParse("8"),
Memory: resource.MustParse("500Gi"),
EphemeralStorage: resource.MustParse("501Mi"),
OOMReservedMemory: resource.MustParse("300Mi"),
},
})
assert.True(t, proto.Equal(
Expand All @@ -4145,6 +4155,10 @@ func TestSetDefaults(t *testing.T) {
Name: core.Resources_GPU,
Value: "4",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -4163,6 +4177,10 @@ func TestSetDefaults(t *testing.T) {
Name: core.Resources_GPU,
Value: "4",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
},
},
Expand Down Expand Up @@ -4225,6 +4243,10 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) {
Name: core.Resources_GPU,
Value: "4",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -4239,6 +4261,10 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) {
Name: core.Resources_GPU,
Value: "4",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
},
},
Expand All @@ -4247,10 +4273,11 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) {

func TestSetDefaults_OptionalRequiredResources(t *testing.T) {
taskConfigLimits := runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("300m"),
GPU: resource.MustParse("1"),
Memory: resource.MustParse("500Gi"),
EphemeralStorage: resource.MustParse("501Mi"),
CPU: resource.MustParse("300m"),
GPU: resource.MustParse("1"),
Memory: resource.MustParse("500Gi"),
EphemeralStorage: resource.MustParse("501Mi"),
OOMReservedMemory: resource.MustParse("300Mi"),
}

task := &core.CompiledTask{
Expand All @@ -4276,8 +4303,9 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) {
execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{
Defaults: runtimeInterfaces.TaskResourceSet{
CPU: resource.MustParse("200m"),
Memory: resource.MustParse("200Gi"),
CPU: resource.MustParse("200m"),
Memory: resource.MustParse("200Gi"),
OOMReservedMemory: resource.MustParse("0"),
},
Limits: taskConfigLimits,
})
Expand All @@ -4293,6 +4321,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "200Gi",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -4303,6 +4335,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "200Gi",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
},
},
Expand Down Expand Up @@ -4337,6 +4373,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) {
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "1",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -4351,6 +4391,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) {
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "1",
},
{
Name: core.Resources_OOM_RESERVED_MEMORY,
Value: "0",
},
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func TestToK8sContainer(t *testing.T) {
})
mockTaskExecMetadata.OnGetNamespace().Return("my-namespace")
mockTaskExecMetadata.OnGetConsoleURL().Return("")
mockTaskExecMetadata.OnGetOOMFailures().Return(0)

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements, extendedReso
taskExecutionMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{})
taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
taskExecutionMetadata.OnGetConsoleURL().Return("")
taskExecutionMetadata.OnGetOOMFailures().Return(0)
return taskExecutionMetadata
}

Expand Down
8 changes: 6 additions & 2 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ func ToK8sResourceList(resources []*core.Resources_ResourceEntry, OOMCount uint3
case core.Resources_MEMORY:
if !v.IsZero() {
memQuantity := k8sResources[v1.ResourceMemory]
memQuantity.Add(v)
k8sResources[v1.ResourceMemory] = memQuantity
if !memQuantity.IsZero() {
memQuantity.Add(v)
k8sResources[v1.ResourceMemory] = memQuantity

[Codecov / codecov/patch annotation — check warning on line 40 in flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go (#L39-L40): added lines #L39 - L40 were not covered by tests. View check run for this annotation.]
} else {
k8sResources[v1.ResourceMemory] = v
}
}
case core.Resources_GPU:
if !v.IsZero() {
Expand Down
6 changes: 3 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestToK8sResourceList(t *testing.T) {
{Name: core.Resources_GPU, Value: "1"},
{Name: core.Resources_MEMORY, Value: "1024Mi"},
{Name: core.Resources_EPHEMERAL_STORAGE, Value: "1024Mi"},
})
}, 0)

assert.NoError(t, err)
assert.NotEmpty(t, r)
Expand All @@ -45,14 +45,14 @@ func TestToK8sResourceList(t *testing.T) {
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceEphemeralStorage])
}
{
r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{})
r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{}, 0)
assert.NoError(t, err)
assert.Empty(t, r)
}
{
_, err := ToK8sResourceList([]*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "250x"},
})
}, 0)
assert.Error(t, err)
}

Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta
tMeta.OnGetInterruptibleFailureThreshold().Return(2)
tMeta.OnGetEnvironmentVariables().Return(nil)
tMeta.OnGetConsoleURL().Return("")
tMeta.OnGetOOMFailures().Return(0)

ow := &mocks2.OutputWriter{}
ow.OnGetOutputPrefixPath().Return("/prefix/")
Expand Down
3 changes: 2 additions & 1 deletion flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func dummyDaskTaskContext(taskTemplate *core.TaskTemplate, resources *v1.Resourc
taskExecutionMetadata.OnGetK8sServiceAccount().Return(defaultServiceAccountName)
taskExecutionMetadata.OnGetNamespace().Return(defaultNamespace)
taskExecutionMetadata.OnGetConsoleURL().Return("")
taskExecutionMetadata.OnGetOOMFailures().Return(0)
overrides := &mocks.TaskOverrides{}
overrides.OnGetResources().Return(resources)
overrides.OnGetExtendedResources().Return(extendedResources)
Expand Down Expand Up @@ -469,7 +470,7 @@ func TestBuildResourcesDaskCustomResoureRequirements(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "15G",
})
expectedResources, _ := flytek8s.ToK8sResourceRequirements(expectedPbResources)
expectedResources, _ := flytek8s.ToK8sResourceRequirements(expectedPbResources, 0)

flyteWorkflowResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func dummyMPITaskContext(taskTemplate *core.TaskTemplate, resources *corev1.Reso
taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{})
taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
taskExecutionMetadata.OnGetConsoleURL().Return("")
taskExecutionMetadata.OnGetOOMFailures().Return(0)
taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata)

pluginStateReaderMock := mocks.PluginStateReader{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate, resources *corev1.
taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{})
taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
taskExecutionMetadata.OnGetConsoleURL().Return("")
taskExecutionMetadata.OnGetOOMFailures().Return(0)
taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata)

pluginStateReaderMock := mocks.PluginStateReader{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func dummyTensorFlowTaskContext(taskTemplate *core.TaskTemplate, resources *core
taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{})
taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
taskExecutionMetadata.OnGetConsoleURL().Return("")
taskExecutionMetadata.OnGetOOMFailures().Return(0)
taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata)

pluginStateReaderMock := mocks.PluginStateReader{}
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/k8s/pod/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements, extendedReso
Name: "test-owner-name",
})
taskMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{})
taskMetadata.OnGetOOMFailures().Return(0)

tID := &pluginsCoreMock.TaskExecutionID{}
tID.On("GetID").Return(core.TaskExecutionIdentifier{
Expand Down
5 changes: 3 additions & 2 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func dummyRayTaskContext(taskTemplate *core.TaskTemplate, resources *corev1.Reso
})
taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
taskExecutionMetadata.OnGetConsoleURL().Return("")
taskExecutionMetadata.OnGetOOMFailures().Return(0)
taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata)
return taskCtx
}
Expand Down Expand Up @@ -497,7 +498,7 @@ func TestBuildResourceRayCustomK8SPod(t *testing.T) {
}
headResources := &core.Resources{Requests: headResourceEntries, Limits: headResourceEntries}

expectedHeadResources, err := flytek8s.ToK8sResourceRequirements(headResources)
expectedHeadResources, err := flytek8s.ToK8sResourceRequirements(headResources, 0)
require.NoError(t, err)

workerResourceEntries := []*core.Resources_ResourceEntry{
Expand All @@ -507,7 +508,7 @@ func TestBuildResourceRayCustomK8SPod(t *testing.T) {
}
workerResources := &core.Resources{Requests: workerResourceEntries, Limits: workerResourceEntries}

expectedWorkerResources, err := flytek8s.ToK8sResourceRequirements(workerResources)
expectedWorkerResources, err := flytek8s.ToK8sResourceRequirements(workerResources, 0)
require.NoError(t, err)

nvidiaRuntimeClassName := "nvidia-cdi"
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate, interruptible bool,
taskExecutionMetadata.On("GetOverrides").Return(overrides)
taskExecutionMetadata.On("GetK8sServiceAccount").Return("new-val")
taskExecutionMetadata.On("GetConsoleURL").Return("")
taskExecutionMetadata.OnGetOOMFailures().Return(0)
taskCtx.On("TaskExecutionMetadata").Return(taskExecutionMetadata)

pluginStateReaderMock := mocks.PluginStateReader{}
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/tests/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
tMeta.OnGetInterruptibleFailureThreshold().Return(2)
tMeta.OnGetEnvironmentVariables().Return(nil)
tMeta.OnGetConsoleURL().Return("")
tMeta.OnGetOOMFailures().Return(0)

catClient := &catalogMocks.Client{}
catData := sync.Map{}
Expand Down
Loading

0 comments on commit 253fadb

Please sign in to comment.