Skip to content

Commit

Permalink
use lru cache to cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
davidjumani committed Feb 21, 2025
1 parent 0b14b44 commit c96b18f
Showing 1 changed file with 33 additions and 32 deletions.
65 changes: 33 additions & 32 deletions projects/gateway2/status/status_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/solo-io/go-utils/contextutils"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/lru"

"github.com/solo-io/gloo/projects/gateway2/proxy_syncer"
gwplugins "github.com/solo-io/gloo/projects/gateway2/translator/plugins"
Expand Down Expand Up @@ -39,22 +40,22 @@ type GatewayStatusSyncer interface {
type statusSyncerFactory struct {
// maps a proxy sync action to the plugin registry that produced it
// sync iteration -> plugin registry
registryPerSync map[int]*registry.PluginRegistry
registryPerSyncCache *lru.Cache // map[int]*registry.PluginRegistry
// maps a proxy to the sync iteration that produced it
// only the latest sync iteration is stored and used to apply status plugins
resyncsPerProxy map[types.NamespacedName]int
// proxies left to sync
resyncsPerIteration map[int][]types.NamespacedName

lock *sync.Mutex
resyncsPerIterationCache *lru.Cache // map[int][]types.NamespacedName
lock *sync.Mutex
}

func NewStatusSyncerFactory() GatewayStatusSyncer {
return &statusSyncerFactory{
registryPerSync: make(map[int]*registry.PluginRegistry),
resyncsPerProxy: make(map[types.NamespacedName]int),
resyncsPerIteration: make(map[int][]types.NamespacedName),
lock: &sync.Mutex{},
// Set a max value of 3 for n-2 iterations. Ideally we should only care about n-1 but playing it safe
registryPerSyncCache: lru.New(3),
resyncsPerIterationCache: lru.New(3),
resyncsPerProxy: make(map[types.NamespacedName]int),
lock: &sync.Mutex{},
}
}

Expand All @@ -70,21 +71,20 @@ func (f *statusSyncerFactory) QueueStatusForProxies(

contextutils.LoggerFrom(ctx).Debugf("queueing %v proxies for sync %d", len(proxiesToQueue), totalSyncCount)

resyncsPerIteration := make([]types.NamespacedName, len(proxiesToQueue))
// queue each proxy for a given sync iteration
for _, proxy := range proxiesToQueue {
proxyKey := getProxyNameNamespace(proxy)
// overwrite the sync count for the proxy with the most recent sync count
f.resyncsPerProxy[proxyKey] = totalSyncCount

// keep track of proxies to check all proxies are handled in debugger
f.resyncsPerIteration[totalSyncCount] = append(f.resyncsPerIteration[totalSyncCount], proxyKey)
resyncsPerIteration = append(resyncsPerIteration, proxyKey)
}
f.resyncsPerIterationCache.Add(totalSyncCount, resyncsPerIteration)

// the plugin registry that produced the proxies is the same for all proxies in a given sync
f.registryPerSync[totalSyncCount] = pluginRegistry

delete(f.resyncsPerIteration, totalSyncCount-2)
delete(f.registryPerSync, totalSyncCount-2)
f.registryPerSyncCache.Add(totalSyncCount, pluginRegistry)
}

// HandleProxyReports is a callback that applies status plugins to the proxies that have been queued
Expand Down Expand Up @@ -113,39 +113,40 @@ func (f *statusSyncerFactory) HandleProxyReports(ctx context.Context, proxiesWit
continue
}

if len(f.resyncsPerIteration[proxySyncCount]) == 0 {
// remove the key so the map does not indefinitely grow
delete(f.resyncsPerIteration, proxySyncCount)
// re-sync already happened, nothing to do
// If the proxy does not exist in the cache, or the cache is empty, the re-sync already happened, nothing to do
resyncsPerIterationIface, ok := f.resyncsPerIterationCache.Get(proxySyncCount)
if !ok {
continue
} else {
updatedList := make([]types.NamespacedName, 0)
for _, proxyNameNs := range f.resyncsPerIteration[proxySyncCount] {
if proxyNameNs != proxyKey {
updatedList = append(updatedList, proxyNameNs)
}
}
f.resyncsPerIteration[proxySyncCount] = updatedList
}
resyncsPerIteration := resyncsPerIterationIface.([]types.NamespacedName)
if len(resyncsPerIteration) == 0 {
f.resyncsPerIterationCache.Remove(proxySyncCount)
continue
}

if len(f.resyncsPerIteration[proxySyncCount]) == 0 {
// remove the key so the map does not indefinitely grow
delete(f.resyncsPerIteration, proxySyncCount)
updatedList := make([]types.NamespacedName, 0)
for _, proxyNameNs := range resyncsPerIteration {
if proxyNameNs != proxyKey {
updatedList = append(updatedList, proxyNameNs)
}
}
f.resyncsPerIterationCache.Add(proxySyncCount, updatedList)

proxiesToReport[proxySyncCount] = append(proxiesToReport[proxySyncCount], proxyWithReport)
// remove the proxy from the queue
delete(f.resyncsPerProxy, proxyKey)
}

for syncCount, proxies := range proxiesToReport {
if plugins, ok := f.registryPerSync[syncCount]; ok {
newStatusSyncer(plugins).applyStatusPlugins(ctx, proxies)
if plugins, ok := f.registryPerSyncCache.Get(syncCount); ok {
newStatusSyncer(plugins.(*registry.PluginRegistry)).applyStatusPlugins(ctx, proxies)
}

// If there are no more proxies for the sync iteration, delete the sync count
if len(f.resyncsPerIteration[syncCount]) == 0 {
delete(f.registryPerSync, syncCount)
resyncsPerIterationIface, _ := f.resyncsPerIterationCache.Get(syncCount)
resyncsPerIteration := resyncsPerIterationIface.([]types.NamespacedName)
if len(resyncsPerIteration) == 0 {
f.registryPerSyncCache.Remove(syncCount)
}
}
}
Expand Down

0 comments on commit c96b18f

Please sign in to comment.