Skip to content

Commit

Permalink
Use sync.Map for the project -> domain maps of Prometheus stopwatches (#6222)
Browse files Browse the repository at this point in the history
* Use sync.Map for the project -> domain maps of Prometheus stopwatches

Signed-off-by: Paul Dittamo <[email protected]>

* set nested map to be sync map as well

Signed-off-by: Paul Dittamo <[email protected]>

* clean up

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Feb 6, 2025
1 parent b5443c8 commit 5197f60
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -46,9 +47,6 @@ import (

const childContainerQueueKey = "child_queue"

// Map of [project] -> map of [domain] -> stop watch
type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch

type executionSystemMetrics struct {
Scope promutils.Scope
ActiveExecutions prometheus.Gauge
Expand All @@ -68,8 +66,8 @@ type executionSystemMetrics struct {

type executionUserMetrics struct {
Scope promutils.Scope
ScheduledExecutionDelays projectDomainScopedStopWatchMap
WorkflowExecutionDurations projectDomainScopedStopWatchMap
ScheduledExecutionDelays sync.Map // Map of [project] -> map of [domain] -> stop watch
WorkflowExecutionDurations sync.Map // Map of [project] -> map of [domain] -> stop watch
WorkflowExecutionInputBytes prometheus.Summary
WorkflowExecutionOutputBytes prometheus.Summary
}
Expand All @@ -81,7 +79,7 @@ type ExecutionManager struct {
queueAllocator executions.QueueAllocator
_clock clock.Clock
systemMetrics executionSystemMetrics
userMetrics executionUserMetrics
userMetrics *executionUserMetrics
notificationClient notificationInterfaces.Publisher
urlData dataInterfaces.RemoteURLInterface
workflowManager interfaces.WorkflowInterface
Expand Down Expand Up @@ -1347,16 +1345,19 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
return
}

domainCounterMap, ok := m.userMetrics.ScheduledExecutionDelays[execution.GetId().GetProject()]
projectKey := execution.GetId().GetProject()
val, ok := m.userMetrics.ScheduledExecutionDelays.Load(projectKey)
if !ok {
domainCounterMap = make(map[string]*promutils.StopWatch)
m.userMetrics.ScheduledExecutionDelays[execution.GetId().GetProject()] = domainCounterMap
val = &sync.Map{}
m.userMetrics.ScheduledExecutionDelays.Store(projectKey, val)
}

var watch *promutils.StopWatch
watch, ok = domainCounterMap[execution.GetId().GetDomain()]
domainCounterMap := val.(*sync.Map)

domainKey := execution.GetId().GetDomain()
watchVal, ok := domainCounterMap.Load(domainKey)
if !ok {
newWatch, err := m.systemMetrics.Scope.NewSubScope(execution.GetId().GetProject()).NewSubScope(execution.GetId().GetDomain()).NewStopWatch(
newWatch, err := m.systemMetrics.Scope.NewSubScope(execution.GetId().GetProject()).NewSubScope(domainKey).NewStopWatch(
"scheduled_execution_delay",
"delay between scheduled execution time and time execution was observed running",
time.Nanosecond)
Expand All @@ -1366,9 +1367,11 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
"failed to emit scheduled workflow execution delay stat, couldn't find or create counter")
return
}
watch = &newWatch
domainCounterMap[execution.GetId().GetDomain()] = watch
watchVal = &newWatch
domainCounterMap.Store(domainKey, watchVal)
}

watch := watchVal.(*promutils.StopWatch)
watch.Observe(scheduledKickoffTime, runningEventTime)
}

Expand All @@ -1380,16 +1383,19 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime(
return
}

domainCounterMap, ok := m.userMetrics.WorkflowExecutionDurations[executionModel.Project]
projectKey := executionModel.Project
val, ok := m.userMetrics.WorkflowExecutionDurations.Load(projectKey)
if !ok {
domainCounterMap = make(map[string]*promutils.StopWatch)
m.userMetrics.WorkflowExecutionDurations[executionModel.Project] = domainCounterMap
val = &sync.Map{}
m.userMetrics.WorkflowExecutionDurations.Store(projectKey, val)
}

var watch *promutils.StopWatch
watch, ok = domainCounterMap[executionModel.Domain]
domainCounterMap := val.(*sync.Map)

domainKey := executionModel.Domain
watchVal, ok := domainCounterMap.Load(domainKey)
if !ok {
newWatch, err := m.systemMetrics.Scope.NewSubScope(executionModel.Project).NewSubScope(executionModel.Domain).NewStopWatch(
newWatch, err := m.systemMetrics.Scope.NewSubScope(executionModel.Project).NewSubScope(domainKey).NewStopWatch(
"workflow_execution_duration",
"overall time from when when a workflow create request was sent to k8s to the workflow terminating",
time.Nanosecond)
Expand All @@ -1399,10 +1405,12 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime(
"failed to emit workflow execution duration stat, couldn't find or create counter")
return
}
watch = &newWatch
domainCounterMap[executionModel.Domain] = watch
watchVal = &newWatch
domainCounterMap.Store(domainKey, watchVal)
}

watch := watchVal.(*promutils.StopWatch)

terminalEventTime, err := ptypes.Timestamp(terminalEventTimeProto)
if err != nil {
// Timestamps are always sent from propeller and should always be valid
Expand Down Expand Up @@ -1878,8 +1886,8 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu

userMetrics := executionUserMetrics{
Scope: userScope,
ScheduledExecutionDelays: make(map[string]map[string]*promutils.StopWatch),
WorkflowExecutionDurations: make(map[string]map[string]*promutils.StopWatch),
ScheduledExecutionDelays: sync.Map{},
WorkflowExecutionDurations: sync.Map{},
WorkflowExecutionInputBytes: userScope.MustNewSummary("input_size_bytes",
"size in bytes of serialized execution inputs"),
WorkflowExecutionOutputBytes: userScope.MustNewSummary("output_size_bytes",
Expand All @@ -1894,7 +1902,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu
queueAllocator: queueAllocator,
_clock: clock.New(),
systemMetrics: systemMetrics,
userMetrics: userMetrics,
userMetrics: &userMetrics,
notificationClient: publisher,
urlData: urlData,
workflowManager: workflowManager,
Expand Down

0 comments on commit 5197f60

Please sign in to comment.