Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stream connection for remote write #6580

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2679,6 +2679,11 @@ ha_tracker:
# CLI flag: -distributor.sign-write-requests
[sign_write_requests: <boolean> | default = false]

# EXPERIMENTAL: If enabled, distributor would use stream connection to send
# requests to ingesters.
# CLI flag: -distributor.use-stream-push
[use_stream_push: <boolean> | default = false]

ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
Expand Down
499 changes: 392 additions & 107 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ message WriteRequest {
bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus
}

// StreamWriteRequest bundles a WriteRequest with the tenant it belongs to,
// for use on the distributor -> ingester stream push connection.
// NOTE(review): the tenant is carried in-band in the message — presumably
// because a shared long-lived stream cannot rely on per-request gRPC
// metadata; confirm with the stream client implementation.
// NOTE(review): the proto style guide prefers lower_snake_case field names
// (tenant_id, request). Renaming keeps the wire format compatible (field
// numbers unchanged) but changes the generated Go identifiers, so it must
// be coordinated with regenerating cortex.pb.go and its callers.
message StreamWriteRequest {
string TenantID = 1;
WriteRequest Request = 2;
}

message WriteResponse {}

message TimeSeries {
Expand Down
35 changes: 23 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"flag"
"fmt"
io "io"
"io"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -150,6 +150,7 @@ type Config struct {
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
UseStreamPush bool `yaml:"use_stream_push"`

// Distributors ring
DistributorRing RingConfig `yaml:"ring"`
Expand Down Expand Up @@ -204,6 +205,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.BoolVar(&cfg.UseStreamPush, "distributor.use-stream-push", false, "EXPERIMENTAL: If enabled, distributor would use stream connection to send requests to ingesters.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If zone awareness and this both enabled, when querying metadata APIs (labels names and values for now), only results from quorum number of zones will be included.")
Expand Down Expand Up @@ -242,7 +244,7 @@ const (
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
return ingester_client.MakeIngesterClient(addr, clientConfig)
return ingester_client.MakeIngesterClient(addr, clientConfig, cfg.UseStreamPush)
}
}

Expand Down Expand Up @@ -1134,20 +1136,29 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time

c := h.(ingester_client.HealthAndIngesterClient)

req := cortexpb.PreallocWriteRequestFromPool()
req.Timeseries = timeseries
req.Metadata = metadata
req.Source = source

d.inflightClientRequests.Inc()
defer d.inflightClientRequests.Dec()

_, err = c.PushPreAlloc(ctx, req)
if d.cfg.UseStreamPush {
req := &cortexpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
}
_, err = c.PushStreamConnection(ctx, req)
} else {
req := cortexpb.PreallocWriteRequestFromPool()
req.Timeseries = timeseries
req.Metadata = metadata
req.Source = source

// We should not reuse the req in case of errors:
// See: https://github.com/grpc/grpc-go/issues/6355
if err == nil {
cortexpb.ReuseWriteRequest(req)
_, err = c.PushPreAlloc(ctx, req)

// We should not reuse the req in case of errors:
// See: https://github.com/grpc/grpc-go/issues/6355
if err == nil {
cortexpb.ReuseWriteRequest(req)
}
}

if len(metadata) > 0 {
Expand Down
91 changes: 53 additions & 38 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,46 +351,50 @@ func TestDistributor_Push(t *testing.T) {
`,
},
} {
for _, shardByAllLabels := range []bool{true, false} {
tc := tc
name := name
shardByAllLabels := shardByAllLabels
t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v)", name, shardByAllLabels), func(t *testing.T) {
t.Parallel()
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRate = 20
limits.IngestionBurstSize = 20
for _, useStreamPush := range []bool{false, true} {
for _, shardByAllLabels := range []bool{true, false} {
tc := tc
name := name
shardByAllLabels := shardByAllLabels
useStreamPush := useStreamPush
t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v,useStreamPush=%v)", name, shardByAllLabels, useStreamPush), func(t *testing.T) {
t.Parallel()
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRate = 20
limits.IngestionBurstSize = 20

ds, _, regs, _ := prepare(t, prepConfig{
numIngesters: tc.numIngesters,
happyIngesters: tc.happyIngesters,
numDistributors: 1,
shardByAllLabels: shardByAllLabels,
limits: limits,
errFail: tc.ingesterError,
useStreamPush: useStreamPush,
})

ds, _, regs, _ := prepare(t, prepConfig{
numIngesters: tc.numIngesters,
happyIngesters: tc.happyIngesters,
numDistributors: 1,
shardByAllLabels: shardByAllLabels,
limits: limits,
errFail: tc.ingesterError,
var request *cortexpb.WriteRequest
if !tc.histogramSamples {
request = makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, 0)
} else {
request = makeWriteRequest(tc.samples.startTimestampMs, 0, tc.metadata, tc.samples.num)
}
response, err := ds[0].Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, status.Code(tc.expectedError), status.Code(err))

// Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum
// is reached, when we reach this point the 3rd ingester may not have received series/metadata
// yet. To avoid flaky test we retry metrics assertion until we hit the desired state (no error)
// within a reasonable timeout.
if tc.expectedMetrics != "" {
test.Poll(t, time.Second, nil, func() interface{} {
return testutil.GatherAndCompare(regs[0], strings.NewReader(tc.expectedMetrics), tc.metricNames...)
})
}
})

var request *cortexpb.WriteRequest
if !tc.histogramSamples {
request = makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, 0)
} else {
request = makeWriteRequest(tc.samples.startTimestampMs, 0, tc.metadata, tc.samples.num)
}
response, err := ds[0].Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, status.Code(tc.expectedError), status.Code(err))

// Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum
// is reached, when we reach this point the 3rd ingester may not have received series/metadata
// yet. To avoid flaky test we retry metrics assertion until we hit the desired state (no error)
// within a reasonable timeout.
if tc.expectedMetrics != "" {
test.Poll(t, time.Second, nil, func() interface{} {
return testutil.GatherAndCompare(regs[0], strings.NewReader(tc.expectedMetrics), tc.metricNames...)
})
}
})
}
}
}
}
Expand Down Expand Up @@ -2340,6 +2344,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
distributorCfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
return &noopIngester{}, nil
}
distributorCfg.UseStreamPush = false

overrides, err := validation.NewOverrides(limits, nil)
require.NoError(b, err)
Expand Down Expand Up @@ -2836,6 +2841,7 @@ type prepConfig struct {
enableTracker bool
errFail error
tokens [][]uint32
useStreamPush bool
}

type prepState struct {
Expand Down Expand Up @@ -2950,6 +2956,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
distributorCfg.InstanceLimits.MaxInflightPushRequests = cfg.maxInflightRequests
distributorCfg.InstanceLimits.MaxInflightClientRequests = cfg.maxInflightClientRequests
distributorCfg.InstanceLimits.MaxIngestionRate = cfg.maxIngestionRate
distributorCfg.UseStreamPush = cfg.useStreamPush

if cfg.shuffleShardEnabled {
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
Expand Down Expand Up @@ -3307,6 +3314,10 @@ func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWr
return i.Push(ctx, &in.WriteRequest, opts...)
}

// PushStreamConnection implements the stream-push variant of the client
// interface by delegating to the regular Push handler, so the mock records
// stream-pushed series exactly like non-stream ones.
func (i *mockIngester) PushStreamConnection(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
	return i.Push(ctx, req, opts...)
}

func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
i.Lock()
defer i.Unlock()
Expand Down Expand Up @@ -3561,6 +3572,10 @@ func (i *noopIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt
return nil, nil
}

// PushStreamConnection is a no-op: it discards the request and reports
// success, mirroring this type's Push method (used for benchmarks where
// ingester work must not be measured).
func (i *noopIngester) PushStreamConnection(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
	return nil, nil
}

type queryStream struct {
grpc.ClientStream
i int
Expand Down
Loading
Loading