Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Implement Prometheus statser #355

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/loader/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type commandOptions struct {

func parseArgs(args []string) commandOptions {
var opts commandOptions
parser := flags.NewParser(&opts, flags.HelpFlag | flags.PassDoubleDash)
parser := flags.NewParser(&opts, flags.HelpFlag|flags.PassDoubleDash)
parser.LongDescription = "" + // because gofmt
"When specifying cardinality, the tag cardinality can be specified multiple times,\n" +
"and each tag will be named tagN:M. The maximum total cardinality will be:\n\n" +
Expand Down
2 changes: 1 addition & 1 deletion cmd/loader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func sendMetricsWorker(
_, err := s.Write(b.Bytes())
if err != nil {
fmt.Printf("Pausing for 1 second, error sending packet: %v\n", err)
time.Sleep(1*time.Second)
time.Sleep(1 * time.Second)
}
b.Reset()
next = next.Add(interval)
Expand Down
2 changes: 2 additions & 0 deletions defaults_and_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
StatserNull = "null"
// StatserTagged is the name used to indicate the use of the tagged statser.
StatserTagged = "tagged"
// StatserPrometheus is the name used to indicate the use of the prometheus statser
StatserPrometheus = "prometheus"
)

const (
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/jstemmer/go-junit-report v0.9.1
github.com/libp2p/go-reuseport v0.0.1
github.com/magiconair/properties v1.8.1
github.com/prometheus/client_golang v0.9.3
github.com/sirupsen/logrus v1.4.2
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.6.2
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ github.com/ash2k/stager v0.0.0-20170622123058-6e9c7b0eacd4/go.mod h1:20N8GhJtHSL
github.com/aws/aws-sdk-go v1.28.13 h1:JyCQQ86yil3hg7MtWdNH8Pbcgx92qlUV2v22Km63Mf4=
github.com/aws/aws-sdk-go v1.28.13/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bombsimon/wsl/v2 v2.0.0 h1:+Vjcn+/T5lSrO8Bjzhk4v14Un/2UyCA1E3V5j9nwTkQ=
github.com/bombsimon/wsl/v2 v2.0.0/go.mod h1:mf25kr/SqFEPhhcxW1+7pxzGlW+hIl/hYTKY95VwV8U=
Expand Down Expand Up @@ -252,6 +253,7 @@ github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
Expand Down Expand Up @@ -295,12 +297,17 @@ github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.8.0 h1:zvJNkoCFAnYFNC24FV8nW4JdRJ3GIFcLbg65lL/JDcw=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quasilyte/go-consistent v0.0.0-20190521200055-c6f3937de18c/go.mod h1:5STLWrekHfjyYwxBRVRXNOSewLJ3PWfDJd1VyTS21fI=
Expand Down
78 changes: 78 additions & 0 deletions pkg/stats/statser_prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package stats

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/atlassian/gostatsd"
)

// NOTE: is using a collector/vec of gauges/counters reasonable? Or does
// using only a single gauge/counter suffice?

// this depends on the metrics being collected, if they are the same thing
// but can be partitioned into different types/groups, e.g. http requests partitioned
// by user age and demographics, then using a collector makes sense. (is this correct?)

// PrometheusStatser is a Statser that monitors gostasd's internal metrics from
// Prometheus, it is useful when there is a large number of ephemeral hosts.
type PrometheusStatser struct {
flushNotifier

// collector of gauges that stores the internal gauge metrics of gostatsd
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each GaugeVec and CounterVec is a single metric name, they need to be lazily created.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to use the Gauges and Counter structs gostatsd already provides or would using the structs provided from the prometheus package be better?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd need to use the ones from the prometheus packages, because they're ultimately added to the prometheus registry.

gaugeVec prometheus.GaugeVec
// collector of counters that stores the internal count metrics of gostatsd
counterVec prometheus.CounterVec
}

// NewPrometheusStatser creates a new Statser which
// sends internal metrics to prometheus
func NewPrometheusStatser(gaugeVec prometheus.GaugeVec, counterVec prometheus.CounterVec) *PrometheusStatser {
return &PrometheusStatser{
gaugeVec: gaugeVec,
counterVec: counterVec,
}
}

func (ps *PrometheusStatser) NotifyFlush(ctx context.Context, d time.Duration) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to implement NotifyFlush and RegisterFlush, it's picked up implicitly from the embedded flushNotifier

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come the TaggedStatser implements those two functions or why does TaggedStatser not use a flushNotifier as one of its attributes?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, that comments from my personal account.

The TaggedStatser is a wrapper that explicitly calls to the wrapped Statser.

It shouldn't use a flushNotifier directly, because then the TaggedStatser and the underlying Statser would have different notification targets (and ultimately MetricFlusher.Run would only call NotifyFlush on the underlying Statser)

That being said, it could and probably should use an embedded Statser and remove the explicit implementation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see that now, what was the design process behind, is it because that they have different notification targets, so one such as TaggedStatser is a wrapper of Statser but InternalStatser is not a wrapper because they have the same notification targets?

I am using the GoLand IDE, and I am trying to find the usage of NotifyFlush, for some reason I cannot see any call to this method outside of the statsers and the flush_notifer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no special design considerations behind TaggedStatser not embedding the Statser, it's possible at the time that I thought you could only embed a struct in a struct, and an interface in an interface, but not an interface in a struct, I don't recall for sure though.

InternalStatser does explicitly implement NotifyFlush however, as it needs to take action on the flush before returning. Flush notification is an asynchronous, best effort action, and any metrics triggered by it are processed in the next flush cycle.

The InternalStatser however explicitly needs to process the metrics in the current flush cycle, so it takes action (dispatching internal metrics) before returning.

I'm not sure why GoLand doesn't detect usage - it does in my setup. You could try rebuilding the index through File -> Invalidatae Caches / Restart, but I don't know if it will help.

The only place it's called is at https://github.com/atlassian/gostatsd/blob/master/pkg/statsd/flusher.go#L69

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My problem was I was not aware that the Statser interface also has a NotifyFlush method, I was only trying to find the usage of NotifyFlush from the flsuhNotifier struct.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh, that would do it :)

ps.flushNotifier.NotifyFlush(ctx, d)
}

func (ps *PrometheusStatser) RegisterFlush() (<-chan time.Duration, func()) {
return ps.flushNotifier.RegisterFlush()
}

// TODO: how do I use tags here, same for the Count and the TimingMS methods
func (ps *PrometheusStatser) Gauge(name string, value float64, tags gostatsd.Tags) {
ps.gaugeVec.WithLabelValues(name).Add(value)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tags map to labels

}

func (ps *PrometheusStatser) Count(name string, amount float64, tags gostatsd.Tags) {
ps.counterVec.WithLabelValues(name).Add(amount)
}

func (ps *PrometheusStatser) Increment(name string, tags gostatsd.Tags) {
ps.Count(name, 1, tags)
}

// TimingMS sends a timing metric from a millisecond value
func (ps *PrometheusStatser) TimingMS(name string, ms float64, tags gostatsd.Tags) {
ps.counterVec.WithLabelValues(name).Add(ms)
}

// TimingDuration sends a timing metric from a time.Duration
func (ps *PrometheusStatser) TimingDuration(name string, d time.Duration, tags gostatsd.Tags) {
ps.TimingMS(name, float64(d)/float64(time.Millisecond), tags)
}

// NewTimer returns a new timer with time set to now
func (ps *PrometheusStatser) NewTimer(name string, tags gostatsd.Tags) *Timer {
return newTimer(ps, name, tags)
}

// WithTags creates a new Statser with additional tags
func (ps *PrometheusStatser) WithTags(tags gostatsd.Tags) Statser {
return NewTaggedStatser(ps, tags)
}
17 changes: 16 additions & 1 deletion pkg/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/ash2k/stager"
"github.com/libp2p/go-reuseport"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -186,7 +188,7 @@ func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) erro
runnables = gostatsd.MaybeAppendRunnable(runnables, statser)

// Create any http servers
httpServers, err := web.NewHttpServersFromViper(s.Viper, logger, handler)
httpServers, err := web.NewHttpServersFromViper(s.Viper, logger, handler, statser)
if err != nil {
return err
}
Expand Down Expand Up @@ -218,6 +220,19 @@ func (s *Server) createStatser(hostname gostatsd.Source, handler gostatsd.Pipeli
return stats.NewNullStatser()
case gostatsd.StatserLogging:
return stats.NewLoggingStatser(s.InternalTags, logger)
case gostatsd.StatserPrometheus:
// TODO: move the initializations to the top level
promStatserCounterVec := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prom_statser_counter_vec",
Help: "Help text coming soon...",
}, []string{"counters"})

promStatserGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "prom_statser_gauge_vec",
Help: "Help text coming soon...",
}, []string{"gauges"})

return stats.NewPrometheusStatser(*promStatserGaugeVec, *promStatserCounterVec)
default:
namespace := s.Namespace
if s.InternalNamespace != "" {
Expand Down
3 changes: 3 additions & 0 deletions pkg/web/http_receiver_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/atlassian/gostatsd/pkg/stats"

"github.com/ash2k/stager/wait"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand All @@ -31,6 +33,7 @@ func TestForwardingEndToEndV2(t *testing.T) {
hs, err := web.NewHttpServer(
logrus.StandardLogger(),
ch,
stats.NewNullStatser(),
"TestForwardingEndToEndV2",
"",
false,
Expand Down
13 changes: 11 additions & 2 deletions pkg/web/httpservers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"github.com/atlassian/gostatsd/pkg/stats"

"github.com/ash2k/stager/wait"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
Expand All @@ -34,11 +36,11 @@ type route struct {

var done = struct{}{}

func NewHttpServersFromViper(v *viper.Viper, logger logrus.FieldLogger, handler gostatsd.PipelineHandler) ([]*httpServer, error) {
func NewHttpServersFromViper(v *viper.Viper, logger logrus.FieldLogger, handler gostatsd.PipelineHandler, statser stats.Statser) ([]*httpServer, error) {
httpServerNames := v.GetStringSlice("http-servers")
servers := make([]*httpServer, 0, len(httpServerNames))
for _, httpServerName := range httpServerNames {
server, err := newHttpServerFromViper(logger, v, httpServerName, handler)
server, err := newHttpServerFromViper(logger, v, httpServerName, handler, statser)
if err != nil {
return nil, fmt.Errorf("failed to make http-server %s: %v", httpServerName, err)
}
Expand All @@ -52,6 +54,7 @@ func newHttpServerFromViper(
vMain *viper.Viper,
serverName string,
handler gostatsd.PipelineHandler,
statser stats.Statser,
) (*httpServer, error) {
vSub := util.GetSubViper(vMain, "http."+serverName)
vSub.SetDefault("address", "127.0.0.1:8080")
Expand All @@ -63,6 +66,7 @@ func newHttpServerFromViper(
return NewHttpServer(
logger.WithField("http-server", serverName),
handler,
statser,
serverName,
vSub.GetString("address"),
vSub.GetBool("enable-prof"),
Expand All @@ -75,6 +79,7 @@ func newHttpServerFromViper(
func NewHttpServer(
logger logrus.FieldLogger,
handler gostatsd.PipelineHandler,
statser stats.Statser,
serverName, address string,
enableProf,
enableExpVar,
Expand All @@ -88,6 +93,10 @@ func NewHttpServer(
address: address,
}

if _, enablePromStatser := statser.(*stats.PrometheusStatser); enablePromStatser {
// TODO: append the route and the corresponding handler(s)
}

if enableProf {
routes = append(routes,
route{path: "/debug/pprof/cmdline", handler: pprof.Cmdline, methods: []string{"GET"}, name: "pprof_cmdline"},
Expand Down
1 change: 1 addition & 0 deletions pkg/web/web_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestHttpServerShutsdown(t *testing.T) {
hs, err := web.NewHttpServer(
logrus.StandardLogger(),
nil,
nil,
"TestHttpServerShutsdown",
"127.0.0.1:0", // should pick a random port to bind to
false,
Expand Down