Skip to content

Commit 192e0b6

Browse files
authored
feat: add cronjob metrics (#215)
* wip: add cron metrics * refactor: change metrics * refactor: restore backward compatibility * refactor: renaming * doc: add comments * doc: add comments * fix: gorm tests * test: add ProvideCronjobMetrics * test: stub
1 parent 98c774a commit 192e0b6

20 files changed

+373
-224
lines changed

cronopts/example_metrics_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package cronopts_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/DoNewsCode/core"
7+
"github.com/DoNewsCode/core/cronopts"
8+
"github.com/DoNewsCode/core/observability"
9+
"github.com/robfig/cron/v3"
10+
"math/rand"
11+
"time"
12+
)
13+
14+
type CronModule struct {
15+
metrics *cronopts.CronJobMetrics
16+
}
17+
18+
func NewCronModule(metrics *cronopts.CronJobMetrics) CronModule {
19+
return CronModule{metrics: metrics.Module("test_module")}
20+
}
21+
22+
func (c CronModule) ProvideCron(crontab *cron.Cron) {
23+
// Create a new cron job, and measure its execution durations.
24+
crontab.AddJob("* * * * *", c.metrics.Job("test_job").Measure(cron.FuncJob(func() {
25+
fmt.Println("running")
26+
// For 50% chance, the job may fail. Report it to metrics collector.
27+
if rand.Float64() > 0.5 {
28+
c.metrics.Fail()
29+
}
30+
})))
31+
}
32+
33+
func Example_cronJobMetrics() {
34+
c := core.Default()
35+
c.Provide(observability.Providers())
36+
c.AddModuleFunc(NewCronModule)
37+
38+
ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
39+
defer cancel()
40+
41+
c.Serve(ctx)
42+
}

cronopts/metrics.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package cronopts
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-kit/kit/metrics"
7+
"github.com/robfig/cron/v3"
8+
)
9+
10+
// CronJobMetrics collects metrics for cron jobs.
11+
type CronJobMetrics struct {
12+
cronJobDurationSeconds metrics.Histogram
13+
cronJobFailCount metrics.Counter
14+
15+
// labels that has been set
16+
module string
17+
job string
18+
}
19+
20+
// NewCronJobMetrics constructs a new *CronJobMetrics, setting default labels to "unknown".
21+
func NewCronJobMetrics(histogram metrics.Histogram, counter metrics.Counter) *CronJobMetrics {
22+
return &CronJobMetrics{
23+
cronJobDurationSeconds: histogram,
24+
cronJobFailCount: counter,
25+
module: "unknown",
26+
job: "unknown",
27+
}
28+
}
29+
30+
// Module specifies the module label for CronJobMetrics.
31+
func (c *CronJobMetrics) Module(module string) *CronJobMetrics {
32+
return &CronJobMetrics{
33+
cronJobDurationSeconds: c.cronJobDurationSeconds,
34+
cronJobFailCount: c.cronJobFailCount,
35+
module: module,
36+
job: c.job,
37+
}
38+
}
39+
40+
// Job specifies the job label for CronJobMetrics.
41+
func (c *CronJobMetrics) Job(job string) *CronJobMetrics {
42+
return &CronJobMetrics{
43+
cronJobDurationSeconds: c.cronJobDurationSeconds,
44+
cronJobFailCount: c.cronJobFailCount,
45+
module: c.module,
46+
job: job,
47+
}
48+
}
49+
50+
// Fail marks the job as failed.
51+
func (c *CronJobMetrics) Fail() {
52+
c.cronJobFailCount.With("module", c.module, "job", c.job).Add(1)
53+
}
54+
55+
// Measure wraps the given job and records the duration and success.
56+
func (c *CronJobMetrics) Measure(job cron.Job) cron.Job {
57+
return cron.FuncJob(func() {
58+
start := time.Now()
59+
defer c.cronJobDurationSeconds.With("module", c.module, "job", c.job).Observe(time.Since(start).Seconds())
60+
job.Run()
61+
})
62+
}
63+
64+
// Measure returns a job wrapper that wraps the given job and records the duration and success.
65+
func Measure(c *CronJobMetrics) cron.JobWrapper {
66+
return c.Measure
67+
}

cronopts/metrics_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package cronopts
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/DoNewsCode/core/internal/stub"
8+
"github.com/robfig/cron/v3"
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestMeasure(t *testing.T) {
13+
histogram := &stub.Histogram{}
14+
counter := &stub.Counter{}
15+
metrics := NewCronJobMetrics(histogram, counter)
16+
metrics = metrics.Module("x").Job("y")
17+
Measure(metrics)(cron.FuncJob(func() {
18+
time.Sleep(time.Millisecond)
19+
})).Run()
20+
assert.ElementsMatch(t, histogram.LabelValues, []string{"module", "x", "job", "y"})
21+
assert.True(t, histogram.ObservedValue > 0)
22+
metrics.Fail()
23+
assert.ElementsMatch(t, counter.LabelValues, []string{"module", "x", "job", "y"})
24+
assert.True(t, counter.CounterValue == 1)
25+
}

internal/stub/metrics.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package stub
2+
3+
import "github.com/go-kit/kit/metrics"
4+
5+
// Histogram is a stub implementation of the go-kit metrics.Histogram interface.
6+
type Histogram struct {
7+
LabelValues []string
8+
ObservedValue float64
9+
}
10+
11+
// With returns a new Histogram with the given label values.
12+
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
13+
h.LabelValues = labelValues
14+
return h
15+
}
16+
17+
// Observe records the given value.
18+
func (h *Histogram) Observe(value float64) {
19+
h.ObservedValue = value
20+
}
21+
22+
// Gauge is a stub implementation of the go-kit metrics.Gauge interface.
23+
type Gauge struct {
24+
LabelValues []string
25+
GaugeValue float64
26+
}
27+
28+
// With returns a new Gauge with the given label values.
29+
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
30+
g.LabelValues = labelValues
31+
return g
32+
}
33+
34+
// Set sets the gauge value.
35+
func (g *Gauge) Set(value float64) {
36+
g.GaugeValue = value
37+
}
38+
39+
// Add adds the given value to the gauge.
40+
func (g *Gauge) Add(delta float64) {
41+
g.GaugeValue = g.GaugeValue + delta
42+
}
43+
44+
// Counter is a stub implementation of the go-kit metrics.Counter interface.
45+
type Counter struct {
46+
LabelValues []string
47+
CounterValue float64
48+
}
49+
50+
// With returns a new Counter with the given label values.
51+
func (c *Counter) With(labelValues ...string) metrics.Counter {
52+
c.LabelValues = labelValues
53+
return c
54+
}
55+
56+
// Add adds the given value to the counter.
57+
func (c *Counter) Add(delta float64) {
58+
c.CounterValue = c.CounterValue + delta
59+
}

internal/stub/metrics_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package stub
2+
3+
import (
4+
"github.com/stretchr/testify/assert"
5+
"testing"
6+
)
7+
8+
func TestMetrics(t *testing.T) {
9+
h := &Histogram{}
10+
g := &Gauge{}
11+
c := &Counter{}
12+
13+
h.With("foo", "bar").Observe(1.0)
14+
assert.ElementsMatch(t, h.LabelValues, []string{"foo", "bar"})
15+
assert.Equal(t, h.ObservedValue, 1.0)
16+
17+
g.With("foo", "bar").Set(1.0)
18+
g.Add(1)
19+
assert.Equal(t, g.GaugeValue, 2.0)
20+
assert.ElementsMatch(t, g.LabelValues, []string{"foo", "bar"})
21+
22+
c.With("foo", "bar").Add(1)
23+
c.Add(1)
24+
assert.ElementsMatch(t, c.LabelValues, []string{"foo", "bar"})
25+
assert.Equal(t, c.CounterValue, 2.0)
26+
}

observability/metrics.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package observability
22

33
import (
4+
"github.com/DoNewsCode/core/cronopts"
45
"github.com/DoNewsCode/core/di"
56
"github.com/DoNewsCode/core/otgorm"
67
"github.com/DoNewsCode/core/otkafka"
@@ -53,6 +54,34 @@ func ProvideGRPCRequestDurationSeconds(in MetricsIn) *srvgrpc.RequestDurationSec
5354
return srvgrpc.NewRequestDurationSeconds(prometheus.NewHistogram(grpc))
5455
}
5556

57+
// ProvideCronJobMetrics returns a *cronopts.CronJobMetrics that is designed to
58+
// measure cron job metrics. The returned metrics can be used like this:
59+
// metrics := cronopts.NewCronJobMetrics(...)
60+
// job := cron.NewChain(
61+
// cron.Recover(logger),
62+
// cronopts.Measure(metrics),
63+
// ).Then(job)
64+
func ProvideCronJobMetrics(in MetricsIn) *cronopts.CronJobMetrics {
65+
histogram := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{
66+
Name: "cronjob_duration_seconds",
67+
Help: "Total time spent running cron jobs.",
68+
}, []string{"module", "job"})
69+
70+
counter := stdprometheus.NewCounterVec(stdprometheus.CounterOpts{
71+
Name: "cronjob_failures_total",
72+
Help: "Total number of cron jobs that failed.",
73+
}, []string{"module", "job"})
74+
75+
if in.Registerer == nil {
76+
in.Registerer = stdprometheus.DefaultRegisterer
77+
}
78+
79+
in.Registerer.MustRegister(histogram)
80+
in.Registerer.MustRegister(counter)
81+
82+
return cronopts.NewCronJobMetrics(prometheus.NewHistogram(histogram), prometheus.NewCounter(counter))
83+
}
84+
5685
// ProvideGORMMetrics returns a *otgorm.Gauges that measures the connection info
5786
// in databases. It is meant to be consumed by the otgorm.Providers.
5887
func ProvideGORMMetrics(in MetricsIn) *otgorm.Gauges {

observability/observability.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func Providers() di.Deps {
2828
ProvideRedisMetrics,
2929
ProvideKafkaReaderMetrics,
3030
ProvideKafkaWriterMetrics,
31+
ProvideCronJobMetrics,
3132
provideConfig,
3233
}
3334
}

observability/observability_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package observability
22

33
import (
4+
"github.com/DoNewsCode/core/cronopts"
45
"os"
56
"strings"
67
"testing"
@@ -78,14 +79,16 @@ func TestProvideRedisMetrics(t *testing.T) {
7879
c.Provide(otredis.Providers())
7980
c.Invoke(func(cli redis.UniversalClient, g *otredis.Gauges) {
8081
stats := cli.PoolStats()
81-
withValues := []string{"dbname", "default"}
82-
83-
g.Hits.With(withValues...).Set(float64(stats.Hits))
84-
g.Misses.With(withValues...).Set(float64(stats.Misses))
85-
g.Timeouts.With(withValues...).Set(float64(stats.Timeouts))
86-
g.TotalConns.With(withValues...).Set(float64(stats.TotalConns))
87-
g.IdleConns.With(withValues...).Set(float64(stats.IdleConns))
88-
g.StaleConns.With(withValues...).Set(float64(stats.StaleConns))
82+
g.Observe(stats)
83+
})
84+
}
85+
86+
func TestProvideCronjobMetrics(t *testing.T) {
87+
c := core.New()
88+
c.ProvideEssentials()
89+
c.Provide(Providers())
90+
c.Invoke(func(metrics *cronopts.CronJobMetrics) {
91+
metrics.Fail()
8992
})
9093
}
9194

otgorm/gorm_metrics.go

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,58 +20,48 @@ type Gauges struct {
2020
inUse metrics.Gauge
2121
open metrics.Gauge
2222

23-
dbName bool
24-
driver bool
23+
dbName string
24+
driver string
2525
}
2626

2727
// NewGauges returns a new Gauges.
2828
func NewGauges(idle, inUse, open metrics.Gauge) *Gauges {
2929
return &Gauges{
30-
idle: idle,
31-
inUse: inUse,
32-
open: open,
30+
idle: idle,
31+
inUse: inUse,
32+
open: open,
33+
dbName: "unknown",
34+
driver: "default",
3335
}
3436
}
3537

3638
// DBName sets the dbname label of metrics.
3739
func (g *Gauges) DBName(dbName string) *Gauges {
38-
withValues := []string{"dbname", dbName}
3940
return &Gauges{
40-
idle: g.idle.With(withValues...),
41-
inUse: g.inUse.With(withValues...),
42-
open: g.open.With(withValues...),
43-
dbName: true,
41+
idle: g.idle,
42+
inUse: g.inUse,
43+
open: g.open,
44+
dbName: dbName,
4445
driver: g.driver,
4546
}
4647
}
4748

4849
// Driver sets the driver label of metrics.
4950
func (g *Gauges) Driver(driver string) *Gauges {
50-
withValues := []string{"driver", driver}
5151
return &Gauges{
52-
idle: g.idle.With(withValues...),
53-
inUse: g.inUse.With(withValues...),
54-
open: g.open.With(withValues...),
52+
idle: g.idle,
53+
inUse: g.inUse,
54+
open: g.open,
5555
dbName: g.dbName,
56-
driver: true,
56+
driver: driver,
5757
}
5858
}
5959

6060
// Observe records the DBStats collected. It should be called periodically.
6161
func (g *Gauges) Observe(stats sql.DBStats) {
62-
if !g.dbName {
63-
g.idle = g.idle.With("dbname", "")
64-
g.inUse = g.inUse.With("dbname", "")
65-
g.open = g.open.With("dbname", "")
66-
}
67-
if !g.driver {
68-
g.idle = g.idle.With("driver", "")
69-
g.inUse = g.inUse.With("driver", "")
70-
g.open = g.open.With("driver", "")
71-
}
72-
g.idle.Set(float64(stats.Idle))
73-
g.inUse.Set(float64(stats.InUse))
74-
g.open.Set(float64(stats.OpenConnections))
62+
g.idle.With("dbname", g.dbName, "driver", g.driver).Set(float64(stats.Idle))
63+
g.inUse.With("dbname", g.dbName, "driver", g.driver).Set(float64(stats.InUse))
64+
g.open.With("dbname", g.dbName, "driver", g.driver).Set(float64(stats.OpenConnections))
7565
}
7666

7767
// newCollector creates a new database wrapper containing the name of the database,

0 commit comments

Comments
 (0)