Skip to content

Commit

Permalink
enhance: split workflow and chat runs for better user experience
Browse files Browse the repository at this point in the history
Prior to this change, it was possible for workflow-based runs to keep
chat-based runs for being processed by the controller. This change will
split the workflow and chat-based runs into different worker queues to
avoid this backup.

Signed-off-by: Donnie Adams <[email protected]>
  • Loading branch information
thedadams committed Feb 11, 2025
1 parent d21a00d commit 351d29e
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de
github.com/mhale/smtpd v0.8.3
github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d
github.com/obot-platform/nah v0.0.0-20250207012945-3b7c581712f6
github.com/obot-platform/nah v0.0.0-20250210200356-4c7b8ce27778
github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2
github.com/obot-platform/obot/apiclient v0.0.0-00010101000000-000000000000
github.com/obot-platform/obot/logger v0.0.0-20241217130503-4004a5c69f32
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d h1:GzMvRkssr4jAa2YvQiv9eXhjuNpaZVab3GajE7+cQ3s=
github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d/go.mod h1:RzrH0geIlbiTHDGZ8bpCk5k1hwdU9uu3l4zJn9n0pZU=
github.com/obot-platform/nah v0.0.0-20250207012945-3b7c581712f6 h1:rHIf46CC4pvG4yad9fMaFBTlewXqu3RWOj+epso56xw=
github.com/obot-platform/nah v0.0.0-20250207012945-3b7c581712f6/go.mod h1:KG1jLO9FeYvCPGI0iDqe5oqDqOFLd3/dt/iwuMianmI=
github.com/obot-platform/nah v0.0.0-20250210200356-4c7b8ce27778 h1:7YA4E2AwqdhKII979r85sdCIDdVg2AJXpE5Qu2kfWdU=
github.com/obot-platform/nah v0.0.0-20250210200356-4c7b8ce27778/go.mod h1:KG1jLO9FeYvCPGI0iDqe5oqDqOFLd3/dt/iwuMianmI=
github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2 h1:jiyBM/TYxU6UNVS9ff8Y8n55DOKDYohKkIZjfHpjfTY=
github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2/go.mod h1:isbKX6EfvvG/ojjFB2ZLyz27+2xoG3yRmpTSE+ytWEs=
github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77 h1:3bMMZ1f+GPXFQ1uNaYbO/uECWvSfqEA+ZEXn1rFAT88=
Expand Down
1 change: 1 addition & 0 deletions pkg/api/handlers/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (i *InvokeHandler) Invoke(req api.Context) error {

if agent.Name != "" {
resp, err = i.invoker.Agent(req.Context(), req.Storage, &agent, string(input), invoke.Options{
GenerateName: system.ChatRunPrefix,
ThreadName: threadID,
Synchronous: synchronous,
CreateThread: true,
Expand Down
10 changes: 9 additions & 1 deletion pkg/invoke/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type Options struct {
CreateThread bool
ThreadCredentialScope *bool
UserUID string
GenerateName string
}

func (i *Invoker) getChatState(ctx context.Context, c kclient.Client, run *v1.Run) (result, lastThreadName string, _ error) {
Expand Down Expand Up @@ -362,6 +363,7 @@ func (i *Invoker) Agent(ctx context.Context, c kclient.WithWatch, agent *v1.Agen
WorkflowExecutionName: opt.WorkflowExecutionName,
PreviousRunName: opt.PreviousRunName,
ForceNoResume: opt.ForceNoResume,
GenerateName: opt.GenerateName,
})
}

Expand All @@ -374,6 +376,7 @@ func unAbortThread(ctx context.Context, c kclient.Client, thread *v1.Thread) err
}

type runOptions struct {
GenerateName string
AgentName string
Synchronous bool
WorkflowName string
Expand Down Expand Up @@ -407,9 +410,14 @@ func (i *Invoker) createRun(ctx context.Context, c kclient.WithWatch, thread *v1
return nil, err
}

generateName := opts.GenerateName
if generateName == "" {
generateName = system.RunPrefix
}

run := v1.Run{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.RunPrefix,
GenerateName: generateName,
Namespace: thread.Namespace,
Finalizers: []string{v1.RunFinalizer},
},
Expand Down
4 changes: 4 additions & 0 deletions pkg/services/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/obot-platform/nah/pkg/apply"
"github.com/obot-platform/nah/pkg/leader"
"github.com/obot-platform/nah/pkg/router"
"github.com/obot-platform/nah/pkg/runtime"
"github.com/obot-platform/obot/pkg/api/authn"
"github.com/obot-platform/obot/pkg/api/authz"
"github.com/obot-platform/obot/pkg/api/server"
Expand Down Expand Up @@ -287,6 +288,9 @@ func New(ctx context.Context, config Config) (*Services, error) {
GVKThreadiness: map[schema.GroupVersionKind]int{
v1.SchemeGroupVersion.WithKind("KnowledgeFile"): config.KnowledgeFileWorkers,
},
GVKQueueSplitters: map[schema.GroupVersionKind]runtime.WorkerQueueSplitter{
v1.SchemeGroupVersion.WithKind("Run"): (*runQueueSplitter)(nil),
},
})
if err != nil {
return nil, err
Expand Down
21 changes: 21 additions & 0 deletions pkg/services/queuesplitters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package services

import (
"github.com/obot-platform/nah/pkg/runtime"
"github.com/obot-platform/obot/pkg/system"
)

type runQueueSplitter struct{}

func (*runQueueSplitter) Split(key string) int {
_, name := runtime.KeyParse(key)
if system.IsChatRunID(name) {
return 1
}

return 0
}

func (*runQueueSplitter) Queues() int {
return 2
}
5 changes: 5 additions & 0 deletions pkg/system/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
ThreadPrefix = "t1"
AgentPrefix = "a1"
RunPrefix = "r1"
ChatRunPrefix = "r1chat"
WorkflowPrefix = "w1"
WorkflowExecutionPrefix = "we1"
WorkflowStepPrefix = "ws1"
Expand Down Expand Up @@ -51,3 +52,7 @@ func IsWorkflowID(id string) bool {
func IsEmailReceiverID(id string) bool {
return strings.HasPrefix(id, EmailReceiverPrefix)
}

func IsChatRunID(id string) bool {
return strings.HasPrefix(id, ChatRunPrefix)
}

0 comments on commit 351d29e

Please sign in to comment.