Skip to content

Commit

Permalink
Add message headers to produced record (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Mar 19, 2024
1 parent 16fc83a commit 2b54298
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
6 changes: 5 additions & 1 deletion source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (

const (
// MetadataKafkaTopic is the metadata key for storing the kafka topic
MetadataKafkaTopic = "kafka.topic"
MetadataKafkaTopic = "kafka.topic"
MetadataKafkaHeaderPrefix = "kafka.header."
)

type Source struct {
Expand Down Expand Up @@ -98,6 +99,9 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) {

metadata := sdk.Metadata{MetadataKafkaTopic: rec.Topic}
metadata.SetCreatedAt(rec.Timestamp)
for _, h := range rec.Headers {
metadata[MetadataKafkaHeaderPrefix+h.Key] = string(h.Value)
}

return sdk.Util.Source.NewRecordCreate(
source.Position{
Expand Down
15 changes: 12 additions & 3 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/matryer/is"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/mock/gomock"
)

Expand Down Expand Up @@ -53,6 +56,10 @@ func TestSource_Read(t *testing.T) {
ctrl := gomock.NewController(t)

rec := test.GenerateFranzRecords(0, 0)[0]
rec.Headers = []kgo.RecordHeader{
{Key: "header-a", Value: []byte("value-a")},
{Key: "header-b", Value: []byte{0, 1, 2}},
}
want := sdk.Record{
Position: source.Position{
GroupID: "",
Expand All @@ -62,8 +69,10 @@ func TestSource_Read(t *testing.T) {
}.ToSDKPosition(),
Operation: sdk.OperationCreate,
Metadata: map[string]string{
MetadataKafkaTopic: rec.Topic,
sdk.MetadataCreatedAt: strconv.FormatInt(rec.Timestamp.UnixNano(), 10),
MetadataKafkaTopic: rec.Topic,
sdk.MetadataCreatedAt: strconv.FormatInt(rec.Timestamp.UnixNano(), 10),
"kafka.header.header-a": "value-a",
"kafka.header.header-b": string([]byte{0, 1, 2}),
},
Key: sdk.RawData(rec.Key),
Payload: sdk.Change{
Expand All @@ -83,5 +92,5 @@ func TestSource_Read(t *testing.T) {
is.NoErr(err)
is.True(got.Metadata[sdk.MetadataReadAt] != "")
want.Metadata[sdk.MetadataReadAt] = got.Metadata[sdk.MetadataReadAt]
is.Equal(want, got)
is.Equal(cmp.Diff(want, got, cmpopts.IgnoreUnexported(sdk.Record{})), "")
}

0 comments on commit 2b54298

Please sign in to comment.