Skip to content

Commit

Permalink
Revert "use lru cache to cleanup"
Browse files Browse the repository at this point in the history
This reverts commit c96b18f.
  • Loading branch information
davidjumani committed Feb 22, 2025
1 parent c96b18f commit 04a6f44
Showing 1 changed file with 32 additions and 33 deletions.
65 changes: 32 additions & 33 deletions projects/gateway2/status/status_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/solo-io/go-utils/contextutils"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/lru"

"github.com/solo-io/gloo/projects/gateway2/proxy_syncer"
gwplugins "github.com/solo-io/gloo/projects/gateway2/translator/plugins"
Expand Down Expand Up @@ -40,22 +39,22 @@ type GatewayStatusSyncer interface {
type statusSyncerFactory struct {
// maps a proxy sync action to the plugin registry that produced it
// sync iteration -> plugin registry
registryPerSyncCache *lru.Cache // map[int]*registry.PluginRegistry
registryPerSync map[int]*registry.PluginRegistry
// maps a proxy to the sync iteration that produced it
// only the latest sync iteration is stored and used to apply status plugins
resyncsPerProxy map[types.NamespacedName]int
// proxies left to sync
resyncsPerIterationCache *lru.Cache // map[int][]types.NamespacedName
lock *sync.Mutex
resyncsPerIteration map[int][]types.NamespacedName

lock *sync.Mutex
}

func NewStatusSyncerFactory() GatewayStatusSyncer {
return &statusSyncerFactory{
// Set a max value of 3 for n-2 iterations. Ideally we should only care about n-1 but playing it safe
registryPerSyncCache: lru.New(3),
resyncsPerIterationCache: lru.New(3),
resyncsPerProxy: make(map[types.NamespacedName]int),
lock: &sync.Mutex{},
registryPerSync: make(map[int]*registry.PluginRegistry),
resyncsPerProxy: make(map[types.NamespacedName]int),
resyncsPerIteration: make(map[int][]types.NamespacedName),
lock: &sync.Mutex{},
}
}

Expand All @@ -71,20 +70,21 @@ func (f *statusSyncerFactory) QueueStatusForProxies(

contextutils.LoggerFrom(ctx).Debugf("queueing %v proxies for sync %d", len(proxiesToQueue), totalSyncCount)

resyncsPerIteration := make([]types.NamespacedName, len(proxiesToQueue))
// queue each proxy for a given sync iteration
for _, proxy := range proxiesToQueue {
proxyKey := getProxyNameNamespace(proxy)
// overwrite the sync count for the proxy with the most recent sync count
f.resyncsPerProxy[proxyKey] = totalSyncCount

// keep track of proxies to check all proxies are handled in debugger
resyncsPerIteration = append(resyncsPerIteration, proxyKey)
f.resyncsPerIteration[totalSyncCount] = append(f.resyncsPerIteration[totalSyncCount], proxyKey)
}
f.resyncsPerIterationCache.Add(totalSyncCount, resyncsPerIteration)

// the plugin registry that produced the proxies is the same for all proxies in a given sync
f.registryPerSyncCache.Add(totalSyncCount, pluginRegistry)
f.registryPerSync[totalSyncCount] = pluginRegistry

delete(f.resyncsPerIteration, totalSyncCount-2)
delete(f.registryPerSync, totalSyncCount-2)
}

// HandleProxyReports is a callback that applies status plugins to the proxies that have been queued
Expand Down Expand Up @@ -113,40 +113,39 @@ func (f *statusSyncerFactory) HandleProxyReports(ctx context.Context, proxiesWit
continue
}

// If the proxy does not exist in the cache, or the cache is empty, the re-sync already happened, nothing to do
resyncsPerIterationIface, ok := f.resyncsPerIterationCache.Get(proxySyncCount)
if !ok {
continue
}
resyncsPerIteration := resyncsPerIterationIface.([]types.NamespacedName)
if len(resyncsPerIteration) == 0 {
f.resyncsPerIterationCache.Remove(proxySyncCount)
if len(f.resyncsPerIteration[proxySyncCount]) == 0 {
// remove the key so the map does not indefinitely grow
delete(f.resyncsPerIteration, proxySyncCount)
// re-sync already happened, nothing to do
continue
}
} else {
updatedList := make([]types.NamespacedName, 0)
for _, proxyNameNs := range f.resyncsPerIteration[proxySyncCount] {
if proxyNameNs != proxyKey {
updatedList = append(updatedList, proxyNameNs)
}
}
f.resyncsPerIteration[proxySyncCount] = updatedList

updatedList := make([]types.NamespacedName, 0)
for _, proxyNameNs := range resyncsPerIteration {
if proxyNameNs != proxyKey {
updatedList = append(updatedList, proxyNameNs)
if len(f.resyncsPerIteration[proxySyncCount]) == 0 {
// remove the key so the map does not indefinitely grow
delete(f.resyncsPerIteration, proxySyncCount)
}
}
f.resyncsPerIterationCache.Add(proxySyncCount, updatedList)

proxiesToReport[proxySyncCount] = append(proxiesToReport[proxySyncCount], proxyWithReport)
// remove the proxy from the queue
delete(f.resyncsPerProxy, proxyKey)
}

for syncCount, proxies := range proxiesToReport {
if plugins, ok := f.registryPerSyncCache.Get(syncCount); ok {
newStatusSyncer(plugins.(*registry.PluginRegistry)).applyStatusPlugins(ctx, proxies)
if plugins, ok := f.registryPerSync[syncCount]; ok {
newStatusSyncer(plugins).applyStatusPlugins(ctx, proxies)
}

// If there are no more proxies for the sync iteration, delete the sync count
resyncsPerIterationIface, _ := f.resyncsPerIterationCache.Get(syncCount)
resyncsPerIteration := resyncsPerIterationIface.([]types.NamespacedName)
if len(resyncsPerIteration) == 0 {
f.registryPerSyncCache.Remove(syncCount)
if len(f.resyncsPerIteration[syncCount]) == 0 {
delete(f.registryPerSync, syncCount)
}
}
}
Expand Down

0 comments on commit 04a6f44

Please sign in to comment.