Skip to content

Add Kafka msgpack support #440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/sevlyar/go-daemon v0.1.6-0.20210508104436-15bb82c8ea3c
github.com/stretchr/testify v1.6.1
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d
github.com/vmihailenco/msgpack/v5 v5.3.5
go.opentelemetry.io/otel v0.9.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.9.0
go.uber.org/atomic v1.3.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d h1:4J9HCZVpvDmj2tiKGSTUnb3Ok/9CEQb9oqu9LHKQQpc=
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
Expand Down
14 changes: 12 additions & 2 deletions receiver/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (o *Offset) UnmarshalText(text []byte) error {
return nil
}

var supportedProtocols = []string{"plain", "protobuf", "pickle"}
var supportedProtocols = []string{"plain", "protobuf", "pickle", "msgpack"}

// Protocol is a special type to allow user to define wire protocol in Config file as a simple text.
type Protocol int
Expand All @@ -80,6 +80,8 @@ func (p *Protocol) MarshalText() ([]byte, error) {
return []byte("protobuf"), nil
case ProtocolPickle:
return []byte("pickle"), nil
case ProtocolMsgPack:
return []byte("msgpack"), nil
}
return nil, fmt.Errorf("Unsupported offset type %v, supported offsets: %v", p, supportedProtocols)
}
Expand All @@ -95,6 +97,8 @@ func (p *Protocol) UnmarshalText(text []byte) error {
*p = ProtocolProtobuf
case "pickle":
*p = ProtocolPickle
case "msgpack":
*p = ProtocolMsgPack
default:
return fmt.Errorf("Unsupported protocol type %v, supported: %v", protocolName, supportedProtocols)
}
Expand All @@ -110,6 +114,8 @@ func (p *Protocol) ToString() string {
return "protobuf"
case ProtocolPickle:
return "pickle"
case ProtocolMsgPack:
return "msgpack"
}
return "unsupported"
}
Expand All @@ -126,6 +132,8 @@ const (
ProtocolProtobuf = 1
// ProtocolPickle represents pickled messages
ProtocolPickle = 2
// ProtocolMsgPack represents msgpack messages
ProtocolMsgPack = 3
)

// Duration wrapper time.Duration for TOML
Expand Down Expand Up @@ -477,6 +485,8 @@ func (rcv *Kafka) worker() {
protocolParser = parse.Plain
case ProtocolPickle:
protocolParser = parse.Pickle
case ProtocolMsgPack:
protocolParser = parse.Msgpack
}
for {
select {
Expand Down Expand Up @@ -537,5 +547,5 @@ func (rcv *Kafka) worker() {
}

// InitPrometheus is a stub for the receiver prom metrics. Required to satisfy Receiver interface.
func (rcv *Kafka) InitPrometheus(reg prometheus.Registerer) {
func (*Kafka) InitPrometheus(_ prometheus.Registerer) {
}
35 changes: 35 additions & 0 deletions receiver/parse/msgpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package parse

import (
"errors"

"github.com/go-graphite/go-carbon/points"
"github.com/vmihailenco/msgpack/v5"
)

// Datapoint loads a msgpack data
type Datapoint struct {
Name string `json:"Name"`
Value float64 `json:"Value"`
Time int64 `json:"Time"`
}

// Msgpack is used to unpack metrics produced by
// carbon-relay-ng
func Msgpack(body []byte) ([]*points.Points, error) {
result := make([]*points.Points, 0)

var d Datapoint
err := msgpack.Unmarshal(body, &d)
if err != nil {
return result, err
}

if d.Name == "" {
err = errors.New("Empty metric name")
return result, err
}

result = append(result, points.OnePoint(d.Name, d.Value, d.Time))
return result, err
}
39 changes: 39 additions & 0 deletions receiver/parse/msgpack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package parse

import (
"testing"

"github.com/go-graphite/go-carbon/points"
"github.com/vmihailenco/msgpack/v5"
)

func TestMsgpack(t *testing.T) {
var testCaseOK = Datapoint{
Name: "test.case.number.1",
Value: 60.2,
Time: 1423931224,
}

var testCaseEmptyName = Datapoint{
Name: "",
Value: 60.2,
Time: 1423931224,
}

msgOk, _ := msgpack.Marshal(testCaseOK)
msgFail, _ := msgpack.Marshal(testCaseEmptyName)

msgpacks := []testcase{
{"One metric with one datapoint",
msgOk,
[]*points.Points{points.OnePoint("test.case.number.1", 60.2, 1423931224)},
false,
},
{"Empty metric name with one datapoint",
msgFail,
[]*points.Points{points.OnePoint("", 60.2, 1423931224)},
true,
},
}
run(t, msgpacks, Msgpack)
}
4 changes: 4 additions & 0 deletions vendor/github.com/vmihailenco/msgpack/v5/.prettierrc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions vendor/github.com/vmihailenco/msgpack/v5/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 51 additions & 0 deletions vendor/github.com/vmihailenco/msgpack/v5/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions vendor/github.com/vmihailenco/msgpack/v5/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions vendor/github.com/vmihailenco/msgpack/v5/Makefile

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions vendor/github.com/vmihailenco/msgpack/v5/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading