Skip to content

Commit b80dfd5

Browse files
authored
Merge pull request #440 from zerosoul13/msgpackSupport
Add Kafka msgpack support
2 parents e0a132a + 619a4b4 commit b80dfd5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+5124
-2
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ require (
4141
github.com/sevlyar/go-daemon v0.1.6-0.20210508104436-15bb82c8ea3c
4242
github.com/stretchr/testify v1.6.1
4343
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d
44+
github.com/vmihailenco/msgpack/v5 v5.3.5
4445
go.opentelemetry.io/otel v0.9.0
4546
go.opentelemetry.io/otel/exporters/trace/jaeger v0.9.0
4647
go.uber.org/atomic v1.3.2 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
201201
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
202202
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d h1:4J9HCZVpvDmj2tiKGSTUnb3Ok/9CEQb9oqu9LHKQQpc=
203203
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
204+
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
205+
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
206+
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
207+
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
204208
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
205209
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
206210
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=

receiver/kafka/kafka.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (o *Offset) UnmarshalText(text []byte) error {
6666
return nil
6767
}
6868

69-
var supportedProtocols = []string{"plain", "protobuf", "pickle"}
69+
var supportedProtocols = []string{"plain", "protobuf", "pickle", "msgpack"}
7070

7171
// Protocol is a special type to allow user to define wire protocol in Config file as a simple text.
7272
type Protocol int
@@ -80,6 +80,8 @@ func (p *Protocol) MarshalText() ([]byte, error) {
8080
return []byte("protobuf"), nil
8181
case ProtocolPickle:
8282
return []byte("pickle"), nil
83+
case ProtocolMsgPack:
84+
return []byte("msgpack"), nil
8385
}
8486
return nil, fmt.Errorf("Unsupported offset type %v, supported offsets: %v", p, supportedProtocols)
8587
}
@@ -95,6 +97,8 @@ func (p *Protocol) UnmarshalText(text []byte) error {
9597
*p = ProtocolProtobuf
9698
case "pickle":
9799
*p = ProtocolPickle
100+
case "msgpack":
101+
*p = ProtocolMsgPack
98102
default:
99103
return fmt.Errorf("Unsupported protocol type %v, supported: %v", protocolName, supportedProtocols)
100104
}
@@ -110,6 +114,8 @@ func (p *Protocol) ToString() string {
110114
return "protobuf"
111115
case ProtocolPickle:
112116
return "pickle"
117+
case ProtocolMsgPack:
118+
return "msgpack"
113119
}
114120
return "unsupported"
115121
}
@@ -126,6 +132,8 @@ const (
126132
ProtocolProtobuf = 1
127133
// ProtocolPickle represents pickled messages
128134
ProtocolPickle = 2
135+
// ProtocolMsgPack represents msgpack messages
136+
ProtocolMsgPack = 3
129137
)
130138

131139
// Duration wrapper time.Duration for TOML
@@ -477,6 +485,8 @@ func (rcv *Kafka) worker() {
477485
protocolParser = parse.Plain
478486
case ProtocolPickle:
479487
protocolParser = parse.Pickle
488+
case ProtocolMsgPack:
489+
protocolParser = parse.Msgpack
480490
}
481491
for {
482492
select {
@@ -537,5 +547,5 @@ func (rcv *Kafka) worker() {
537547
}
538548

539549
// InitPrometheus is a stub for the receiver prom metrics. Required to satisfy Receiver interface.
540-
func (rcv *Kafka) InitPrometheus(reg prometheus.Registerer) {
550+
func (*Kafka) InitPrometheus(_ prometheus.Registerer) {
541551
}

receiver/parse/msgpack.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package parse
2+
3+
import (
4+
"errors"
5+
6+
"github.com/go-graphite/go-carbon/points"
7+
"github.com/vmihailenco/msgpack/v5"
8+
)
9+
10+
// Datapoint loads a msgpack data
11+
type Datapoint struct {
12+
Name string `json:"Name"`
13+
Value float64 `json:"Value"`
14+
Time int64 `json:"Time"`
15+
}
16+
17+
// Msgpack is used to unpack metrics produced by
18+
// carbon-relay-ng
19+
func Msgpack(body []byte) ([]*points.Points, error) {
20+
result := make([]*points.Points, 0)
21+
22+
var d Datapoint
23+
err := msgpack.Unmarshal(body, &d)
24+
if err != nil {
25+
return result, err
26+
}
27+
28+
if d.Name == "" {
29+
err = errors.New("Empty metric name")
30+
return result, err
31+
}
32+
33+
result = append(result, points.OnePoint(d.Name, d.Value, d.Time))
34+
return result, err
35+
}

receiver/parse/msgpack_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package parse
2+
3+
import (
4+
"testing"
5+
6+
"github.com/go-graphite/go-carbon/points"
7+
"github.com/vmihailenco/msgpack/v5"
8+
)
9+
10+
func TestMsgpack(t *testing.T) {
11+
var testCaseOK = Datapoint{
12+
Name: "test.case.number.1",
13+
Value: 60.2,
14+
Time: 1423931224,
15+
}
16+
17+
var testCaseEmptyName = Datapoint{
18+
Name: "",
19+
Value: 60.2,
20+
Time: 1423931224,
21+
}
22+
23+
msgOk, _ := msgpack.Marshal(testCaseOK)
24+
msgFail, _ := msgpack.Marshal(testCaseEmptyName)
25+
26+
msgpacks := []testcase{
27+
{"One metric with one datapoint",
28+
msgOk,
29+
[]*points.Points{points.OnePoint("test.case.number.1", 60.2, 1423931224)},
30+
false,
31+
},
32+
{"Empty metric name with one datapoint",
33+
msgFail,
34+
[]*points.Points{points.OnePoint("", 60.2, 1423931224)},
35+
true,
36+
},
37+
}
38+
run(t, msgpacks, Msgpack)
39+
}

vendor/github.com/vmihailenco/msgpack/v5/.prettierrc

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/vmihailenco/msgpack/v5/.travis.yml

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/vmihailenco/msgpack/v5/CHANGELOG.md

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/vmihailenco/msgpack/v5/LICENSE

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/vmihailenco/msgpack/v5/Makefile

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/vmihailenco/msgpack/v5/README.md

Lines changed: 86 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/vmihailenco/msgpack/v5/commitlint.config.js

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)