From 253fadbefc19b9037b71ad2b8820068589b00ce5 Mon Sep 17 00:00:00 2001 From: Mecoli1219 Date: Mon, 24 Feb 2025 18:59:40 -0800 Subject: [PATCH] Merge upstream & fix existing test --- .../pkg/manager/impl/execution_manager.go | 20 +++--- .../manager/impl/execution_manager_test.go | 72 +++++++++++++++---- .../flytek8s/container_helper_test.go | 1 + .../flytek8s/pod_helper_test.go | 1 + .../tasks/pluginmachinery/flytek8s/utils.go | 8 ++- .../pluginmachinery/flytek8s/utils_test.go | 6 +- .../plugins/array/k8s/management_test.go | 1 + .../go/tasks/plugins/k8s/dask/dask_test.go | 3 +- .../plugins/k8s/kfoperators/mpi/mpi_test.go | 1 + .../k8s/kfoperators/pytorch/pytorch_test.go | 1 + .../kfoperators/tensorflow/tensorflow_test.go | 1 + .../tasks/plugins/k8s/pod/container_test.go | 1 + .../go/tasks/plugins/k8s/ray/ray_test.go | 5 +- .../go/tasks/plugins/k8s/spark/spark_test.go | 1 + flyteplugins/tests/end_to_end.go | 1 + .../v1alpha1/mocks/ExecutableNodeStatus.go | 64 +++++++++++++++++ .../v1alpha1/mocks/MutableNodeStatus.go | 32 +++++++++ .../pkg/controller/nodes/task/handler_test.go | 4 ++ .../nodes/task/taskexec_context_test.go | 1 + 19 files changed, 192 insertions(+), 32 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 2a10badf49..941d13f098 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -231,16 +231,6 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co Value: memory.Limit.String(), }) - // TODO: adjust the value based on the platform memory. - finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ - Name: core.Resources_OOM_RESERVED_MEMORY, - Value: taskResourceRequirements.Defaults.OOMReservedMemory.String(), - }) - finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{ - Name: core.Resources_OOM_RESERVED_MEMORY, - Value: taskResourceRequirements.Limits.OOMReservedMemory.String(), - }) - // Only assign ephemeral storage when it is either requested or limited in the task definition, or a platform // default exists. if !taskResourceRequirements.Defaults.EphemeralStorage.IsZero() || @@ -274,6 +264,16 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co }) } + // TODO: adjust the value based on the platform memory. + finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: taskResourceRequirements.Defaults.OOMReservedMemory.String(), + }) + finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{ + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: taskResourceRequirements.Limits.OOMReservedMemory.String(), + }) + task.Template.GetContainer().Resources = &core.Resources{ Requests: finalizedResourceRequests, Limits: finalizedResourceLimits, diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 775ef50522..86cf09b9f7 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -333,6 +333,10 @@ func TestCreateExecution(t *testing.T) { Name: core.Resources_MEMORY, Value: "200Gi", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -343,6 +347,10 @@ func TestCreateExecution(t *testing.T) { Name: core.Resources_MEMORY, Value: "500Gi", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, } mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(data workflowengineInterfaces.ExecutionData) bool { @@ -4113,16 +4121,18 @@ func TestSetDefaults(t *testing.T) { execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ Defaults: runtimeInterfaces.TaskResourceSet{ - CPU: resource.MustParse("200m"), - GPU: resource.MustParse("4"), - Memory: resource.MustParse("200Gi"), - EphemeralStorage: resource.MustParse("500Mi"), + CPU: resource.MustParse("200m"), + GPU: resource.MustParse("4"), + Memory: resource.MustParse("200Gi"), + EphemeralStorage: resource.MustParse("500Mi"), + OOMReservedMemory: resource.MustParse("100Mi"), }, Limits: runtimeInterfaces.TaskResourceSet{ - CPU: resource.MustParse("300m"), - GPU: resource.MustParse("8"), - Memory: resource.MustParse("500Gi"), - EphemeralStorage: resource.MustParse("501Mi"), + CPU: resource.MustParse("300m"), + GPU: resource.MustParse("8"), + Memory: resource.MustParse("500Gi"), + EphemeralStorage: resource.MustParse("501Mi"), + OOMReservedMemory: resource.MustParse("300Mi"), }, }) assert.True(t, proto.Equal( @@ -4145,6 +4155,10 @@ func TestSetDefaults(t *testing.T) { Name: core.Resources_GPU, Value: "4", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -4163,6 +4177,10 @@ func TestSetDefaults(t *testing.T) { Name: core.Resources_GPU, Value: "4", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, }, }, @@ -4225,6 +4243,10 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) { Name: core.Resources_GPU, Value: "4", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -4239,6 +4261,10 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) { Name: core.Resources_GPU, Value: "4", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, }, }, @@ -4247,10 +4273,11 @@ func TestSetDefaults_MissingRequests_ExistingRequestsPreserved(t *testing.T) { func TestSetDefaults_OptionalRequiredResources(t *testing.T) { taskConfigLimits := runtimeInterfaces.TaskResourceSet{ - CPU: resource.MustParse("300m"), - GPU: resource.MustParse("1"), - Memory: resource.MustParse("500Gi"), - EphemeralStorage: resource.MustParse("501Mi"), + CPU: resource.MustParse("300m"), + GPU: resource.MustParse("1"), + Memory: resource.MustParse("500Gi"), + EphemeralStorage: resource.MustParse("501Mi"), + OOMReservedMemory: resource.MustParse("300Mi"), } task := &core.CompiledTask{ @@ -4276,8 +4303,9 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ Defaults: runtimeInterfaces.TaskResourceSet{ - CPU: resource.MustParse("200m"), - Memory: resource.MustParse("200Gi"), + CPU: resource.MustParse("200m"), + Memory: resource.MustParse("200Gi"), + OOMReservedMemory: resource.MustParse("0"), }, Limits: taskConfigLimits, }) @@ -4293,6 +4321,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { Name: core.Resources_MEMORY, Value: "200Gi", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -4303,6 +4335,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { Name: core.Resources_MEMORY, Value: "200Gi", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, }, }, @@ -4337,6 +4373,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { Name: core.Resources_EPHEMERAL_STORAGE, Value: "1", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -4351,6 +4391,10 @@ func TestSetDefaults_OptionalRequiredResources(t *testing.T) { Name: core.Resources_EPHEMERAL_STORAGE, Value: "1", }, + { + Name: core.Resources_OOM_RESERVED_MEMORY, + Value: "0", + }, }, }, }, diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go index 4e609c72b2..df057537cc 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go @@ -405,6 +405,7 @@ func TestToK8sContainer(t *testing.T) { }) mockTaskExecMetadata.OnGetNamespace().Return("my-namespace") mockTaskExecMetadata.OnGetConsoleURL().Return("") + mockTaskExecMetadata.OnGetOOMFailures().Return(0) tCtx := &mocks.TaskExecutionContext{} tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index 6fab4ce455..e3a2c13ca7 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -61,6 +61,7 @@ func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements, extendedReso taskExecutionMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{}) taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil) taskExecutionMetadata.OnGetConsoleURL().Return("") + taskExecutionMetadata.OnGetOOMFailures().Return(0) return taskExecutionMetadata } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go index c93eaba12c..aaa970e6de 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go @@ -35,8 +35,12 @@ func ToK8sResourceList(resources []*core.Resources_ResourceEntry, OOMCount uint3 case core.Resources_MEMORY: if !v.IsZero() { memQuantity := k8sResources[v1.ResourceMemory] - memQuantity.Add(v) - k8sResources[v1.ResourceMemory] = memQuantity + if !memQuantity.IsZero() { + memQuantity.Add(v) + k8sResources[v1.ResourceMemory] = memQuantity + } else { + k8sResources[v1.ResourceMemory] = v + } } case core.Resources_GPU: if !v.IsZero() { diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go index e5a78807f6..b0be54ae95 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go @@ -34,7 +34,7 @@ func TestToK8sResourceList(t *testing.T) { {Name: core.Resources_GPU, Value: "1"}, {Name: core.Resources_MEMORY, Value: "1024Mi"}, {Name: core.Resources_EPHEMERAL_STORAGE, Value: "1024Mi"}, - }) + }, 0) assert.NoError(t, err) assert.NotEmpty(t, r) @@ -45,14 +45,14 @@ func TestToK8sResourceList(t *testing.T) { assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceEphemeralStorage]) } { - r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{}) + r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{}, 0) assert.NoError(t, err) assert.Empty(t, r) } { _, err := ToK8sResourceList([]*core.Resources_ResourceEntry{ {Name: core.Resources_CPU, Value: "250x"}, - }) + }, 0) assert.Error(t, err) } diff --git a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go index 2edf3882ef..f4691529aa 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go @@ -118,6 +118,7 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta tMeta.OnGetInterruptibleFailureThreshold().Return(2) tMeta.OnGetEnvironmentVariables().Return(nil) tMeta.OnGetConsoleURL().Return("") + tMeta.OnGetOOMFailures().Return(0) ow := &mocks2.OutputWriter{} ow.OnGetOutputPrefixPath().Return("/prefix/") diff --git a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go index 21541d159b..23ae7248df 100644 --- a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go @@ -201,6 +201,7 @@ func dummyDaskTaskContext(taskTemplate *core.TaskTemplate, resources *v1.Resourc taskExecutionMetadata.OnGetK8sServiceAccount().Return(defaultServiceAccountName) taskExecutionMetadata.OnGetNamespace().Return(defaultNamespace) taskExecutionMetadata.OnGetConsoleURL().Return("") + taskExecutionMetadata.OnGetOOMFailures().Return(0) overrides := &mocks.TaskOverrides{} overrides.OnGetResources().Return(resources) overrides.OnGetExtendedResources().Return(extendedResources) @@ -469,7 +470,7 @@ func TestBuildResourcesDaskCustomResoureRequirements(t *testing.T) { Name: core.Resources_MEMORY, Value: "15G", }) - expectedResources, _ := flytek8s.ToK8sResourceRequirements(expectedPbResources) + expectedResources, _ := flytek8s.ToK8sResourceRequirements(expectedPbResources, 0) flyteWorkflowResources := v1.ResourceRequirements{ Requests: v1.ResourceList{ diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go index 92ade4fe2c..c2d2be8221 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go @@ -172,6 +172,7 @@ func dummyMPITaskContext(taskTemplate *core.TaskTemplate, resources *corev1.Reso taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{}) taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil) taskExecutionMetadata.OnGetConsoleURL().Return("") + taskExecutionMetadata.OnGetOOMFailures().Return(0) taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata) pluginStateReaderMock := mocks.PluginStateReader{} diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go index 04fcb804ce..4da9bef29d 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go @@ -181,6 +181,7 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate, resources *corev1. taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{}) taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil) taskExecutionMetadata.OnGetConsoleURL().Return("") + taskExecutionMetadata.OnGetOOMFailures().Return(0) taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata) pluginStateReaderMock := mocks.PluginStateReader{} diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go index 3957518478..752e9c0fa6 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go @@ -173,6 +173,7 @@ func dummyTensorFlowTaskContext(taskTemplate *core.TaskTemplate, resources *core taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{}) taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil) taskExecutionMetadata.OnGetConsoleURL().Return("") + taskExecutionMetadata.OnGetOOMFailures().Return(0) taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata) pluginStateReaderMock := mocks.PluginStateReader{} diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go b/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go index 39a98b3862..211777e4fe 100644 --- a/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go @@ -108,6 +108,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements, extendedReso Name: "test-owner-name", }) taskMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{}) + taskMetadata.OnGetOOMFailures().Return(0) tID := &pluginsCoreMock.TaskExecutionID{} tID.On("GetID").Return(core.TaskExecutionIdentifier{ diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index 8cfa53c263..98af92218d 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -163,6 +163,7 @@ func dummyRayTaskContext(taskTemplate *core.TaskTemplate, resources *corev1.Reso }) taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil) taskExecutionMetadata.OnGetConsoleURL().Return("") + taskExecutionMetadata.OnGetOOMFailures().Return(0) taskCtx.OnTaskExecutionMetadata().Return(taskExecutionMetadata) return taskCtx } @@ -497,7 +498,7 @@ func TestBuildResourceRayCustomK8SPod(t *testing.T) { } headResources := &core.Resources{Requests: headResourceEntries, Limits: headResourceEntries} - expectedHeadResources, err := flytek8s.ToK8sResourceRequirements(headResources) + expectedHeadResources, err := flytek8s.ToK8sResourceRequirements(headResources, 0) require.NoError(t, err) workerResourceEntries := []*core.Resources_ResourceEntry{ @@ -507,7 +508,7 @@ func TestBuildResourceRayCustomK8SPod(t *testing.T) { } workerResources := &core.Resources{Requests: workerResourceEntries, Limits: workerResourceEntries} - expectedWorkerResources, err := flytek8s.ToK8sResourceRequirements(workerResources) + expectedWorkerResources, err := flytek8s.ToK8sResourceRequirements(workerResources, 0) require.NoError(t, err) nvidiaRuntimeClassName := "nvidia-cdi" diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go index 1fd63900a4..6c64d152e5 100644 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go @@ -475,6 +475,7 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate, interruptible bool, taskExecutionMetadata.On("GetOverrides").Return(overrides) taskExecutionMetadata.On("GetK8sServiceAccount").Return("new-val") taskExecutionMetadata.On("GetConsoleURL").Return("") + taskExecutionMetadata.OnGetOOMFailures().Return(0) taskCtx.On("TaskExecutionMetadata").Return(taskExecutionMetadata) pluginStateReaderMock := mocks.PluginStateReader{} diff --git a/flyteplugins/tests/end_to_end.go b/flyteplugins/tests/end_to_end.go index 4cb2867abc..bce3eee08c 100644 --- a/flyteplugins/tests/end_to_end.go +++ b/flyteplugins/tests/end_to_end.go @@ -173,6 +173,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i tMeta.OnGetInterruptibleFailureThreshold().Return(2) tMeta.OnGetEnvironmentVariables().Return(nil) tMeta.OnGetConsoleURL().Return("") + tMeta.OnGetOOMFailures().Return(0) catClient := &catalogMocks.Client{} catData := sync.Map{} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go index cdf3f1b6ab..6068f9eb37 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go @@ -423,6 +423,38 @@ func (_m *ExecutableNodeStatus) GetNodeExecutionStatus(ctx context.Context, id s return r0 } +type ExecutableNodeStatus_GetOOMFailures struct { + *mock.Call +} + +func (_m ExecutableNodeStatus_GetOOMFailures) Return(_a0 uint32) *ExecutableNodeStatus_GetOOMFailures { + return &ExecutableNodeStatus_GetOOMFailures{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableNodeStatus) OnGetOOMFailures() *ExecutableNodeStatus_GetOOMFailures { + c_call := _m.On("GetOOMFailures") + return &ExecutableNodeStatus_GetOOMFailures{Call: c_call} +} + +func (_m *ExecutableNodeStatus) OnGetOOMFailuresMatch(matchers ...interface{}) *ExecutableNodeStatus_GetOOMFailures { + c_call := _m.On("GetOOMFailures", matchers...) + return &ExecutableNodeStatus_GetOOMFailures{Call: c_call} +} + +// GetOOMFailures provides a mock function with given fields: +func (_m *ExecutableNodeStatus) GetOOMFailures() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + type ExecutableNodeStatus_GetOrCreateArrayNodeStatus struct { *mock.Call } @@ -1061,6 +1093,38 @@ func (_m *ExecutableNodeStatus) IncrementAttempts() uint32 { return r0 } +type ExecutableNodeStatus_IncrementOOMFailures struct { + *mock.Call +} + +func (_m ExecutableNodeStatus_IncrementOOMFailures) Return(_a0 uint32) *ExecutableNodeStatus_IncrementOOMFailures { + return &ExecutableNodeStatus_IncrementOOMFailures{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableNodeStatus) OnIncrementOOMFailures() *ExecutableNodeStatus_IncrementOOMFailures { + c_call := _m.On("IncrementOOMFailures") + return &ExecutableNodeStatus_IncrementOOMFailures{Call: c_call} +} + +func (_m *ExecutableNodeStatus) OnIncrementOOMFailuresMatch(matchers ...interface{}) *ExecutableNodeStatus_IncrementOOMFailures { + c_call := _m.On("IncrementOOMFailures", matchers...) + return &ExecutableNodeStatus_IncrementOOMFailures{Call: c_call} +} + +// IncrementOOMFailures provides a mock function with given fields: +func (_m *ExecutableNodeStatus) IncrementOOMFailures() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + type ExecutableNodeStatus_IncrementSystemFailures struct { *mock.Call } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go index 3f103bc2ec..0988778360 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go @@ -493,6 +493,38 @@ func (_m *MutableNodeStatus) IncrementAttempts() uint32 { return r0 } +type MutableNodeStatus_IncrementOOMFailures struct { + *mock.Call +} + +func (_m MutableNodeStatus_IncrementOOMFailures) Return(_a0 uint32) *MutableNodeStatus_IncrementOOMFailures { + return &MutableNodeStatus_IncrementOOMFailures{Call: _m.Call.Return(_a0)} +} + +func (_m *MutableNodeStatus) OnIncrementOOMFailures() *MutableNodeStatus_IncrementOOMFailures { + c_call := _m.On("IncrementOOMFailures") + return &MutableNodeStatus_IncrementOOMFailures{Call: c_call} +} + +func (_m *MutableNodeStatus) OnIncrementOOMFailuresMatch(matchers ...interface{}) *MutableNodeStatus_IncrementOOMFailures { + c_call := _m.On("IncrementOOMFailures", matchers...) + return &MutableNodeStatus_IncrementOOMFailures{Call: c_call} +} + +// IncrementOOMFailures provides a mock function with given fields: +func (_m *MutableNodeStatus) IncrementOOMFailures() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + type MutableNodeStatus_IncrementSystemFailures struct { *mock.Call } diff --git a/flytepropeller/pkg/controller/nodes/task/handler_test.go b/flytepropeller/pkg/controller/nodes/task/handler_test.go index 62e64c02f3..32e3084018 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/handler_test.go @@ -487,6 +487,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) { ns := &flyteMocks.ExecutableNodeStatus{} ns.OnGetDataDir().Return("data-dir") ns.OnGetOutputDir().Return("data-dir") + ns.OnGetOOMFailures().Return(0) res := &v1.ResourceRequirements{} n := &flyteMocks.ExecutableNode{} @@ -807,6 +808,7 @@ func Test_task_Abort(t *testing.T) { ns := &flyteMocks.ExecutableNodeStatus{} ns.OnGetDataDir().Return(storage.DataReference("data-dir")) ns.OnGetOutputDir().Return(storage.DataReference("output-dir")) + ns.OnGetOOMFailures().Return(0) res := &v1.ResourceRequirements{} n := &flyteMocks.ExecutableNode{} @@ -971,6 +973,7 @@ func Test_task_Abort_v1(t *testing.T) { ns := &flyteMocks.ExecutableNodeStatus{} ns.OnGetDataDir().Return(storage.DataReference("data-dir")) ns.OnGetOutputDir().Return(storage.DataReference("output-dir")) + ns.OnGetOOMFailures().Return(0) res := &v1.ResourceRequirements{} n := &flyteMocks.ExecutableNode{} @@ -1153,6 +1156,7 @@ func Test_task_Finalize(t *testing.T) { ns := &flyteMocks.ExecutableNodeStatus{} ns.OnGetDataDir().Return(storage.DataReference("data-dir")) ns.OnGetOutputDir().Return(storage.DataReference("output-dir")) + ns.OnGetOOMFailures().Return(0) res := &v1.ResourceRequirements{} n := &flyteMocks.ExecutableNode{} diff --git a/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go b/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go index 9a469fd25c..38a826f08b 100644 --- a/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go +++ b/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go @@ -74,6 +74,7 @@ func dummyNodeExecutionContext(t *testing.T, parentInfo executors.ImmutableParen ns := &flyteMocks.ExecutableNodeStatus{} ns.OnGetDataDir().Return("data-dir") ns.OnGetOutputDir().Return("output-dir") + ns.OnGetOOMFailures().Return(0) n := &flyteMocks.ExecutableNode{} n.OnGetResources().Return(resources)