Skip to content

Commit

Permalink
Add support for in memory caching
Browse files Browse the repository at this point in the history
Introduces an alternate to the legacyCollections and
genericCollection that relies on caching resource in memory. The
new collection[T] takes inspiration from the legacy collection types
but aims to both simplify and break reliance on using the local
storage services.

The two collection mechanisms will and can coexist until all types
are converted to the new mechanism. A single implementation of
the new collection exists for the StaticTokens.

Additionally, during the conversion process some refactoring will
be done to try to better organize and reduce the size of cache.go.
In this PR all of the code specific to static tokens was moved
out of cache.go and legacy_collections.go and into static_tokens.go.
This will allow separating concerns and making it much easier to
identify where specific content lives.
  • Loading branch information
rosstimothy committed Feb 18, 2025
1 parent eccea21 commit 3be09f7
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 30 deletions.
87 changes: 69 additions & 18 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ type Cache struct {
// legacyCacheCollections is a registry of resource legacyCollections
legacyCacheCollections *legacyCollections

// collections is a registry of resource collections.
collections *collections

// confirmedKinds is a map of kinds confirmed by the server to be included in the current generation
// by resource Kind/SubKind
confirmedKinds map[resourceKind]types.WatchKind
Expand Down Expand Up @@ -1041,6 +1044,12 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

collections, err := setupCollections(config, config.Watches)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

cs := &Cache{
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -1091,6 +1100,7 @@ func New(config Config) (*Cache, error) {
pluginStaticCredentialsCache: pluginStaticCredentialsCache,
gitServersCache: gitServersCache,
workloadIdentityCache: workloadIdentityCache,
collections: collections,
Logger: slog.With(
teleport.ComponentKey, config.Component,
"target", config.target,
Expand Down Expand Up @@ -1639,10 +1649,13 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
}

func (c *Cache) watchKinds() []types.WatchKind {
out := make([]types.WatchKind, 0, len(c.legacyCacheCollections.byKind))
out := make([]types.WatchKind, 0, len(c.legacyCacheCollections.byKind)+len(c.collections.byKind))
for _, collection := range c.legacyCacheCollections.byKind {
out = append(out, collection.watchKind())
}
for _, handler := range c.collections.byKind {
out = append(out, handler.watchKind())
}
return out
}

Expand Down Expand Up @@ -1724,7 +1737,7 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(fetchLimit(c.target))
applyfns := make([]applyFn, len(c.legacyCacheCollections.byKind))
applyfns := make([]applyFn, len(c.legacyCacheCollections.byKind)+len(c.collections.byKind))
i := 0
for kind, collection := range c.legacyCacheCollections.byKind {
kind, collection := kind, collection
Expand Down Expand Up @@ -1752,6 +1765,31 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types
})
}

for kind, handler := range c.collections.byKind {
ii := i
i++

g.Go(func() (err error) {
ctx, span := c.Tracer.Start(
ctx,
fmt.Sprintf("cache/fetch/%s", kind.String()),
oteltrace.WithAttributes(
attribute.String("target", c.target),
),
)
defer func() { apitracing.EndSpan(span, err) }()

_, cacheOK := confirmedKinds[resourceKind{kind: kind.kind, subkind: kind.subkind}]
applyfn, err := handler.fetch(ctx, cacheOK)
if err != nil {
return trace.Wrap(err, "failed to fetch resource: %q", kind)
}

applyfns[ii] = tracedApplyFn(fetchSpan, c.Tracer, kind, applyfn)
return nil
})
}

if err := g.Wait(); err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -1771,16 +1809,37 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types
// the event will be emitted via the fanout.
func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
resourceKind := resourceKindFromResource(event.Resource)
collection, ok := c.legacyCacheCollections.byKind[resourceKind]
if !ok {

legacyCollection, legacyFound := c.legacyCacheCollections.byKind[resourceKind]
handler, handlerFound := c.collections.byKind[resourceKind]

switch {
case !legacyFound && !handlerFound:
c.Logger.WarnContext(ctx, "Skipping unsupported event",
"event_kind", event.Resource.GetKind(),
"event_sub_kind", event.Resource.GetSubKind(),
)
return nil
}
if err := collection.processEvent(ctx, event); err != nil {
return trace.Wrap(err)
case legacyFound:
if err := legacyCollection.processEvent(ctx, event); err != nil {
return trace.Wrap(err)
}
case handlerFound:
switch event.Type {
case types.OpDelete:
if err := handler.onDelete(event.Resource); err != nil {
if !trace.IsNotFound(err) {
c.Logger.WarnContext(ctx, "Failed to delete resource", "error", err)
return trace.Wrap(err)
}
}
case types.OpPut:
if err := handler.onUpdate(event.Resource); err != nil {
return trace.Wrap(err)
}
default:
c.Logger.WarnContext(ctx, "Skipping unsupported event type", "event", event.Type)
}
}

c.eventsFanout.Emit(event)
Expand Down Expand Up @@ -1869,17 +1928,9 @@ func (c *Cache) GetCertAuthorities(ctx context.Context, caType types.CertAuthTyp
return rg.reader.GetCertAuthorities(ctx, caType, loadSigningKeys)
}

// GetStaticTokens gets the list of static tokens used to provision nodes.
func (c *Cache) GetStaticTokens() (types.StaticTokens, error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetStaticTokens")
defer span.End()

rg, err := readCollectionCache(c, c.legacyCacheCollections.staticTokens)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetStaticTokens()
func (c *Cache) isConfirmedUndlerLock(wk types.WatchKind) bool {
_, ok := c.confirmedKinds[resourceKind{kind: wk.Kind, subkind: wk.SubKind}]
return ok
}

// GetTokens returns all active (non-expired) provisioning tokens
Expand Down
179 changes: 179 additions & 0 deletions lib/cache/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Teleport
// Copyright (C) 2025 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cache

import (
"context"

"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/types"
)

// collection is responsible for managing a resource in the cache.
type collection[T any, S store[T], U upstream[T]] struct {
upstream U
store S
watch types.WatchKind
singleton bool
}

// upstream is responsible for seeding the cache with resources from
// the auth server.
type upstream[T any] interface {
getAll(ctx context.Context, loadSecrets bool) ([]T, error)
}

// store persists the cached resources locally in memory.
type store[T any] interface {
// put will create or update a target resource in the cache.
put(t T) error
// delete removes a single entry from the cache.
delete(t T) error
// clear will delete all target resources of the type in the cache.
clear() error
}

type eventHandler interface {
// fetch fetches resources and returns a function which will apply said resources to the cache.
// fetch *must* not mutate cache state outside of the apply function.
// The provided cacheOK flag indicates whether this collection will be included in the cache generation that is
// being prepared. If cacheOK is false, fetch shouldn't fetch any resources, but the apply function that it
// returns must still delete resources from the backend.
fetch(ctx context.Context, cacheOK bool) (apply func(ctx context.Context) error, err error)
// onDelete will delete a single target resource from the cache. For
// singletons, this is usually an alias to clear.
onDelete(t types.Resource) error
// onUpdate will update a single target resource from the cache
onUpdate(t types.Resource) error
// watchKind returns a watch
// required for this collection
watchKind() types.WatchKind
}

func (c collection[_, _, _]) watchKind() types.WatchKind {
return c.watch
}

func (c *collection[T, _, _]) onDelete(r types.Resource) error {
switch t := r.(type) {
case types.Resource153Unwrapper:
unwrapped := t.Unwrap()
tt, ok := unwrapped.(T)
if !ok {
return trace.BadParameter("unexpected wrapped type %T (expected %T)", unwrapped, tt)
}

c.store.delete(tt)
return nil
case T:
c.store.delete(t)
return nil
default:
return trace.BadParameter("unexpected type %T (expected %T)", r, t)
}
}

func (c *collection[T, _, _]) onUpdate(r types.Resource) error {
switch t := r.(type) {
case types.Resource153Unwrapper:
unwrapped := t.Unwrap()
tt, ok := unwrapped.(T)
if !ok {
return trace.BadParameter("unexpected wrapped type %T (expected %T)", unwrapped, tt)
}

c.store.put(tt)
return nil
case T:
c.store.put(t)
return nil
default:
return trace.BadParameter("unexpected type %T (expected %T)", r, t)
}
}

func (c collection[T, _, _]) fetch(ctx context.Context, cacheOK bool) (apply func(context.Context) error, err error) {
// Singleton objects will only get deleted or updated, not both
deleteSingleton := false

var resources []T
if cacheOK {
resources, err = c.upstream.getAll(ctx, c.watch.LoadSecrets)
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
deleteSingleton = true
}
}

return func(ctx context.Context) error {
// Always perform the delete if this is not a singleton, otherwise
// only perform the delete if the singleton wasn't found
// or the resource kind isn't cached in the current generation.
if !c.singleton || deleteSingleton || !cacheOK {
if err := c.store.clear(); err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
}
}
// If this is a singleton and we performed a deletion, return here
// because we only want to update or delete a singleton, not both.
// Also don't continue if the resource kind isn't cached in the current generation.
if c.singleton && deleteSingleton || !cacheOK {
return nil
}
for _, resource := range resources {
if err := c.store.put(resource); err != nil {
return trace.Wrap(err)
}
}
return nil
}, nil
}

type collections struct {
byKind map[resourceKind]eventHandler

staticTokens *collection[types.StaticTokens, *singletonStore[types.StaticTokens], *staticTokensUpstream]
}

func setupCollections(c Config, watches []types.WatchKind) (*collections, error) {
out := &collections{
byKind: make(map[resourceKind]eventHandler, 1),
}

for _, watch := range watches {
resourceKind := resourceKindFromWatchKind(watch)

switch watch.Kind {
case types.KindStaticTokens:
collect, err := newStaticTokensCollection(c.ClusterConfig, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.staticTokens = collect
out.byKind[resourceKind] = out.staticTokens

}
}

return out, nil
}
2 changes: 1 addition & 1 deletion lib/cache/generic_legacy_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (g *genericCollection[T, R, _]) watchKind() types.WatchKind {
return g.watch
}

var _ collection = (*genericCollection[types.Resource, any, executor[types.Resource, any]])(nil)
var _ legacyCollection = (*genericCollection[types.Resource, any, executor[types.Resource, any]])(nil)

// genericCollection obtains the reader object from the executor based on the provided health status of the cache.
// Note that cacheOK set to true means that cache is overall healthy and the collection was confirmed as supported.
Expand Down
14 changes: 3 additions & 11 deletions lib/cache/legacy_collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ type legacyCollections struct {
samlIdPSessions collectionReader[samlIdPSessionGetter]
sessionRecordingConfigs collectionReader[sessionRecordingConfigGetter]
snowflakeSessions collectionReader[snowflakeSessionGetter]
staticTokens collectionReader[staticTokensGetter]
tokens collectionReader[tokenGetter]
uiConfigs collectionReader[uiConfigGetter]
users collectionReader[userGetter]
Expand Down Expand Up @@ -202,15 +201,6 @@ func setupLegacyCollections(c *Cache, watches []types.WatchKind) (*legacyCollect
watch: watch,
}
collections.byKind[resourceKind] = collections.certAuthorities
case types.KindStaticTokens:
if c.ClusterConfig == nil {
return nil, trace.BadParameter("missing parameter ClusterConfig")
}
collections.staticTokens = &genericCollection[types.StaticTokens, staticTokensGetter, staticTokensExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.staticTokens
case types.KindToken:
if c.Provisioner == nil {
return nil, trace.BadParameter("missing parameter Provisioner")
Expand Down Expand Up @@ -815,7 +805,9 @@ func setupLegacyCollections(c *Cache, watches []types.WatchKind) (*legacyCollect
}
collections.byKind[resourceKind] = collections.gitServers
default:
return nil, trace.BadParameter("resource %q is not supported", watch.Kind)
if _, ok := c.collections.byKind[resourceKind]; !ok {
return nil, trace.BadParameter("resource %q is not supported", watch.Kind)
}
}
}
return collections, nil
Expand Down
Loading

0 comments on commit 3be09f7

Please sign in to comment.