Skip to content

Commit 0f2df38

Browse files
committed
feat: mark finished runs as inactive
Inactive runs will not be processed by the controller and will be removed from its cache. These runs will still be accessible from the API. This logic is implemented such that other objects can be marked as inactive and the controller will similarly ignore them. Signed-off-by: Donnie Adams <[email protected]>
1 parent 6385f18 commit 0f2df38

File tree

11 files changed

+143
-26
lines changed

11 files changed

+143
-26
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ require (
2323
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de
2424
github.com/mhale/smtpd v0.8.3
2525
github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d
26-
github.com/obot-platform/nah v0.0.0-20250227230402-c8ac47e1add7
26+
github.com/obot-platform/nah v0.0.0-20250305151239-35a77e89a2eb
2727
github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2
2828
github.com/obot-platform/obot/apiclient v0.0.0-00010101000000-000000000000
2929
github.com/obot-platform/obot/logger v0.0.0-20241217130503-4004a5c69f32

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,8 +460,8 @@ github.com/oasdiff/yaml3 v0.0.0-20241210130736-a94c01f36349 h1:t05Ww3DxZutOqbMN+
460460
github.com/oasdiff/yaml3 v0.0.0-20241210130736-a94c01f36349/go.mod h1:y5+oSEHCPT/DGrS++Wc/479ERge0zTFxaF8PbGKcg2o=
461461
github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d h1:GzMvRkssr4jAa2YvQiv9eXhjuNpaZVab3GajE7+cQ3s=
462462
github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d/go.mod h1:RzrH0geIlbiTHDGZ8bpCk5k1hwdU9uu3l4zJn9n0pZU=
463-
github.com/obot-platform/nah v0.0.0-20250227230402-c8ac47e1add7 h1:IuG8TZqqxz5P1ZHj2aCxEEn6LGgHXeUVhkG/ELh2a8U=
464-
github.com/obot-platform/nah v0.0.0-20250227230402-c8ac47e1add7/go.mod h1:KG1jLO9FeYvCPGI0iDqe5oqDqOFLd3/dt/iwuMianmI=
463+
github.com/obot-platform/nah v0.0.0-20250305151239-35a77e89a2eb h1:Cgf8prbkB2gwFAI2vsXRN1U7enKkFMYJjaUT+uNR7dc=
464+
github.com/obot-platform/nah v0.0.0-20250305151239-35a77e89a2eb/go.mod h1:be9DBO6pFx4yiKs9bm4GcuZgGX4OVpeRJclA9ojxzmo=
465465
github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2 h1:jiyBM/TYxU6UNVS9ff8Y8n55DOKDYohKkIZjfHpjfTY=
466466
github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2/go.mod h1:isbKX6EfvvG/ojjFB2ZLyz27+2xoG3yRmpTSE+ytWEs=
467467
github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77 h1:3bMMZ1f+GPXFQ1uNaYbO/uECWvSfqEA+ZEXn1rFAT88=

pkg/api/handlers/threads.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (a *ThreadHandler) Update(req api.Context) error {
158158
return err
159159
}
160160

161-
// Don't allow update of tools here, do it with the /tools endpoing
161+
// Don't allow update of tools here, do it with the /tools endpoint
162162
newThread.Tools = existing.Spec.Manifest.Tools
163163

164164
existing.Spec.Manifest = newThread
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package inactive
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/obot-platform/nah/pkg/backend"
8+
kclient "sigs.k8s.io/controller-runtime/pkg/client"
9+
)
10+
11+
func RemoveFromCache(ctx context.Context, backend backend.Backend, obj kclient.Object) error {
12+
gvk := obj.GetObjectKind().GroupVersionKind()
13+
if gvk.Kind == "" {
14+
var err error
15+
gvk, err = backend.GVKForObject(obj, backend.Scheme())
16+
if err != nil {
17+
return fmt.Errorf("failed to get GVK for object: %w", err)
18+
}
19+
}
20+
21+
informer, err := backend.GetInformerForKind(ctx, gvk)
22+
if err != nil {
23+
return fmt.Errorf("failed to get informer for kind %s: %w", gvk.String(), err)
24+
}
25+
26+
return informer.GetStore().Delete(obj)
27+
}

pkg/controller/handlers/runs/runs.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"time"
66

77
"github.com/gptscript-ai/go-gptscript"
8+
"github.com/obot-platform/nah/pkg/backend"
89
"github.com/obot-platform/nah/pkg/router"
910
"github.com/obot-platform/nah/pkg/untriggered"
11+
"github.com/obot-platform/obot/pkg/controller/handlers/inactive"
1012
"github.com/obot-platform/obot/pkg/invoke"
1113
v1 "github.com/obot-platform/obot/pkg/storage/apis/obot.obot.ai/v1"
1214
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -16,10 +18,14 @@ import (
1618

1719
// Handler groups the dependencies shared by the Run controller handlers in
// this package.
type Handler struct {
1820
invoker *invoke.Invoker
21+
// backend is handed to inactive.RemoveFromCache by MarkInactive.
backend backend.Backend
1922
}
2023

21-
func New(invoker *invoke.Invoker) *Handler {
22-
return &Handler{invoker: invoker}
24+
func New(invoker *invoke.Invoker, backend backend.Backend) *Handler {
25+
return &Handler{
26+
invoker: invoker,
27+
backend: backend,
28+
}
2329
}
2430

2531
func (*Handler) DeleteRunState(req router.Request, _ router.Response) error {
@@ -79,3 +85,17 @@ func (h *Handler) DeleteFinished(req router.Request, _ router.Response) error {
7985
}
8086
return nil
8187
}
88+
89+
func (h *Handler) MarkInactive(req router.Request, _ router.Response) error {
90+
run := req.Object.(*v1.Run)
91+
if run.DeletionTimestamp.IsZero() && run.Status.State == gptscript.Continue && run.Labels[v1.LabelInactive] != "true" {
92+
v1.SetInactive(run)
93+
if err := req.Client.Update(req.Ctx, run); err != nil {
94+
return err
95+
}
96+
97+
return inactive.RemoveFromCache(req.Ctx, h.backend, run)
98+
}
99+
100+
return nil
101+
}

pkg/controller/handlers/threads/threads.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import (
88

99
"github.com/gptscript-ai/go-gptscript"
1010
"github.com/obot-platform/nah/pkg/router"
11+
"github.com/obot-platform/nah/pkg/untriggered"
1112
v1 "github.com/obot-platform/obot/pkg/storage/apis/obot.obot.ai/v1"
1213
"github.com/obot-platform/obot/pkg/system"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/fields"
1416
kclient "sigs.k8s.io/controller-runtime/pkg/client"
1517
)
1618

@@ -288,3 +290,25 @@ func (t *Handler) CleanupEphemeralThreads(req router.Request, _ router.Response)
288290

289291
return kclient.IgnoreNotFound(req.Delete(thread))
290292
}
293+
294+
func (t *Handler) ActivateRuns(req router.Request, _ router.Response) error {
295+
var runs v1.RunList
296+
// This must be uncached since inactive things aren't in the cache.
297+
if err := req.List(untriggered.UncachedList(&runs), &kclient.ListOptions{
298+
Namespace: req.Namespace,
299+
FieldSelector: fields.SelectorFromSet(map[string]string{"spec.threadName": req.Object.GetName()}),
300+
}); err != nil {
301+
return fmt.Errorf("failed to list runs for thread %s: %w", req.Object.GetName(), err)
302+
}
303+
304+
for _, run := range runs.Items {
305+
if !v1.IsActive(&run) {
306+
v1.SetActive(&run)
307+
if err := req.Client.Update(req.Ctx, &run); err != nil {
308+
return fmt.Errorf("failed to update run %q to active: %w", run.Name, err)
309+
}
310+
}
311+
}
312+
313+
return nil
314+
}

pkg/controller/routes.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (c *Controller) setupRoutes() error {
4040
knowledgeset := knowledgeset.New(c.services.Invoker)
4141
knowledgesource := knowledgesource.NewHandler(c.services.Invoker, c.services.GPTClient)
4242
knowledgefile := knowledgefile.New(c.services.Invoker, c.services.GPTClient, c.services.KnowledgeSetIngestionLimit)
43-
runs := runs.New(c.services.Invoker)
43+
runs := runs.New(c.services.Invoker, c.services.Router.Backend())
4444
webHooks := webhook.New()
4545
cronJobs := cronjob.New()
4646
oauthLogins := oauthapp.NewLogin(c.services.Invoker, c.services.ServerURL)
@@ -55,6 +55,8 @@ func (c *Controller) setupRoutes() error {
5555
root.Type(&v1.Run{}).HandlerFunc(cleanup.Cleanup)
5656
root.Type(&v1.Run{}).HandlerFunc(runs.Resume)
5757
root.Type(&v1.Run{}).HandlerFunc(workflow.GetTaskResult)
58+
// This handler should be registered last so that Runs that are being deleted are not marked as inactive.
59+
root.Type(&v1.Run{}).HandlerFunc(runs.MarkInactive)
5860

5961
// Threads
6062
root.Type(&v1.Thread{}).HandlerFunc(cleanup.Cleanup)
@@ -66,6 +68,7 @@ func (c *Controller) setupRoutes() error {
6668
root.Type(&v1.Thread{}).HandlerFunc(threads.CleanupEphemeralThreads)
6769
root.Type(&v1.Thread{}).HandlerFunc(threads.SetCreated)
6870
root.Type(&v1.Thread{}).FinalizeFunc(v1.ThreadFinalizer, credentialCleanup.Remove)
71+
root.Type(&v1.Thread{}).FinalizeFunc(v1.ThreadFinalizer+"-child-cleanup", threads.ActivateRuns)
6972

7073
// KnowledgeSummary
7174
root.Type(&v1.KnowledgeSummary{}).HandlerFunc(cleanup.Cleanup)

pkg/services/config.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"log/slog"
88
"os"
99
"path/filepath"
10+
"reflect"
11+
"slices"
1012
"sort"
1113
"strings"
1214

@@ -18,6 +20,7 @@ import (
1820
"github.com/gptscript-ai/gptscript/pkg/sdkserver"
1921
"github.com/obot-platform/nah"
2022
"github.com/obot-platform/nah/pkg/apply"
23+
nfields "github.com/obot-platform/nah/pkg/fields"
2124
"github.com/obot-platform/nah/pkg/leader"
2225
"github.com/obot-platform/nah/pkg/router"
2326
"github.com/obot-platform/nah/pkg/runtime"
@@ -43,9 +46,12 @@ import (
4346
"github.com/obot-platform/obot/pkg/system"
4447
coordinationv1 "k8s.io/api/coordination/v1"
4548
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
49+
"k8s.io/apimachinery/pkg/fields"
4650
"k8s.io/apimachinery/pkg/runtime/schema"
4751
"k8s.io/apiserver/pkg/authentication/authenticator"
4852
"k8s.io/apiserver/pkg/authentication/request/union"
53+
kcache "sigs.k8s.io/controller-runtime/pkg/cache"
54+
kclient "sigs.k8s.io/controller-runtime/pkg/client"
4955

5056
// Setup nah logging
5157
_ "github.com/obot-platform/nah/pkg/logrus"
@@ -76,6 +82,7 @@ type Config struct {
7682
AuthAdminEmails []string `usage:"Emails of admin users"`
7783
AgentsDir string `usage:"The directory to auto load agents on start (default $XDG_CONFIG_HOME/.obot/agents)"`
7884
StaticDir string `usage:"The directory to serve static files from"`
85+
IgnoreInactive bool `usage:"Ignore inactive objects" default:"false"`
7986

8087
// Sendgrid webhook
8188
SendgridWebhookUsername string `usage:"The username for the sendgrid webhook to authenticate with"`
@@ -279,17 +286,27 @@ func New(ctx context.Context, config Config) (*Services, error) {
279286
return nil, err
280287
}
281288
} else {
282-
var notFound gptscript.ErrNotFound
283-
if err := c.DeleteCredential(ctx, system.DefaultNamespace, system.KnowledgeCredID); err != nil && !errors.As(err, &notFound) {
289+
if err := c.DeleteCredential(ctx, system.DefaultNamespace, system.KnowledgeCredID); err != nil && !errors.As(err, &gptscript.ErrNotFound{}) {
284290
return nil, err
285291
}
286292
}
287293

294+
byObjectFieldSelectors := make(map[kclient.Object]kcache.ByObject)
295+
if config.IgnoreInactive {
296+
for _, t := range scheme.Scheme.AllKnownTypes() {
297+
if v, ok := reflect.New(t).Interface().(nfields.Fields); ok && slices.Contains(v.FieldNames(), v1.LabelInactive) {
298+
// Only add the field selector requirement to objects that support the field selector.
299+
byObjectFieldSelectors[v.(kclient.Object)] = kcache.ByObject{Field: fields.OneTermEqualSelector(v1.LabelInactive, "")}
300+
}
301+
}
302+
}
303+
288304
r, err := nah.NewRouter("obot-controller", &nah.Options{
289-
DefaultRESTConfig: restConfig,
290-
Scheme: scheme.Scheme,
291-
ElectionConfig: leader.NewDefaultElectionConfig("", "obot-controller", restConfig),
292-
HealthzPort: -1,
305+
RESTConfig: restConfig,
306+
Scheme: scheme.Scheme,
307+
ByObject: byObjectFieldSelectors,
308+
ElectionConfig: leader.NewDefaultElectionConfig("", "obot-controller", restConfig),
309+
HealthzPort: -1,
293310
GVKThreadiness: map[schema.GroupVersionKind]int{
294311
v1.SchemeGroupVersion.WithKind("KnowledgeFile"): config.KnowledgeFileWorkers,
295312
},
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package v1
2+
3+
import kclient "sigs.k8s.io/controller-runtime/pkg/client"
4+
5+
// LabelInactive is the label key used to flag an object as inactive. An object
// counts as inactive only when this label is set to "true".
const LabelInactive = "obot_inactive_object"
6+
7+
func SetInactive(o kclient.Object) {
8+
if o.GetLabels() == nil {
9+
o.SetLabels(make(map[string]string, 1))
10+
}
11+
o.GetLabels()[LabelInactive] = "true"
12+
}
13+
14+
func SetActive(o kclient.Object) {
15+
delete(o.GetLabels(), LabelInactive)
16+
}
17+
18+
func IsActive(o kclient.Object) bool {
19+
return o.GetLabels()[LabelInactive] != "true"
20+
}

pkg/storage/apis/obot.obot.ai/v1/run.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package v1
22

33
import (
44
gptscriptclient "github.com/gptscript-ai/go-gptscript"
5+
"github.com/obot-platform/nah/pkg/fields"
56
"github.com/obot-platform/obot/apiclient/types"
67
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
78
)
@@ -23,6 +24,11 @@ const (
2324
AuthProviderSyncAnnotation = "obot.ai/auth-provider-sync"
2425
)
2526

27+
var (
28+
_ fields.Fields = (*Run)(nil)
29+
_ DeleteRefs = (*Run)(nil)
30+
)
31+
2632
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
2733

2834
type Run struct {
@@ -40,6 +46,8 @@ func (in *Run) Has(field string) bool {
4046
func (in *Run) Get(field string) string {
4147
if in != nil {
4248
switch field {
49+
case LabelInactive:
50+
return in.Labels[LabelInactive]
4351
case "spec.threadName":
4452
return in.Spec.ThreadName
4553
case "spec.previousRunName":
@@ -51,7 +59,7 @@ func (in *Run) Get(field string) string {
5159
}
5260

5361
func (in *Run) FieldNames() []string {
54-
return []string{"spec.threadName", "spec.previousRunName"}
62+
return []string{"spec.threadName", "spec.previousRunName", LabelInactive}
5563
}
5664

5765
func (in *Run) GetColumns() [][]string {

0 commit comments

Comments
 (0)