Skip to content

Commit e608ac0

Browse files
committed
Removing LabelAdapter
Signed-off-by: alanprot <[email protected]>
1 parent 8d79acc commit e608ac0

File tree

160 files changed

+6707
-5117
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

160 files changed

+6707
-5117
lines changed

go.mod

+4-7
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ require (
7070
golang.org/x/net v0.32.0
7171
golang.org/x/sync v0.10.0
7272
golang.org/x/time v0.8.0
73-
google.golang.org/grpc v1.68.0
73+
google.golang.org/grpc v1.70.0-dev.0.20241210174008-e4d084a6ece3
7474
gopkg.in/yaml.v2 v2.4.0
7575
gopkg.in/yaml.v3 v3.0.1
7676
)
@@ -92,7 +92,7 @@ require (
9292
cloud.google.com/go v0.115.1 // indirect
9393
cloud.google.com/go/auth v0.9.3 // indirect
9494
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
95-
cloud.google.com/go/compute/metadata v0.5.0 // indirect
95+
cloud.google.com/go/compute/metadata v0.5.2 // indirect
9696
cloud.google.com/go/iam v1.1.13 // indirect
9797
cloud.google.com/go/storage v1.43.0 // indirect
9898
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
@@ -239,8 +239,8 @@ require (
239239
gonum.org/v1/gonum v0.15.0 // indirect
240240
google.golang.org/api v0.195.0 // indirect
241241
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c // indirect
242-
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
243-
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
242+
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect
243+
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
244244
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
245245
gopkg.in/telebot.v3 v3.2.1 // indirect
246246
k8s.io/apimachinery v0.31.1 // indirect
@@ -275,6 +275,3 @@ replace github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9
275275
// Same replace used by thanos: (may be removed in the future)
276276
// https://github.com/thanos-io/thanos/blob/fdeea3917591fc363a329cbe23af37c6fff0b5f0/go.mod#L265
277277
replace gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497
278-
279-
// gRPC 1.66 introduced memory pooling which breaks Cortex queries. Pin 1.65.0 until we have a fix.
280-
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0

go.sum

+68-1,148
Large diffs are not rendered by default.

integration/grpc_server_test.go

+228
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package integration
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"github.com/cortexproject/cortex/pkg/cortexpb"
8+
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
9+
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
10+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
"github.com/weaveworks/common/server"
15+
"google.golang.org/grpc"
16+
"google.golang.org/grpc/metadata"
17+
"math/rand"
18+
"net"
19+
"strconv"
20+
"sync"
21+
"testing"
22+
"time"
23+
)
24+
25+
type mockGprcServer struct {
26+
ingester_client.IngesterServer
27+
}
28+
29+
func (m mockGprcServer) QueryStream(_ *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
30+
md, _ := metadata.FromIncomingContext(streamServer.Context())
31+
i, _ := strconv.Atoi(md["i"][0])
32+
return streamServer.Send(createStreamResponse(i))
33+
}
34+
35+
func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
36+
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
37+
md, _ := metadata.FromIncomingContext(ctx)
38+
i, _ := strconv.Atoi(md["i"][0])
39+
expected := createRequest(i)
40+
41+
if expected.String() != request.String() {
42+
return nil, fmt.Errorf("expected %v, got %v", expected, request)
43+
}
44+
return nil, nil
45+
}
46+
47+
func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
48+
savedRegistry := prometheus.DefaultRegisterer
49+
prometheus.DefaultRegisterer = prometheus.NewRegistry()
50+
defer func() {
51+
prometheus.DefaultRegisterer = savedRegistry
52+
}()
53+
54+
grpcPort, closeGrpcPort, err := getLocalHostPort()
55+
require.NoError(t, err)
56+
httpPort, closeHTTPPort, err := getLocalHostPort()
57+
require.NoError(t, err)
58+
59+
err = closeGrpcPort()
60+
require.NoError(t, err)
61+
err = closeHTTPPort()
62+
require.NoError(t, err)
63+
64+
cfg.HTTPListenPort = httpPort
65+
cfg.GRPCListenPort = grpcPort
66+
67+
serv, err := server.New(cfg)
68+
require.NoError(t, err)
69+
register(serv.GRPC)
70+
71+
go func() {
72+
err := serv.Run()
73+
require.NoError(t, err)
74+
}()
75+
76+
defer serv.Shutdown()
77+
78+
grpcHost := fmt.Sprintf("localhost:%d", grpcPort)
79+
80+
clientConfig := grpcclient.Config{}
81+
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
82+
83+
dialOptions, err := clientConfig.DialOption(nil, nil)
84+
assert.NoError(t, err)
85+
dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)
86+
87+
conn, err := grpc.NewClient(grpcHost, dialOptions...)
88+
assert.NoError(t, err)
89+
validate(t, conn)
90+
}
91+
92+
func TestConcurrentGrpcCalls(t *testing.T) {
93+
cfg := server.Config{}
94+
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
95+
96+
tc := map[string]struct {
97+
cfg server.Config
98+
register func(s *grpc.Server)
99+
validate func(t *testing.T, con *grpc.ClientConn)
100+
}{
101+
"distributor": {
102+
cfg: cfg,
103+
register: func(s *grpc.Server) {
104+
d := &mockGprcServer{}
105+
distributorpb.RegisterDistributorServer(s, d)
106+
},
107+
validate: func(t *testing.T, conn *grpc.ClientConn) {
108+
client := distributorpb.NewDistributorClient(conn)
109+
wg := sync.WaitGroup{}
110+
n := 10000
111+
wg.Add(n)
112+
for i := 0; i < n; i++ {
113+
go func(i int) {
114+
defer wg.Done()
115+
ctx := context.Background()
116+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
117+
_, err := client.Push(ctx, createRequest(i))
118+
require.NoError(t, err)
119+
}(i)
120+
}
121+
122+
wg.Wait()
123+
},
124+
},
125+
"ingester": {
126+
cfg: cfg,
127+
register: func(s *grpc.Server) {
128+
d := &mockGprcServer{}
129+
ingester_client.RegisterIngesterServer(s, d)
130+
},
131+
validate: func(t *testing.T, conn *grpc.ClientConn) {
132+
client := ingester_client.NewIngesterClient(conn)
133+
wg := sync.WaitGroup{}
134+
n := 10000
135+
wg.Add(n)
136+
for i := 0; i < n; i++ {
137+
go func(i int) {
138+
defer wg.Done()
139+
ctx := context.Background()
140+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
141+
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
142+
require.NoError(t, err)
143+
resp, err := s.Recv()
144+
require.NoError(t, err)
145+
expected := createStreamResponse(i)
146+
require.Equal(t, expected.String(), resp.String())
147+
}(i)
148+
}
149+
150+
wg.Wait()
151+
},
152+
},
153+
}
154+
155+
for name, c := range tc {
156+
t.Run(name, func(t *testing.T) {
157+
run(t, c.cfg, c.register, c.validate)
158+
})
159+
}
160+
}
161+
162+
func createStreamResponse(i int) *ingester_client.QueryStreamResponse {
163+
return &ingester_client.QueryStreamResponse{Chunkseries: []ingester_client.TimeSeriesChunk{
164+
{
165+
FromIngesterId: strconv.Itoa(i),
166+
Labels: createLabels(i),
167+
Chunks: []ingester_client.Chunk{
168+
{
169+
StartTimestampMs: int64(i),
170+
EndTimestampMs: int64(i),
171+
Encoding: int32(i),
172+
Data: []byte(strconv.Itoa(i)),
173+
},
174+
},
175+
},
176+
}}
177+
}
178+
179+
func createRequest(i int) *cortexpb.WriteRequest {
180+
labels := createLabels(i)
181+
return &cortexpb.WriteRequest{
182+
Timeseries: []cortexpb.PreallocTimeseries{
183+
{
184+
TimeSeries: &cortexpb.TimeSeries{
185+
Labels: labels,
186+
Samples: []cortexpb.Sample{
187+
{TimestampMs: int64(i), Value: float64(i)},
188+
},
189+
Exemplars: []cortexpb.Exemplar{
190+
{
191+
Labels: labels,
192+
Value: float64(i),
193+
TimestampMs: int64(i),
194+
},
195+
},
196+
},
197+
},
198+
},
199+
}
200+
}
201+
202+
func createLabels(i int) []cortexpb.LabelPair {
203+
labels := make([]cortexpb.LabelPair, 0, 100)
204+
for j := 0; j < 100; j++ {
205+
labels = append(labels, cortexpb.LabelPair{
206+
Name: fmt.Sprintf("test%d_%d", i, j),
207+
Value: fmt.Sprintf("test%d_%d", i, j),
208+
})
209+
}
210+
return labels
211+
}
212+
213+
func getLocalHostPort() (int, func() error, error) {
214+
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
215+
if err != nil {
216+
return 0, nil, err
217+
}
218+
219+
l, err := net.ListenTCP("tcp", addr)
220+
if err != nil {
221+
return 0, nil, err
222+
}
223+
224+
closePort := func() error {
225+
return l.Close()
226+
}
227+
return l.Addr().(*net.TCPAddr).Port, closePort, nil
228+
}

pkg/cortexpb/compat.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms [
5959
//
6060
// Note: while resulting labels.Labels is supposedly sorted, this function
6161
// doesn't enforce that. If input is not sorted, output will be wrong.
62-
func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels {
62+
func FromLabelAdaptersToLabels(ls []LabelPair) labels.Labels {
6363
return *(*labels.Labels)(unsafe.Pointer(&ls))
6464
}
6565

6666
// FromLabelAdaptersToLabelsWithCopy converts []LabelAdapter to labels.Labels.
6767
// Do NOT use unsafe to convert between data types because this function may
6868
// get in input labels whose data structure is reused.
69-
func FromLabelAdaptersToLabelsWithCopy(input []LabelAdapter) labels.Labels {
69+
func FromLabelAdaptersToLabelsWithCopy(input []LabelPair) labels.Labels {
7070
return CopyLabels(FromLabelAdaptersToLabels(input))
7171
}
7272

@@ -107,29 +107,29 @@ func copyStringToBuffer(in string, buf []byte) (string, []byte) {
107107
// FromLabelsToLabelAdapters casts labels.Labels to []LabelAdapter.
108108
// It uses unsafe, but as LabelAdapter == labels.Label this should be safe.
109109
// This allows us to use labels.Labels directly in protos.
110-
func FromLabelsToLabelAdapters(ls labels.Labels) []LabelAdapter {
111-
return *(*[]LabelAdapter)(unsafe.Pointer(&ls))
110+
func FromLabelsToLabelAdapters(ls labels.Labels) []LabelPair {
111+
return *(*[]LabelPair)(unsafe.Pointer(&ls))
112112
}
113113

114114
// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric.
115115
// Don't do this on any performance sensitive paths.
116-
func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric {
116+
func FromLabelAdaptersToMetric(ls []LabelPair) model.Metric {
117117
return util.LabelsToMetric(FromLabelAdaptersToLabels(ls))
118118
}
119119

120120
// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric with copy.
121121
// Don't do this on any performance sensitive paths.
122-
func FromLabelAdaptersToMetricWithCopy(ls []LabelAdapter) model.Metric {
122+
func FromLabelAdaptersToMetricWithCopy(ls []LabelPair) model.Metric {
123123
return util.LabelsToMetric(FromLabelAdaptersToLabelsWithCopy(ls))
124124
}
125125

126126
// FromMetricsToLabelAdapters converts model.Metric to []LabelAdapter.
127127
// Don't do this on any performance sensitive paths.
128128
// The result is sorted.
129-
func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
130-
result := make([]LabelAdapter, 0, len(metric))
129+
func FromMetricsToLabelAdapters(metric model.Metric) []LabelPair {
130+
result := make([]LabelPair, 0, len(metric))
131131
for k, v := range metric {
132-
result = append(result, LabelAdapter{
132+
result = append(result, LabelPair{
133133
Name: string(k),
134134
Value: string(v),
135135
})
@@ -162,7 +162,7 @@ func FromExemplarProtosToExemplars(es []Exemplar) []exemplar.Exemplar {
162162
return result
163163
}
164164

165-
type byLabel []LabelAdapter
165+
type byLabel []LabelPair
166166

167167
func (s byLabel) Len() int { return len(s) }
168168
func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 }

pkg/cortexpb/compat_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestMetricMetadataToMetricTypeToMetricType(t *testing.T) {
103103
}
104104

105105
func TestFromLabelAdaptersToLabels(t *testing.T) {
106-
input := []LabelAdapter{{Name: "hello", Value: "world"}}
106+
input := []LabelPair{{Name: "hello", Value: "world"}}
107107
expected := labels.Labels{labels.Label{Name: "hello", Value: "world"}}
108108
actual := FromLabelAdaptersToLabels(input)
109109

@@ -115,7 +115,7 @@ func TestFromLabelAdaptersToLabels(t *testing.T) {
115115
}
116116

117117
func TestFromLabelAdaptersToLabelsWithCopy(t *testing.T) {
118-
input := []LabelAdapter{{Name: "hello", Value: "world"}}
118+
input := []LabelPair{{Name: "hello", Value: "world"}}
119119
expected := labels.Labels{labels.Label{Name: "hello", Value: "world"}}
120120
actual := FromLabelAdaptersToLabelsWithCopy(input)
121121

@@ -127,7 +127,7 @@ func TestFromLabelAdaptersToLabelsWithCopy(t *testing.T) {
127127
}
128128

129129
func BenchmarkFromLabelAdaptersToLabelsWithCopy(b *testing.B) {
130-
input := []LabelAdapter{
130+
input := []LabelPair{
131131
{Name: "hello", Value: "world"},
132132
{Name: "some label", Value: "and its value"},
133133
{Name: "long long long long long label name", Value: "perhaps even longer label value, but who's counting anyway?"}}

0 commit comments

Comments
 (0)