Skip to content

Commit

Permalink
Restore cache.go
Browse files Browse the repository at this point in the history
  • Loading branch information
liamfallon committed Feb 11, 2025
1 parent 23e5217 commit 81373da
Showing 1 changed file with 43 additions and 122 deletions.
165 changes: 43 additions & 122 deletions pkg/cache/memorycache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memory
package memorycache

import (
"context"
"errors"
"fmt"
"path/filepath"
"sync"
"time"

kptoci "github.com/GoogleContainerTools/kpt/pkg/oci"
configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/cache"
"github.com/nephio-project/porch/pkg/git"
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/oci"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/externalrepo"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/watch"
)

var tracer = otel.Tracer("memorycache")

// Cache allows us to keep state for repositories, rather than querying them every time.
//
// Cache Structure:
Expand All @@ -42,140 +38,56 @@ import (
// * Caches oci images with further hierarchy underneath
// * We Cache image layers in <cacheDir>/oci/layers/ (this might be obsolete with the flattened Cache)
// * We Cache flattened tar files in <cacheDir>/oci/ (so we don't need to pull to read resources)
// * We poll the repositories (every minute) and Cache the discovered images in memory.
// * We poll the repositories periodically (configurable) and cache the discovered images in memory.
type Cache struct {
mutex sync.Mutex
repositories map[string]*cachedRepository
cacheDir string
credentialResolver repository.CredentialResolver
userInfoProvider repository.UserInfoProvider
metadataStore meta.MetadataStore
repoSyncFrequency time.Duration
objectNotifier objectNotifier
useUserDefinedCaBundle bool
}

var _ cache.Cache = &Cache{}

type objectNotifier interface {
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision) int
mutex sync.Mutex
repositories map[string]*cachedRepository
options cachetypes.CacheOptions
}

type CacheOptions struct {
CredentialResolver repository.CredentialResolver
UserInfoProvider repository.UserInfoProvider
MetadataStore meta.MetadataStore
ObjectNotifier objectNotifier
}

func NewCache(cacheDir string, repoSyncFrequency time.Duration, useUserDefinedCaBundle bool, opts CacheOptions) *Cache {
return &Cache{
repositories: make(map[string]*cachedRepository),
cacheDir: cacheDir,
credentialResolver: opts.CredentialResolver,
userInfoProvider: opts.UserInfoProvider,
metadataStore: opts.MetadataStore,
objectNotifier: opts.ObjectNotifier,
repoSyncFrequency: repoSyncFrequency,
useUserDefinedCaBundle: useUserDefinedCaBundle,
}
}

func getCacheKey(repositorySpec *configapi.Repository) (string, error) {
switch repositoryType := repositorySpec.Spec.Type; repositoryType {
case configapi.RepositoryTypeOCI:
ociSpec := repositorySpec.Spec.Oci
if ociSpec == nil {
return "", fmt.Errorf("oci not configured")
}
return "oci://" + ociSpec.Registry, nil

case configapi.RepositoryTypeGit:
gitSpec := repositorySpec.Spec.Git
if gitSpec == nil {
return "", errors.New("git property is required")
}
if gitSpec.Repo == "" {
return "", errors.New("git.repo property is required")
}
return fmt.Sprintf("git://%s/%s@%s/%s", gitSpec.Repo, gitSpec.Directory, repositorySpec.Namespace, repositorySpec.Name), nil

default:
return "", fmt.Errorf("repository type %q not supported", repositoryType)
}
}
var _ cachetypes.Cache = &Cache{}

func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error) {
ctx, span := tracer.Start(ctx, "Cache::OpenRepository", trace.WithAttributes())
defer span.End()

key, err := getCacheKey(repositorySpec)
key, err := externalrepo.RepositoryKey(repositorySpec)
if err != nil {
return nil, err
}

c.mutex.Lock()
defer c.mutex.Unlock()
cachedRepo := c.repositories[key]

switch repositoryType := repositorySpec.Spec.Type; repositoryType {
case configapi.RepositoryTypeOCI:
ociSpec := repositorySpec.Spec.Oci
if cachedRepo == nil {
cacheDir := filepath.Join(c.cacheDir, "oci")
storage, err := kptoci.NewStorage(cacheDir)
if err != nil {
return nil, err
}

r, err := oci.OpenRepository(repositorySpec.Name, repositorySpec.Namespace, ociSpec, repositorySpec.Spec.Deployment, storage)
if err != nil {
return nil, err
}
cachedRepo = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cachedRepo
}
return cachedRepo, nil

case configapi.RepositoryTypeGit:
gitSpec := repositorySpec.Spec.Git
if cachedRepo == nil {
var mbs git.MainBranchStrategy
if gitSpec.CreateBranch {
mbs = git.CreateIfMissing
} else {
mbs = git.ErrorIfMissing
}

r, err := git.OpenRepository(ctx, repositorySpec.Name, repositorySpec.Namespace, gitSpec, repositorySpec.Spec.Deployment, filepath.Join(c.cacheDir, "git"), git.GitRepositoryOptions{
CredentialResolver: c.credentialResolver,
UserInfoProvider: c.userInfoProvider,
MainBranchStrategy: mbs,
UseUserDefinedCaBundle: c.useUserDefinedCaBundle,
})
if err != nil {
return nil, err
}

cachedRepo = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cachedRepo

if cachedRepo := c.repositories[key]; cachedRepo != nil {
// If there is an error from the background refresh goroutine, return it.
if err := cachedRepo.getRefreshError(); err == nil {
return cachedRepo, nil
} else {
// If there is an error from the background refresh goroutine, return it.
if err := cachedRepo.getRefreshError(); err != nil {
return nil, err
}
return nil, err
}
return cachedRepo, nil
}

default:
return nil, fmt.Errorf("type %q not supported", repositoryType)
externalRepo, err := externalrepo.CreateRepositoryImpl(ctx, repositorySpec, c.options.ExternalRepoOptions)
if err != nil {
return nil, err
}

cachedRepo := newRepository(key, repositorySpec, externalRepo, c.options)
c.repositories[key] = cachedRepo

return cachedRepo, nil
}

func (c *Cache) UpdateRepository(ctx context.Context, repositorySpec *configapi.Repository) error {
return errors.New("update on memory cached repositories is not supported")
}

func (c *Cache) CloseRepository(ctx context.Context, repositorySpec *configapi.Repository, allRepos []configapi.Repository) error {
_, span := tracer.Start(ctx, "Cache::CloseRepository", trace.WithAttributes())
defer span.End()

key, err := getCacheKey(repositorySpec)
key, err := externalrepo.RepositoryKey(repositorySpec)
if err != nil {
return err
}
Expand All @@ -185,7 +97,7 @@ func (c *Cache) CloseRepository(ctx context.Context, repositorySpec *configapi.R
if r.Name == repositorySpec.Name && r.Namespace == repositorySpec.Namespace {
continue
}
otherKey, err := getCacheKey(&r)
otherKey, err := externalrepo.RepositoryKey(&r)
if err != nil {
return err
}
Expand All @@ -211,3 +123,12 @@ func (c *Cache) CloseRepository(ctx context.Context, repositorySpec *configapi.R
return nil
}
}

func (c *Cache) GetRepositories(ctx context.Context) []configapi.Repository {
repoSlice := []configapi.Repository{}

for _, repo := range c.repositories {
repoSlice = append(repoSlice, *repo.repoSpec)
}
return repoSlice
}

0 comments on commit 81373da

Please sign in to comment.