Skip to content

Commit

Permalink
improve documentation and organization
Browse files Browse the repository at this point in the history
  • Loading branch information
rosstimothy committed Feb 21, 2025
1 parent 56a7cbe commit fb1947b
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 80 deletions.
2 changes: 1 addition & 1 deletion lib/cache/cert_authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newCertAuthorityCollection(t services.Trust, w types.WatchKind) (*collectio
},
}
},
getAll: func(ctx context.Context, loadSecrets bool) ([]types.CertAuthority, error) {
fetcher: func(ctx context.Context, loadSecrets bool) ([]types.CertAuthority, error) {
var authorities []types.CertAuthority
for _, caType := range types.CertAuthTypes {
cas, err := t.GetCertAuthorities(ctx, caType, loadSecrets)
Expand Down
117 changes: 46 additions & 71 deletions lib/cache/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,40 @@ import (

// collection is responsible for managing a cached resource.
type collection[T any] struct {
getAll func(ctx context.Context, loadSecrets bool) ([]T, error)
store *store[T]
watch types.WatchKind
// fetcher is called by fetch to retrieve and seed the
// store with all known resources from upstream.
fetcher func(ctx context.Context, loadSecrets bool) ([]T, error)
// store persists all resources in memory.
store *store[T]
// watch contains the kind of
watch types.WatchKind
// headerTransform is used when handling delete events in [onDelete]. Since
// [types.OpDelete] events only contain information about the resource key,
// most event handlers only emit a [types.ResourceHeader] which has enough
// information to identify a resource. Some resources do emit a half
// populated [T], or have enough information from the key to emit a full [T].
//
// If this optional transformation is supplied it will be called when
// processing delete events before attempting to delete the resource
// from the store.
headerTransform func(hdr *types.ResourceHeader) T
filter func(T) bool
singleton bool
// filter is an optional function used to prevent some resources
// from being persisted in the store.
filter func(T) bool
// singleton indicates if the resource should only ever have a single item.
singleton bool
}

func (c collection[_]) watchKind() types.WatchKind {
return c.watch
}

// onDelete attempts to remove the provided resource from the store.
// An error is returned if the resource is of an unexpected type, or
// the resource is a [types.ResourceHeader] and no headerTransform was
// specified.
//
// This is a no-op if the configured filter does not return true.
func (c *collection[T]) onDelete(r types.Resource) error {
switch t := r.(type) {
case types.Resource153Unwrapper:
Expand All @@ -47,21 +69,37 @@ func (c *collection[T]) onDelete(r types.Resource) error {
if !ok {
return trace.BadParameter("unexpected wrapped type %T (expected %v)", unwrapped, reflect.TypeOf((*T)(nil)).Elem())
}
if c.filter != nil && !c.filter(tt) {
return nil
}

return trace.Wrap(c.store.delete(tt))
case *types.ResourceHeader:
if c.headerTransform == nil {
return trace.BadParameter("unable to convert types.ResourceHeader to %v (no transform specified, this is a bug)", reflect.TypeOf((*T)(nil)).Elem())
}

return trace.Wrap(c.store.delete(c.headerTransform(t)))
tt := c.headerTransform(t)
if c.filter != nil && !c.filter(tt) {
return nil
}

return trace.Wrap(c.store.delete(tt))
case T:
if c.filter != nil && !c.filter(t) {
return nil
}

return trace.Wrap(c.store.delete(t))
default:
return trace.BadParameter("unexpected type %T (expected %v)", r, reflect.TypeOf((*T)(nil)).Elem())
}
}

// onUpdate attempts to place the resource into the local store.
// An error is returned if the resource is of an unexpected type
//
// This is a no-op if the configured filter does not return true.
func (c *collection[T]) onUpdate(r types.Resource) error {
switch t := r.(type) {
case types.Resource153Unwrapper:
Expand Down Expand Up @@ -89,13 +127,14 @@ func (c *collection[T]) onUpdate(r types.Resource) error {
}
}

// fetch populates the store with items received by the configured fetcher.
func (c collection[T]) fetch(ctx context.Context, cacheOK bool) (apply func(context.Context) error, err error) {
// Singleton objects will only get deleted or updated, not both
deleteSingleton := false

var resources []T
if cacheOK {
resources, err = c.getAll(ctx, c.watch.LoadSecrets)
resources, err = c.fetcher(ctx, c.watch.LoadSecrets)
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -129,67 +168,3 @@ func (c collection[T]) fetch(ctx context.Context, cacheOK bool) (apply func(cont
return nil
}, nil
}

type resourceHandler interface {
// fetch fetches resources and returns a function which will apply said resources to the cache.
// fetch *must* not mutate cache state outside of the apply function.
// The provided cacheOK flag indicates whether this collection will be included in the cache generation that is
// being prepared. If cacheOK is false, fetch shouldn't fetch any resources, but the apply function that it
// returns must still delete resources from the backend.
fetch(ctx context.Context, cacheOK bool) (apply func(ctx context.Context) error, err error)
// onDelete will delete a single target resource from the cache. For
// singletons, this is usually an alias to clear.
onDelete(t types.Resource) error
// onUpdate will update a single target resource from the cache
onUpdate(t types.Resource) error
// watchKind returns a watch
// required for this collection
watchKind() types.WatchKind
}

type collections struct {
byKind map[resourceKind]resourceHandler

staticTokens *collection[types.StaticTokens]
certAuthorities *collection[types.CertAuthority]
users *collection[types.User]
}

func setupCollections(c Config, watches []types.WatchKind) (*collections, error) {
out := &collections{
byKind: make(map[resourceKind]resourceHandler, 1),
}

for _, watch := range watches {
resourceKind := resourceKindFromWatchKind(watch)

switch watch.Kind {
case types.KindStaticTokens:
collect, err := newStaticTokensCollection(c.ClusterConfig, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.staticTokens = collect
out.byKind[resourceKind] = out.staticTokens
case types.KindCertAuthority:
collect, err := newCertAuthorityCollection(c.Trust, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.certAuthorities = collect
out.byKind[resourceKind] = out.certAuthorities
case types.KindUser:
collect, err := newUserCollection(c.Users, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.users = collect
out.byKind[resourceKind] = out.users
}
}

return out, nil
}
96 changes: 96 additions & 0 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Teleport
// Copyright (C) 2025 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cache

import (
"context"

"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/types"
)

// collectionHandler is used by the [Cache] to seed the initial
// data and process events for a particular resource.
type collectionHandler interface {
// fetch fetches resources and returns a function which will apply said resources to the cache.
// fetch *must* not mutate cache state outside of the apply function.
// The provided cacheOK flag indicates whether this collection will be included in the cache generation that is
// being prepared. If cacheOK is false, fetch shouldn't fetch any resources, but the apply function that it
// returns must still delete resources from the backend.
fetch(ctx context.Context, cacheOK bool) (apply func(ctx context.Context) error, err error)
// onDelete will delete a single target resource from the cache. For
// singletons, this is usually an alias to clear.
onDelete(t types.Resource) error
// onUpdate will update a single target resource from the cache
onUpdate(t types.Resource) error
// watchKind returns a watch
// required for this collection
watchKind() types.WatchKind
}

// collections is the group of resource [collection]s
// that the [Cache] supports.
type collections struct {
byKind map[resourceKind]collectionHandler

staticTokens *collection[types.StaticTokens]
certAuthorities *collection[types.CertAuthority]
users *collection[types.User]
}

// setupCollections ensures that the appropriate [collection] is
// initialized for all provided [types.WatcKind]s. An error is
// returned if a [types.WatchKind] has no associated [collection].
func setupCollections(c Config, watches []types.WatchKind) (*collections, error) {
out := &collections{
byKind: make(map[resourceKind]collectionHandler, 1),
}

for _, watch := range watches {
resourceKind := resourceKindFromWatchKind(watch)

switch watch.Kind {
case types.KindStaticTokens:
collect, err := newStaticTokensCollection(c.ClusterConfig, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.staticTokens = collect
out.byKind[resourceKind] = out.staticTokens
case types.KindCertAuthority:
collect, err := newCertAuthorityCollection(c.Trust, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.certAuthorities = collect
out.byKind[resourceKind] = out.certAuthorities
case types.KindUser:
collect, err := newUserCollection(c.Users, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.users = collect
out.byKind[resourceKind] = out.users
}
}

return out, nil
}
6 changes: 0 additions & 6 deletions lib/cache/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,4 @@
// This approach allows cache to be up to date without
// time based expiration and avoid re-fetching all
// resources reducing bandwidth.
//
// There are two types of cache backends used:
//
// * SQLite-based in-memory used for auth nodes
// * SQLite-based on disk persistent cache for nodes and proxies
// providing resilliency in the face of auth servers failures.
package cache
2 changes: 1 addition & 1 deletion lib/cache/static_tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newStaticTokensCollection(c services.ClusterConfiguration, w types.WatchKin
return u.GetName()
},
}),
getAll: func(ctx context.Context, loadSecrets bool) ([]types.StaticTokens, error) {
fetcher: func(ctx context.Context, loadSecrets bool) ([]types.StaticTokens, error) {
token, err := c.GetStaticTokens()
if err != nil {
return nil, trace.Wrap(err)
Expand Down
2 changes: 1 addition & 1 deletion lib/cache/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newUserCollection(u services.UsersService, w types.WatchKind) (*collection[
return u.GetName()
},
}),
getAll: func(ctx context.Context, loadSecrets bool) ([]types.User, error) {
fetcher: func(ctx context.Context, loadSecrets bool) ([]types.User, error) {
return u.GetUsers(ctx, loadSecrets)
},
headerTransform: func(hdr *types.ResourceHeader) types.User {
Expand Down

0 comments on commit fb1947b

Please sign in to comment.