Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flyteagent] Add agents and agentForTaskTypes in helm chart #6239

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions charts/flyte-binary/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@ Chart for basic single Flyte executable deployment
| clusterResourceTemplates.labels | object | `{}` | |
| commonAnnotations | object | `{}` | |
| commonLabels | object | `{}` | |
| configuration.agentService.agentForTaskTypes[0].noop_task | string | `"custom_agent"` | |
| configuration.agentService.agents.custom_agent.endpoint | string | `"k8s://flyte_custom_agent.flyte:8000"` | |
| configuration.agentService.agents.custom_agent.insecure | bool | `true` | |
| configuration.agentService.defaultAgent.defaultTimeout | string | `"10s"` | |
| configuration.agentService.defaultAgent.endpoint | string | `"k8s://flyteagent.flyte:8000"` | |
| configuration.agentService.defaultAgent.insecure | bool | `true` | |
| configuration.agentService.defaultAgent.timeouts.CreateTask | string | `"10s"` | |
| configuration.agentService.defaultAgent.timeouts.DeleteTask | string | `"10s"` | |
| configuration.agentService.defaultAgent.timeouts.ExecuteTaskSync | string | `"10s"` | |
| configuration.agentService.defaultAgent.timeouts.GetTask | string | `"10s"` | |
| configuration.agentService.supportedTaskTypes | list | `[]` | |
| configuration.annotations | object | `{}` | |
| configuration.auth.authorizedUris | list | `[]` | |
| configuration.auth.clientSecretsExternalSecretRef | string | `""` | |
Expand Down
10 changes: 10 additions & 0 deletions charts/flyte-binary/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,18 @@ configuration:
endpoint: "k8s://flyteagent.flyte:8000"
insecure: true
timeouts:
CreateTask: 10s
GetTask: 10s
DeleteTask: 10s
ExecuteTaskSync: 10s
defaultTimeout: 10s
supportedTaskTypes: []
agents:
custom_agent:
endpoint: "k8s://flyte_custom_agent.flyte:8000"
insecure: true
agentForTaskTypes:
- noop_task: custom_agent
# propeller Specify configuration for Flyte Propeller
propeller:
# createCRDs If true, Propeller will install CRDs at runtime, if false, CRDs will be installed during helm install
Expand Down
5 changes: 3 additions & 2 deletions charts/flyte-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,11 @@ helm install gateway bitnami/contour -n flyte
| flyteadmin.serviceMonitor.scrapeTimeout | string | `"30s"` | Sets the timeout after which request to scrape metrics will time out |
| flyteadmin.tolerations | list | `[]` | tolerations for Flyteadmin deployment |
| flyteagent.enabled | bool | `false` | |
| flyteagent.plugin_config.plugins.agent-service | object | `{"defaultAgent":{"endpoint":"k8s://flyteagent.flyte:8000","insecure":true},"supportedTaskTypes":[]}` | Agent service configuration for propeller. |
| flyteagent.plugin_config.plugins.agent-service.defaultAgent | object | `{"endpoint":"k8s://flyteagent.flyte:8000","insecure":true}` | The default agent service to use for plugin tasks. |
| flyteagent.plugin_config.plugins.agent-service | object | `{"agentForTaskTypes":[{"noop_task":"custom_agent"}],"agents":{"custom_agent":{"endpoint":"k8s://flyte_custom_agent.flyte:8000","insecure":true}},"defaultAgent":{"defaultTimeout":"10s","endpoint":"k8s://flyteagent.flyte:8000","insecure":true,"timeouts":{"CreateTask":"10s","DeleteTask":"10s","ExecuteTaskSync":"10s","GetTask":"10s"}},"supportedTaskTypes":[]}` | Agent service configuration for propeller. |
| flyteagent.plugin_config.plugins.agent-service.defaultAgent | object | `{"defaultTimeout":"10s","endpoint":"k8s://flyteagent.flyte:8000","insecure":true,"timeouts":{"CreateTask":"10s","DeleteTask":"10s","ExecuteTaskSync":"10s","GetTask":"10s"}}` | The default agent service to use for plugin tasks. |
| flyteagent.plugin_config.plugins.agent-service.defaultAgent.endpoint | string | `"k8s://flyteagent.flyte:8000"` | The agent service endpoint propeller should connect to. |
| flyteagent.plugin_config.plugins.agent-service.defaultAgent.insecure | bool | `true` | Whether the connection from propeller to the agent service should use TLS. |
| flyteagent.plugin_config.plugins.agent-service.defaultAgent.timeouts | object | `{"CreateTask":"10s","DeleteTask":"10s","ExecuteTaskSync":"10s","GetTask":"10s"}` | Timeouts for each operation, if not specified, the default timeout will be used. |
| flyteagent.plugin_config.plugins.agent-service.supportedTaskTypes | list | `[]` | The task types supported by the default agent. As of #5460 these are discovered automatically and don't need to be configured. |
| flyteagent.podLabels | object | `{}` | Labels for flyteagent pods |
| flyteconsole.affinity | object | `{}` | affinity for Flyteconsole deployment |
Expand Down
13 changes: 13 additions & 0 deletions charts/flyte-core/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,22 @@ flyteagent:
endpoint: "k8s://flyteagent.flyte:8000"
# -- Whether the connection from propeller to the agent service should use TLS.
insecure: true
# -- Timeouts for each operation, if not specified, the default timeout will be used.
timeouts:
CreateTask: 10s
GetTask: 10s
DeleteTask: 10s
ExecuteTaskSync: 10s
defaultTimeout: 10s
# -- The task types supported by the default agent. As of #5460 these are discovered automatically and don't
# need to be configured.
supportedTaskTypes: []
agents:
custom_agent:
endpoint: "k8s://flyte_custom_agent.flyte:8000"
insecure: true
agentForTaskTypes:
- noop_task: custom_agent
# -- Labels for flyteagent pods
podLabels: {}

Expand Down
16 changes: 13 additions & 3 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,22 @@ data:
cloudwatch-enabled: false
stackdriver-enabled: false
agent-service:
agentForTaskTypes:
- noop_task: custom_agent
agents:
custom_agent:
endpoint: k8s://flyte_custom_agent.flyte:8000
insecure: true
defaultAgent:
defaultTimeout: 10s
endpoint: k8s://flyteagent.flyte:8000
insecure: true
timeouts:
CreateTask: 10s
DeleteTask: 10s
ExecuteTaskSync: 10s
GetTask: 10s
supportedTaskTypes: []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty supported task types array

Consider adding task types to the supportedTaskTypes array as an empty array may prevent the agent from handling any tasks. You might want to specify the task types this agent should support.

Code suggestion
Check the AI-generated fix before applying
Suggested change
supportedTaskTypes: []
supportedTaskTypes: ["noop_task"]

Code Review Run #5f1d05


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

002-database.yaml: |
database:
postgres:
Expand Down Expand Up @@ -821,7 +831,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: NmdFQmhIcGQ3QUY4anJ4OQ==
haSharedSecret: ZmFXUEpMSXBvam1RQ1dBbQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1252,7 +1262,7 @@ spec:
metadata:
annotations:
checksum/cluster-resource-templates: 6fd9b172465e3089fcc59f738b92b8dc4d8939360c19de8ee65f68b0e7422035
checksum/configuration: 17020fb9c16e349ea5d5d5d0c641e0c7f41ddee4c834b93bffd8f58006fb88be
checksum/configuration: e41f2cbe2b01f18ce3a3f23b49425eaf8188bdca4d5ecd1ba28c13c918a84d5b
checksum/configuration-secret: 09216ffaa3d29e14f88b1f30af580d02a2a5e014de4d750b7f275cc07ed4e914
labels:
app.kubernetes.io/component: flyte-binary
Expand Down Expand Up @@ -1418,7 +1428,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: e70b19a9c6f4e7c05fff1fb0b2adc885112a99eab0fc2a893762513e45e1a230
checksum/secret: 319fe49bb415a6be154ae6343804d89c771ea558293eb25425657b0e7e52a53c
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: TG9qSkFYNDBjc3JJakxZYw==
haSharedSecret: WGloTDYwdld5dVlvQWtaUQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1367,7 +1367,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: a6fd0b4e81971aff50f056b2beddcb3b0eb480659bcea29f287a9773123ede6c
checksum/secret: 8e14a40490a344ef0a144464cb70abc37dbbf4e6b4ab8071a14cc687fa2e543d
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: Q2dOYmdSM0FNbnJSUE9qcA==
haSharedSecret: YjRBRUlCdGhyUHN5ZkFrVw==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -934,7 +934,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 2fd78377e09dbed8a7a620d718063e8bb1478d7c233bec3b5ebc32bcc255c0d4
checksum/secret: 24e50244c5c97e1586a92919258d3c94da6c5e82a05972e4fa07881296f24209
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
18 changes: 18 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ var (
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
DefaultServiceConfig: `{"loadBalancingConfig": [{"round_robin":{}}]}`,
},
AgentDeployments: map[string]*Deployment{
"agent_1": {
Endpoint: "",
Insecure: true,
DefaultTimeout: config.Duration{Duration: 300 * time.Second},
Timeouts: map[string]config.Duration{
"ExecuteTaskSync": {
Duration: 300 * time.Second,
},
"GetTask": {
Duration: 100 * time.Second,
},
},
},
},
AgentForTaskTypes: map[string]string{
"task_type_1": "agent_1",
},
// AsyncPlugin should be registered to at least one task type.
// Reference: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/pluginmachinery/registry.go#L27
SupportedTaskTypes: []string{"task_type_1", "task_type_2"},
Expand Down
87 changes: 85 additions & 2 deletions flyteplugins/go/tasks/plugins/webapi/agent/config_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package agent

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

agentMocks "github.com/flyteorg/flyte/flyteidl/clients/go/admin/mocks"
"github.com/flyteorg/flyte/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
flyteIdlCore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
pluginCoreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
ioMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io/mocks"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

func TestGetAndSetConfig(t *testing.T) {
Expand Down Expand Up @@ -53,9 +65,80 @@ func TestDefaultAgentConfig(t *testing.T) {
expectedTaskTypes := []string{"task_type_1", "task_type_2"}
assert.Equal(t, expectedTaskTypes, cfg.SupportedTaskTypes)

assert.Empty(t, cfg.AgentDeployments)
assert.Contains(t, cfg.AgentDeployments, "agent_1")

assert.Empty(t, cfg.AgentForTaskTypes)
agent1 := cfg.AgentDeployments["agent_1"]
assert.Equal(t, "", agent1.Endpoint)
assert.True(t, agent1.Insecure)
assert.Equal(t, 300*time.Second, agent1.DefaultTimeout.Duration)
assert.Equal(t, 300*time.Second, agent1.Timeouts["ExecuteTaskSync"].Duration)
assert.Equal(t, 100*time.Second, agent1.Timeouts["GetTask"].Duration)

assert.Equal(t, "agent_1", cfg.AgentForTaskTypes["task_type_1"])

assert.Equal(t, 10*time.Second, cfg.PollInterval.Duration)
}

func TestTaskWithExpectedAgent(t *testing.T) {
// Set up the base configuration
cfg := defaultConfig
cfg.AgentDeployments["agent_1"].Endpoint = defaultAgentEndpoint
err := SetConfig(&cfg)
assert.NoError(t, err)

// Create mock agent client agent_1
agent1Client := new(agentMocks.AsyncAgentServiceClient)
agent1Client.On("CreateTask", mock.Anything, mock.Anything).Return(&admin.CreateTaskResponse{}, nil)

// Create a plugin instance using our mock clients
agentPlugin := webapi.PluginEntry{
ID: "agent",
SupportedTaskTypes: []string{"task_type_1"},
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
return &Plugin{
metricScope: iCtx.MetricsScope(),
cfg: &cfg,
cs: &ClientSet{
asyncAgentClients: map[string]service.AsyncAgentServiceClient{
defaultAgentEndpoint: agent1Client,
},
},
registry: Registry{
"task_type_1": {defaultTaskTypeVersion: {AgentDeployment: cfg.AgentDeployments["agent_1"], IsSync: false}},
},
}, nil
},
}

// Create task template and input
template := flyteIdlCore.TaskTemplate{
Type: "task_type_1", // Should route to agent_1
Target: &flyteIdlCore.TaskTemplate_Container{},
}

pluginEntry := pluginmachinery.CreateRemotePlugin(agentPlugin)
plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test"))
assert.NoError(t, err)

inputs, err := coreutils.MakeLiteralMap(map[string]interface{}{"input": "test"})
assert.NoError(t, err)

// Run task type 1 (should use agent_1)
tCtx := getTaskContext(t)
tr := &pluginCoreMocks.TaskReader{}
tr.OnRead(context.Background()).Return(&template, nil)
tCtx.OnTaskReader().Return(tr)
inputReader := &ioMocks.InputReader{}
inputReader.OnGetMatch(mock.Anything).Return(inputs, nil)
inputReader.OnGetInputPath().Return(storage.DataReference("fake://input/path"))
inputReader.OnGetInputPrefixPath().Return(storage.DataReference("fake://input/prefix"))
tCtx.OnInputReader().Return(inputReader)

result, err := plugin.Handle(context.Background(), tCtx)
assert.NoError(t, err)
assert.NotNil(t, result)

// Verify agent_1 was called (failed)
// agent1Client.AssertCalled(t, "CreateTask", mock.Anything, mock.Anything)
Comment on lines +141 to +142
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider enabling mock assertion check

The commented out assertion agent1Client.AssertCalled(t, "CreateTask", mock.Anything, mock.Anything) should be uncommented since it verifies that the agent client was called correctly. Without this assertion, the test is not fully validating the expected behavior.

Code suggestion
Check the AI-generated fix before applying
Suggested change
// Verify agent_1 was called (failed)
// agent1Client.AssertCalled(t, "CreateTask", mock.Anything, mock.Anything)
agent1Client.AssertCalled(t, "CreateTask", mock.Anything, mock.Anything)

Code Review Run #5f1d05


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


}
Loading
Loading