From 5197f6062ceebda538cdafd82f5bb5fcc3254695 Mon Sep 17 00:00:00 2001 From: Paul Dittamo <37558497+pvditt@users.noreply.github.com> Date: Wed, 5 Feb 2025 22:10:47 -0800 Subject: [PATCH] utilize sync maps to map project mappings of prom watches (#6222) * utilize sync maps to map project mappings of prom watches Signed-off-by: Paul Dittamo * set nested map to be sync map as well Signed-off-by: Paul Dittamo * clean up Signed-off-by: Paul Dittamo --------- Signed-off-by: Paul Dittamo --- .../pkg/manager/impl/execution_manager.go | 58 +++++++++++-------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index fd8f0870f1..3f7ed92d27 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "sync" "time" "github.com/benbjohnson/clock" @@ -46,9 +47,6 @@ import ( const childContainerQueueKey = "child_queue" -// Map of [project] -> map of [domain] -> stop watch -type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch - type executionSystemMetrics struct { Scope promutils.Scope ActiveExecutions prometheus.Gauge @@ -68,8 +66,8 @@ type executionSystemMetrics struct { type executionUserMetrics struct { Scope promutils.Scope - ScheduledExecutionDelays projectDomainScopedStopWatchMap - WorkflowExecutionDurations projectDomainScopedStopWatchMap + ScheduledExecutionDelays sync.Map // Map of [project] -> map of [domain] -> stop watch + WorkflowExecutionDurations sync.Map // Map of [project] -> map of [domain] -> stop watch WorkflowExecutionInputBytes prometheus.Summary WorkflowExecutionOutputBytes prometheus.Summary } @@ -81,7 +79,7 @@ type ExecutionManager struct { queueAllocator executions.QueueAllocator _clock clock.Clock systemMetrics executionSystemMetrics - userMetrics executionUserMetrics + userMetrics *executionUserMetrics notificationClient notificationInterfaces.Publisher urlData dataInterfaces.RemoteURLInterface workflowManager interfaces.WorkflowInterface @@ -1347,16 +1345,19 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics( return } - domainCounterMap, ok := m.userMetrics.ScheduledExecutionDelays[execution.GetId().GetProject()] + projectKey := execution.GetId().GetProject() + val, ok := m.userMetrics.ScheduledExecutionDelays.Load(projectKey) if !ok { - domainCounterMap = make(map[string]*promutils.StopWatch) - m.userMetrics.ScheduledExecutionDelays[execution.GetId().GetProject()] = domainCounterMap + val = &sync.Map{} + m.userMetrics.ScheduledExecutionDelays.Store(projectKey, val) } - var watch *promutils.StopWatch - watch, ok = domainCounterMap[execution.GetId().GetDomain()] + domainCounterMap := val.(*sync.Map) + + domainKey := execution.GetId().GetDomain() + watchVal, ok := domainCounterMap.Load(domainKey) if !ok { - newWatch, err := m.systemMetrics.Scope.NewSubScope(execution.GetId().GetProject()).NewSubScope(execution.GetId().GetDomain()).NewStopWatch( + newWatch, err := m.systemMetrics.Scope.NewSubScope(execution.GetId().GetProject()).NewSubScope(domainKey).NewStopWatch( "scheduled_execution_delay", "delay between scheduled execution time and time execution was observed running", time.Nanosecond) @@ -1366,9 +1367,11 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics( "failed to emit scheduled workflow execution delay stat, couldn't find or create counter") return } - watch = &newWatch - domainCounterMap[execution.GetId().GetDomain()] = watch + watchVal = &newWatch + domainCounterMap.Store(domainKey, watchVal) } + + watch := watchVal.(*promutils.StopWatch) watch.Observe(scheduledKickoffTime, runningEventTime) } @@ -1380,16 +1383,19 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime( return } - domainCounterMap, ok := m.userMetrics.WorkflowExecutionDurations[executionModel.Project] + projectKey := executionModel.Project + val, ok := m.userMetrics.WorkflowExecutionDurations.Load(projectKey) if !ok { - domainCounterMap = make(map[string]*promutils.StopWatch) - m.userMetrics.WorkflowExecutionDurations[executionModel.Project] = domainCounterMap + val = &sync.Map{} + m.userMetrics.WorkflowExecutionDurations.Store(projectKey, val) } - var watch *promutils.StopWatch - watch, ok = domainCounterMap[executionModel.Domain] + domainCounterMap := val.(*sync.Map) + + domainKey := executionModel.Domain + watchVal, ok := domainCounterMap.Load(domainKey) if !ok { - newWatch, err := m.systemMetrics.Scope.NewSubScope(executionModel.Project).NewSubScope(executionModel.Domain).NewStopWatch( + newWatch, err := m.systemMetrics.Scope.NewSubScope(executionModel.Project).NewSubScope(domainKey).NewStopWatch( "workflow_execution_duration", "overall time from when when a workflow create request was sent to k8s to the workflow terminating", time.Nanosecond) @@ -1399,10 +1405,12 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime( "failed to emit workflow execution duration stat, couldn't find or create counter") return } - watch = &newWatch - domainCounterMap[executionModel.Domain] = watch + watchVal = &newWatch + domainCounterMap.Store(domainKey, watchVal) } + watch := watchVal.(*promutils.StopWatch) + terminalEventTime, err := ptypes.Timestamp(terminalEventTimeProto) if err != nil { // Timestamps are always sent from propeller and should always be valid @@ -1878,8 +1886,8 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu userMetrics := executionUserMetrics{ Scope: userScope, - ScheduledExecutionDelays: make(map[string]map[string]*promutils.StopWatch), - WorkflowExecutionDurations: make(map[string]map[string]*promutils.StopWatch), + ScheduledExecutionDelays: sync.Map{}, + WorkflowExecutionDurations: sync.Map{}, WorkflowExecutionInputBytes: userScope.MustNewSummary("input_size_bytes", "size in bytes of serialized execution inputs"), WorkflowExecutionOutputBytes: userScope.MustNewSummary("output_size_bytes", @@ -1894,7 +1902,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu queueAllocator: queueAllocator, _clock: clock.New(), systemMetrics: systemMetrics, - userMetrics: userMetrics, + userMetrics: &userMetrics, notificationClient: publisher, urlData: urlData, workflowManager: workflowManager,