Skip to content

Commit 80a8417

Browse files
committed
ruler: add ability to remote_write directly
Add ability to use remote_write directly. This makes the component more general and makes it possible to use it with Thanos. Signed-off-by: Giedrius Statkevičius <[email protected]>
1 parent 7fb98ab commit 80a8417

File tree

4 files changed

+215
-2
lines changed

4 files changed

+215
-2
lines changed

integration/ruler_test.go

+74
Original file line numberDiff line numberDiff line change
@@ -1805,3 +1805,77 @@ func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
18051805
},
18061806
}
18071807
}
1808+
1809+
func TestRulerEvalWithQueryFrontendAndRemoteWrite(t *testing.T) {
1810+
s, err := e2e.NewScenario(networkName)
1811+
require.NoError(t, err)
1812+
defer s.Close()
1813+
1814+
// Start dependencies.
1815+
consul := e2edb.NewConsul()
1816+
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
1817+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1818+
1819+
// Configure the ruler.
1820+
flags := mergeFlags(
1821+
BlocksStorageFlags(),
1822+
RulerFlags(),
1823+
map[string]string{
1824+
// Evaluate rules often, so that we don't need to wait for metrics to show up.
1825+
"-ruler.evaluation-interval": "2s",
1826+
// We run single ingester only, no replication.
1827+
"-distributor.replication-factor": "1",
1828+
"-log.level": "debug",
1829+
},
1830+
)
1831+
1832+
const namespace = "test"
1833+
const user = "user"
1834+
1835+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1836+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1837+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1838+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
1839+
require.NoError(t, s.Start(queryFrontend))
1840+
1841+
require.NoError(t, writeFileToSharedDir(s, "rulercfg.yml", []byte(`ruler:
1842+
remote_write:
1843+
headers:
1844+
X-Scope-OrgID: "test-org-id"`)))
1845+
1846+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1847+
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1848+
"-ruler.remote-write-url": fmt.Sprintf("http://%s/api/v1/push", distributor.NetworkEndpoint(80)),
1849+
"-config.file": filepath.Join(e2e.ContainerSharedDir, "rulercfg.yml"),
1850+
}), "")
1851+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1852+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1853+
}), "")
1854+
require.NoError(t, s.StartAndWaitReady(ruler, querier))
1855+
1856+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1857+
require.NoError(t, err)
1858+
1859+
expression := "metric"
1860+
groupName := "rule_group"
1861+
ruleName := "rule_name"
1862+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1863+
1864+
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1865+
// Wait until ruler has loaded the group.
1866+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1867+
// Wait until rule group has tried to evaluate the rule.
1868+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1869+
1870+
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1871+
// Check that cortex_ruler_query_frontend_clients went up
1872+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1873+
// Check that cortex_ruler_queries_total went up
1874+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1875+
// Check that cortex_ruler_queries_failed_total is zero
1876+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1877+
// Check that cortex_ruler_write_requests_total went up
1878+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1879+
// Check that cortex_ruler_write_requests_failed_total is zero
1880+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1881+
}

pkg/cortex/modules.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -613,11 +613,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
613613
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
614614
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
615615
} else {
616+
var pusher ruler.Pusher = t.Distributor
617+
if t.Cfg.Ruler.RemoteWriteConfig.URL != "" {
618+
pusher = ruler.NewRemoteWritePusher(t.Cfg.Ruler.RemoteWriteConfig.URL, t.Cfg.Ruler.RemoteWriteConfig.Headers)
619+
}
616620
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
617621
// TODO: Consider wrapping logger to differentiate from querier module logger
618622
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
619623

620-
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
624+
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
621625
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
622626
}
623627

@@ -821,7 +825,7 @@ func (t *Cortex) setupModuleManager() error {
821825
TenantFederation: {Queryable},
822826
All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler, Compactor, AlertManager},
823827
}
824-
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
828+
if (t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil) || (t.Cfg.Ruler.FrontendAddress != "" && t.Cfg.Ruler.RemoteWriteConfig.URL != "") {
825829
deps[Ruler] = []string{Overrides, RulerStorage}
826830
}
827831
for mod, targets := range deps {

pkg/ruler/remote_write.go

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package ruler
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
10+
"github.com/cortexproject/cortex/pkg/cortexpb"
11+
"github.com/klauspost/compress/snappy"
12+
"github.com/prometheus/prometheus/prompb"
13+
)
14+
15+
// RemoteWritePusher is a Pusher that sends write requests to a remote write
// endpoint over HTTP.
type RemoteWritePusher struct {
16+
// u is the URL that Push sends its POST request to.
u string
17+
// headers are set on every request that Push sends.
headers map[string]string
18+
}
19+
20+
func NewRemoteWritePusher(u string, headers map[string]string) *RemoteWritePusher {
21+
return &RemoteWritePusher{
22+
u: u,
23+
headers: headers,
24+
}
25+
}
26+
27+
// Compile-time check that *RemoteWritePusher satisfies Pusher.
var _ Pusher = (*RemoteWritePusher)(nil)
28+
29+
func (r *RemoteWritePusher) Push(ctx context.Context, wr *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
30+
promwr := &prompb.WriteRequest{
31+
Timeseries: make([]prompb.TimeSeries, 0, len(wr.Timeseries)),
32+
Metadata: make([]prompb.MetricMetadata, 0, len(wr.Metadata)),
33+
}
34+
35+
for _, ts := range wr.Timeseries {
36+
promwr.Timeseries = append(promwr.Timeseries, prompb.TimeSeries{
37+
Labels: makeLabels(ts.Labels),
38+
Samples: makeSamples(ts.Samples),
39+
Exemplars: makeExemplars(ts.Exemplars),
40+
//Histograms: makeHistograms(ts.Histograms),
41+
})
42+
}
43+
44+
for _, m := range wr.Metadata {
45+
promwr.Metadata = append(promwr.Metadata, prompb.MetricMetadata{
46+
Type: prompb.MetricMetadata_MetricType(m.Type),
47+
Unit: m.Unit,
48+
Help: m.Help,
49+
MetricFamilyName: m.MetricFamilyName,
50+
})
51+
}
52+
53+
m, err := promwr.Marshal()
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
encoded := snappy.Encode(nil, m)
59+
60+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.u, bytes.NewReader(encoded))
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
for k, v := range r.headers {
66+
req.Header.Set(k, v)
67+
}
68+
69+
resp, err := http.DefaultClient.Do(req)
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
if resp.Body != nil {
75+
io.Copy(io.Discard, resp.Body)
76+
resp.Body.Close()
77+
}
78+
79+
if resp.StatusCode/100 != 2 {
80+
return nil, fmt.Errorf("got status code: %d", resp.StatusCode)
81+
}
82+
83+
return &cortexpb.WriteResponse{}, nil
84+
}
85+
86+
func makeLabels(in []cortexpb.LabelAdapter) []prompb.Label {
87+
out := make([]prompb.Label, 0, len(in))
88+
for _, l := range in {
89+
out = append(out, prompb.Label{Name: l.Name, Value: l.Value})
90+
}
91+
return out
92+
}
93+
94+
func makeSamples(in []cortexpb.Sample) []prompb.Sample {
95+
out := make([]prompb.Sample, 0, len(in))
96+
for _, s := range in {
97+
out = append(out, prompb.Sample{
98+
Value: s.Value,
99+
Timestamp: s.TimestampMs,
100+
})
101+
}
102+
return out
103+
}
104+
105+
func makeExemplars(in []cortexpb.Exemplar) []prompb.Exemplar {
106+
out := make([]prompb.Exemplar, 0, len(in))
107+
for _, e := range in {
108+
out = append(out, prompb.Exemplar{
109+
Labels: makeLabels(e.Labels),
110+
Value: e.Value,
111+
Timestamp: e.TimestampMs,
112+
})
113+
}
114+
return out
115+
}
116+
117+
/*
118+
func makeHistograms(in []cortexpb.Histogram) []prompb.Histogram {
119+
out := make([]prompb.Histogram, 0, len(in))
120+
for _, h := range in {
121+
out = append(out, cortexpb.HistogramPromProtoToHistogramProto(h))
122+
}
123+
return out
124+
}
125+
*/

pkg/ruler/ruler.go

+10
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ func (e *DisabledRuleGroupErr) Error() string {
9292
return e.Message
9393
}
9494

95+
// RemoteWriteConfig holds the ruler's remote write settings.
type RemoteWriteConfig struct {
96+
// URL is the remote write endpoint to send samples to. It can also be set
// with the -ruler.remote-write-url flag.
URL string `yaml:"url"`
97+
// Headers are HTTP headers for remote write requests. No command-line
// flag is registered for them here, only the YAML key.
Headers map[string]string `yaml:"headers"`
98+
}
99+
95100
// Config is the configuration for the recording rules server.
96101
type Config struct {
97102
// This is used for query to query frontend to evaluate rules
@@ -113,6 +118,10 @@ type Config struct {
113118
// Path to store rule files for prom manager.
114119
RulePath string `yaml:"rule_path"`
115120

121+
// Configuration for remote_write. If this is configured then
122+
// Ruler only writes to this address.
123+
RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write"`
124+
116125
// URL of the Alertmanager to send notifications to.
117126
// If you are configuring the ruler to send to a Cortex Alertmanager,
118127
// ensure this includes any path set in the Alertmanager external URL.
@@ -217,6 +226,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
217226
f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing DNS resolutions of Alertmanager hosts.")
218227
f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.")
219228
f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.")
229+
f.StringVar(&cfg.RemoteWriteConfig.URL, "ruler.remote-write-url", "", "URL of the remote write endpoint to send samples to.")
220230

221231
f.DurationVar(&cfg.SearchPendingFor, "ruler.search-pending-for", 5*time.Minute, "Time to spend searching for a pending ruler when shutting down.")
222232
f.BoolVar(&cfg.EnableSharding, "ruler.enable-sharding", false, "Distribute rule evaluation using ring backend")

0 commit comments

Comments
 (0)