diff --git a/lib/cache/cert_authority.go b/lib/cache/cert_authority.go index 948b434175311..39e0ecac1b66f 100644 --- a/lib/cache/cert_authority.go +++ b/lib/cache/cert_authority.go @@ -55,7 +55,7 @@ func newCertAuthorityCollection(t services.Trust, w types.WatchKind) (*collectio }, } }, - getAll: func(ctx context.Context, loadSecrets bool) ([]types.CertAuthority, error) { + fetcher: func(ctx context.Context, loadSecrets bool) ([]types.CertAuthority, error) { var authorities []types.CertAuthority for _, caType := range types.CertAuthTypes { cas, err := t.GetCertAuthorities(ctx, caType, loadSecrets) diff --git a/lib/cache/collection.go b/lib/cache/collection.go index 74d1b3dbc1094..6a9a44f981bf4 100644 --- a/lib/cache/collection.go +++ b/lib/cache/collection.go @@ -27,18 +27,40 @@ import ( // collection is responsible for managing a cached resource. type collection[T any] struct { - getAll func(ctx context.Context, loadSecrets bool) ([]T, error) - store *store[T] - watch types.WatchKind + // fetcher is called by fetch to retrieve and seed the + // store with all known resources from upstream. + fetcher func(ctx context.Context, loadSecrets bool) ([]T, error) + // store persits all resources in memory. + store *store[T] + // watch contains the kind of + watch types.WatchKind + // headerTransform is used when handling delete events in [onDelete]. Since + // [types.OpDelete] events only contain information about the resource key, + // most event handlers only emit a [types.ResourceHeader] which has enough + // information to identify a resource. Some resources do emit a half + // populated [T], or have enough information from the key to emit a full [T]. + // + // If this optional transformation is supplied it will be called when + // processing delete events before attempting to delete the resource + // from the store. headerTransform func(hdr *types.ResourceHeader) T - filter func(T) bool - singleton bool + // filter is an optional function used to prevent some resources + // from being persisted in the store. + filter func(T) bool + // singleton indicates if the resource should only ever have a single item. + singleton bool } func (c collection[_]) watchKind() types.WatchKind { return c.watch } +// onDelete attempts to remove the provided resource from the store. +// An error is returned if the resource is of an unexpected type, or +// the resource is a [types.ResourceHeader] and no headerTransform was +// specified. +// +// This is a no-op if the configured filter does not return true. func (c *collection[T]) onDelete(r types.Resource) error { switch t := r.(type) { case types.Resource153Unwrapper: @@ -47,6 +69,9 @@ func (c *collection[T]) onDelete(r types.Resource) error { if !ok { return trace.BadParameter("unexpected wrapped type %T (expected %v)", unwrapped, reflect.TypeOf((*T)(nil)).Elem()) } + if c.filter != nil && !c.filter(tt) { + return nil + } return trace.Wrap(c.store.delete(tt)) case *types.ResourceHeader: @@ -54,14 +79,27 @@ func (c *collection[T]) onDelete(r types.Resource) error { return trace.BadParameter("unable to convert types.ResourceHeader to %v (no transform specified, this is a bug)", reflect.TypeOf((*T)(nil)).Elem()) } - return trace.Wrap(c.store.delete(c.headerTransform(t))) + tt := c.headerTransform(t) + if c.filter != nil && !c.filter(tt) { + return nil + } + + return trace.Wrap(c.store.delete(tt)) case T: + if c.filter != nil && !c.filter(t) { + return nil + } + return trace.Wrap(c.store.delete(t)) default: return trace.BadParameter("unexpected type %T (expected %v)", r, reflect.TypeOf((*T)(nil)).Elem()) } } +// onUpdate attempts to place the resource into the local store. +// An error is returned if the resource is of an unexpected type +// +// This is a no-op if the configured filter does not return true. func (c *collection[T]) onUpdate(r types.Resource) error { switch t := r.(type) { case types.Resource153Unwrapper: @@ -89,13 +127,14 @@ func (c *collection[T]) onUpdate(r types.Resource) error { } } +// fetch populates the store with items received by the configured fetcher. func (c collection[T]) fetch(ctx context.Context, cacheOK bool) (apply func(context.Context) error, err error) { // Singleton objects will only get deleted or updated, not both deleteSingleton := false var resources []T if cacheOK { - resources, err = c.getAll(ctx, c.watch.LoadSecrets) + resources, err = c.fetcher(ctx, c.watch.LoadSecrets) if err != nil { if !trace.IsNotFound(err) { return nil, trace.Wrap(err) @@ -129,67 +168,3 @@ func (c collection[T]) fetch(ctx context.Context, cacheOK bool) (apply func(cont return nil }, nil } - -type resourceHandler interface { - // fetch fetches resources and returns a function which will apply said resources to the cache. - // fetch *must* not mutate cache state outside of the apply function. - // The provided cacheOK flag indicates whether this collection will be included in the cache generation that is - // being prepared. If cacheOK is false, fetch shouldn't fetch any resources, but the apply function that it - // returns must still delete resources from the backend. - fetch(ctx context.Context, cacheOK bool) (apply func(ctx context.Context) error, err error) - // onDelete will delete a single target resource from the cache. For - // singletons, this is usually an alias to clear. - onDelete(t types.Resource) error - // onUpdate will update a single target resource from the cache - onUpdate(t types.Resource) error - // watchKind returns a watch - // required for this collection - watchKind() types.WatchKind -} - -type collections struct { - byKind map[resourceKind]resourceHandler - - staticTokens *collection[types.StaticTokens] - certAuthorities *collection[types.CertAuthority] - users *collection[types.User] -} - -func setupCollections(c Config, watches []types.WatchKind) (*collections, error) { - out := &collections{ - byKind: make(map[resourceKind]resourceHandler, 1), - } - - for _, watch := range watches { - resourceKind := resourceKindFromWatchKind(watch) - - switch watch.Kind { - case types.KindStaticTokens: - collect, err := newStaticTokensCollection(c.ClusterConfig, watch) - if err != nil { - return nil, trace.Wrap(err) - } - - out.staticTokens = collect - out.byKind[resourceKind] = out.staticTokens - case types.KindCertAuthority: - collect, err := newCertAuthorityCollection(c.Trust, watch) - if err != nil { - return nil, trace.Wrap(err) - } - - out.certAuthorities = collect - out.byKind[resourceKind] = out.certAuthorities - case types.KindUser: - collect, err := newUserCollection(c.Users, watch) - if err != nil { - return nil, trace.Wrap(err) - } - - out.users = collect - out.byKind[resourceKind] = out.users - } - } - - return out, nil -} diff --git a/lib/cache/collections.go b/lib/cache/collections.go new file mode 100644 index 0000000000000..63e5ab2b806ab --- /dev/null +++ b/lib/cache/collections.go @@ -0,0 +1,96 @@ +// Teleport +// Copyright (C) 2025 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cache + +import ( + "context" + + "github.com/gravitational/trace" + + "github.com/gravitational/teleport/api/types" +) + +// collectionHandler is used by the [Cache] to seed the initial +// data and process events for a particular resource. +type collectionHandler interface { + // fetch fetches resources and returns a function which will apply said resources to the cache. + // fetch *must* not mutate cache state outside of the apply function. + // The provided cacheOK flag indicates whether this collection will be included in the cache generation that is + // being prepared. If cacheOK is false, fetch shouldn't fetch any resources, but the apply function that it + // returns must still delete resources from the backend. + fetch(ctx context.Context, cacheOK bool) (apply func(ctx context.Context) error, err error) + // onDelete will delete a single target resource from the cache. For + // singletons, this is usually an alias to clear. + onDelete(t types.Resource) error + // onUpdate will update a single target resource from the cache + onUpdate(t types.Resource) error + // watchKind returns a watch + // required for this collection + watchKind() types.WatchKind +} + +// collections is the group of resource [collection]s +// that the [Cache] supports. +type collections struct { + byKind map[resourceKind]collectionHandler + + staticTokens *collection[types.StaticTokens] + certAuthorities *collection[types.CertAuthority] + users *collection[types.User] +} + +// setupCollections ensures that the appropriate [collection] is +// initialized for all provided [types.WatcKind]s. An error is +// returned if a [types.WatchKind] has no associated [collection]. +func setupCollections(c Config, watches []types.WatchKind) (*collections, error) { + out := &collections{ + byKind: make(map[resourceKind]collectionHandler, 1), + } + + for _, watch := range watches { + resourceKind := resourceKindFromWatchKind(watch) + + switch watch.Kind { + case types.KindStaticTokens: + collect, err := newStaticTokensCollection(c.ClusterConfig, watch) + if err != nil { + return nil, trace.Wrap(err) + } + + out.staticTokens = collect + out.byKind[resourceKind] = out.staticTokens + case types.KindCertAuthority: + collect, err := newCertAuthorityCollection(c.Trust, watch) + if err != nil { + return nil, trace.Wrap(err) + } + + out.certAuthorities = collect + out.byKind[resourceKind] = out.certAuthorities + case types.KindUser: + collect, err := newUserCollection(c.Users, watch) + if err != nil { + return nil, trace.Wrap(err) + } + + out.users = collect + out.byKind[resourceKind] = out.users + } + } + + return out, nil +} diff --git a/lib/cache/doc.go b/lib/cache/doc.go index fb8c8c78837e1..614b8176d27f0 100644 --- a/lib/cache/doc.go +++ b/lib/cache/doc.go @@ -25,10 +25,4 @@ // This approach allows cache to be up to date without // time based expiration and avoid re-fetching all // resources reducing bandwidth. -// -// There are two types of cache backends used: -// -// * SQLite-based in-memory used for auth nodes -// * SQLite-based on disk persistent cache for nodes and proxies -// providing resilliency in the face of auth servers failures. package cache diff --git a/lib/cache/static_tokens.go b/lib/cache/static_tokens.go index 64bef470bba57..f43145b27e8e4 100644 --- a/lib/cache/static_tokens.go +++ b/lib/cache/static_tokens.go @@ -36,7 +36,7 @@ func newStaticTokensCollection(c services.ClusterConfiguration, w types.WatchKin return u.GetName() }, }), - getAll: func(ctx context.Context, loadSecrets bool) ([]types.StaticTokens, error) { + fetcher: func(ctx context.Context, loadSecrets bool) ([]types.StaticTokens, error) { token, err := c.GetStaticTokens() if err != nil { return nil, trace.Wrap(err) diff --git a/lib/cache/users.go b/lib/cache/users.go index 9302e290bb7dd..56c5acf4ce87b 100644 --- a/lib/cache/users.go +++ b/lib/cache/users.go @@ -40,7 +40,7 @@ func newUserCollection(u services.UsersService, w types.WatchKind) (*collection[ return u.GetName() }, }), - getAll: func(ctx context.Context, loadSecrets bool) ([]types.User, error) { + fetcher: func(ctx context.Context, loadSecrets bool) ([]types.User, error) { return u.GetUsers(ctx, loadSecrets) }, headerTransform: func(hdr *types.ResourceHeader) types.User {