Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
kafka: Add topic_prefix and escape topic names
Browse files Browse the repository at this point in the history
These are especially useful when paired with `topic_variable`.
Since the variable is being extracted from a field, we can't guarantee
that the characters being used for the topic name are valid, so
we implement a slightly modified version of stdlib's `url.QueryEscape`
which sanitizes the name to the range of valid topic chars.

Example:

```toml
[KafkaOutput]
topic_prefix = "heka-"
topic_variable = "Fields[ContainerName]"
```
  • Loading branch information
mattrobenolt committed Jun 10, 2015
1 parent 32cadf8 commit 7152020
Showing 1 changed file with 53 additions and 4 deletions.
57 changes: 53 additions & 4 deletions plugins/kafka/kafka_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type KafkaOutputConfig struct {
HashVariable string `toml:"hash_variable"` // HashPartitioner key is extracted from a message variable
TopicVariable string `toml:"topic_variable"` // Topic extracted from a message variable
Topic string // Static topic
TopicPrefix string `toml:"topic_prefix"` // String to be prepended to the topic

RequiredAcks string `toml:"required_acks"` // NoResponse, WaitForLocal, WaitForAll
Timeout uint32
Expand Down Expand Up @@ -317,9 +318,10 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er
go k.processKafkaErrors(or, errChan, &wg)

var (
pack *pipeline.PipelinePack
topic = k.config.Topic
key sarama.Encoder
pack *pipeline.PipelinePack
topic = k.config.Topic
prefix = k.config.TopicPrefix
key sarama.Encoder
)

for pack = range inChan {
Expand All @@ -334,7 +336,7 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er

if msgBytes, err := or.Encode(pack); err == nil {
if msgBytes != nil {
err = k.producer.QueueMessage(topic, key, sarama.ByteEncoder(msgBytes))
err = k.producer.QueueMessage(topicEscape(prefix+topic), key, sarama.ByteEncoder(msgBytes))
if err != nil {
atomic.AddInt64(&k.processMessageFailures, 1)
or.LogError(err)
Expand Down Expand Up @@ -371,6 +373,53 @@ func (k *KafkaOutput) CleanupForRestart() {
return
}

// topicEscape returns s with every byte that shouldEscape reports as
// illegal in a Kafka topic name replaced by a three-byte sequence: '-'
// followed by the byte's value as two uppercase hexadecimal digits
// (for example, ' ' becomes "-20"). Bytes that are already legal are copied
// through unchanged, and if nothing needs escaping s itself is returned
// without allocating.
//
// The input is processed byte by byte, so a multi-byte UTF-8 character is
// emitted as one escape sequence per byte.
func topicEscape(s string) string {
	const upperHex = "0123456789ABCDEF"

	// First pass: count the bytes to escape so the output buffer can be
	// sized exactly and the common "already valid" case can return early.
	escapes := 0
	for i := 0; i < len(s); i++ {
		if shouldEscape(s[i]) {
			escapes++
		}
	}
	if escapes == 0 {
		return s
	}

	// Each escaped byte grows from one byte to three, i.e. two extra bytes.
	out := make([]byte, 0, len(s)+2*escapes)
	for i := 0; i < len(s); i++ {
		b := s[i]
		if !shouldEscape(b) {
			out = append(out, b)
			continue
		}
		// High nibble first, then low nibble.
		out = append(out, '-', upperHex[b>>4], upperHex[b&15])
	}
	return string(out)
}

// shouldEscape reports whether the byte c must be escaped before it can be
// used in a Kafka topic name. It returns false for ASCII letters, ASCII
// digits, '.', '_' and '-', and true for every other byte value.
//
// The set of legal characters mirrors Kafka's own definition:
// https://github.com/apache/kafka/blob/43b92f8b1ce8140c432edf11b0c842f5fbe04120/core/src/main/scala/kafka/common/Topic.scala#L25
// val legalChars = "[a-zA-Z0-9\\._\\-]"
func shouldEscape(c byte) bool {
	switch {
	case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= '0' && c <= '9':
		// Alphanumerics are always legal.
		return false
	case c == '.', c == '_', c == '-':
		// The only legal punctuation.
		return false
	default:
		// Everything else must be escaped.
		return true
	}
}

func init() {
pipeline.RegisterPlugin("KafkaOutput", func() interface{} {
return new(KafkaOutput)
Expand Down

0 comments on commit 7152020

Please sign in to comment.