Skip to content

Commit

Permalink
Merge pull request #7 from krancour/exporter-tests
Browse files Browse the repository at this point in the history
improve test coverage on exporter
  • Loading branch information
krancour authored Jul 26, 2021
2 parents 2f28aff + bfef4d9 commit b533fa8
Show file tree
Hide file tree
Showing 3 changed files with 492 additions and 79 deletions.
170 changes: 93 additions & 77 deletions exporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,50 @@ import (
)

type metricsExporter struct {
apiClient sdk.APIClient
coreClient core.APIClient
authnClient authn.APIClient
scrapeInterval time.Duration
totalProjects prometheus.Gauge
totalUsers prometheus.Gauge
totalServiceAccounts prometheus.Gauge
projectsGauge prometheus.Gauge
usersGauge prometheus.Gauge
serviceAccountsGauge prometheus.Gauge
allWorkersByPhase *prometheus.GaugeVec
totalPendingJobs prometheus.Gauge
pendingJobsGauge prometheus.Gauge
}

func newMetricsExporter(
apiClient sdk.APIClient,
scrapeInterval time.Duration,
) *metricsExporter {
return &metricsExporter{
apiClient: apiClient,
coreClient: apiClient.Core(),
authnClient: apiClient.Authn(),
scrapeInterval: scrapeInterval,
totalProjects: promauto.NewGauge(
projectsGauge: promauto.NewGauge(
prometheus.GaugeOpts{
Name: "brigade_projects_total",
Help: "The total number of brigade projects",
Help: "The total number of projects",
},
),
totalUsers: promauto.NewGauge(
usersGauge: promauto.NewGauge(
prometheus.GaugeOpts{
Name: "brigade_users_total",
Help: "The total number of users",
},
),
totalServiceAccounts: promauto.NewGauge(
serviceAccountsGauge: promauto.NewGauge(
prometheus.GaugeOpts{
Name: "brigade_service_accounts_total",
Help: "The total number of service accounts",
},
),
allWorkersByPhase: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "brigade_all_workers_by_phase",
Help: "All workers separated by phase",
Name: "brigade_events_by_worker_phase",
Help: "The total number of events grouped by worker phase",
},
[]string{"workerPhase"},
),
totalPendingJobs: promauto.NewGauge(
pendingJobsGauge: promauto.NewGauge(
prometheus.GaugeOpts{
Name: "brigade_pending_jobs_total",
Help: "The total number of pending jobs",
Expand All @@ -70,112 +72,126 @@ func (m *metricsExporter) run(ctx context.Context) {
for {
select {
case <-ticker.C:
m.recordMetrics()
if err := m.recordProjectsCount(); err != nil {
log.Println(err)
}
if err := m.recordUsersCount(); err != nil {
log.Println(err)
}
if err := m.recordServiceAccountsCount(); err != nil {
log.Println(err)
}
if err := m.recordEventCountsByWorkersPhase(); err != nil {
log.Println(err)
}
if err := m.recordPendingJobsCount(); err != nil {
log.Println(err)
}
case <-ctx.Done():
return
}
}
}

func (m *metricsExporter) recordMetrics() {
func (m *metricsExporter) recordProjectsCount() error {
// brigade_projects_total
projects, err := m.apiClient.Core().Projects().List(
projects, err := m.coreClient.Projects().List(
context.Background(),
&core.ProjectsSelector{},
&meta.ListOptions{},
)
if err != nil {
log.Println(err)
} else {
m.totalProjects.Set(float64(len(projects.Items) +
int(projects.RemainingItemCount)))
return err
}
m.projectsGauge.Set(
float64(len(projects.Items) + int(projects.RemainingItemCount)),
)
return nil
}

func (m *metricsExporter) recordUsersCount() error {
// brigade_users_total
users, err := m.apiClient.Authn().Users().List(
users, err := m.authnClient.Users().List(
context.Background(),
&authn.UsersSelector{},
&meta.ListOptions{},
)
if err != nil {
log.Println(err)
} else {
m.totalUsers.Set(float64(len(users.Items) +
int(users.RemainingItemCount)))
return err
}
m.usersGauge.Set(
float64(int64(len(users.Items)) + users.RemainingItemCount),
)
return nil
}

func (m *metricsExporter) recordServiceAccountsCount() error {
// brigade_service_accounts_total
serviceAccounts, err := m.apiClient.Authn().ServiceAccounts().List(
serviceAccounts, err := m.authnClient.ServiceAccounts().List(
context.Background(),
&authn.ServiceAccountsSelector{},
&meta.ListOptions{},
)
if err != nil {
log.Println(err)
} else {
m.totalServiceAccounts.Set(
float64(
len(serviceAccounts.Items) +
int(serviceAccounts.RemainingItemCount),
),
)
return err
}
m.serviceAccountsGauge.Set(
float64(
int64(len(serviceAccounts.Items)) + serviceAccounts.RemainingItemCount,
),
)
return nil
}

// brigade_all_workers_by_phase
func (m *metricsExporter) recordEventCountsByWorkersPhase() error {
// brigade_events_by_worker_phase
for _, phase := range core.WorkerPhasesAll() {
var events core.EventList
events, err = m.apiClient.Core().Events().List(
events, err := m.coreClient.Events().List(
context.Background(),
&core.EventsSelector{
WorkerPhases: []core.WorkerPhase{phase},
},
&meta.ListOptions{},
)
if err != nil {
log.Println(err)
} else {
m.allWorkersByPhase.With(
prometheus.Labels{"workerPhase": string(phase)},
).Set(float64(len(events.Items) + int(events.RemainingItemCount)))
return err
}
m.allWorkersByPhase.With(
prometheus.Labels{"workerPhase": string(phase)},
).Set(float64(len(events.Items) + int(events.RemainingItemCount)))
}
return nil
}

// brigade_pending_jobs_total
//
// There is no way to query the API directly for pending Jobs, but only
// running Workers should ever HAVE pending Jobs, so if we're currently
// counting running Workers, we can iterate over those to count pending
// jobs. Note, there's a cap on the max number of workers that can run
// concurrently, so we assume that as long as that cap isn't enormous (which
// would only occur on an enormous cluster), it's practical to iterate over
// all the running workers.
if phase == core.WorkerPhaseRunning {
var pendingJobs int
for {
for _, event := range events.Items {
for _, job := range event.Worker.Jobs {
if job.Status.Phase == core.JobPhasePending {
pendingJobs++
}
}
}
if events.Continue == "" {
break
}
if events, err = m.apiClient.Core().Events().List(
context.Background(),
&core.EventsSelector{
WorkerPhases: []core.WorkerPhase{phase},
},
&meta.ListOptions{Continue: events.Continue},
); err != nil {
log.Println(err)
break
func (m *metricsExporter) recordPendingJobsCount() error {
// brigade_pending_jobs_total
var pendingJobs int
var continueValue string
for {
events, err := m.coreClient.Events().List(
context.Background(),
&core.EventsSelector{
WorkerPhases: []core.WorkerPhase{core.WorkerPhaseRunning},
},
&meta.ListOptions{
Continue: continueValue,
},
)
if err != nil {
return err
}
for _, event := range events.Items {
for _, job := range event.Worker.Jobs {
if job.Status.Phase == core.JobPhasePending {
pendingJobs++
}
}
if err == nil {
m.totalPendingJobs.Set(float64(pendingJobs))
}
}
if events.Continue == "" {
break
}
continueValue = events.Continue
}

m.pendingJobsGauge.Set(float64(pendingJobs))
return nil
}
Loading

0 comments on commit b533fa8

Please sign in to comment.