From 343723c91fd6a3a81d4e45f9d901d40dffd6ac4c Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Thu, 16 Jun 2016 14:43:53 -0400 Subject: [PATCH] Memory map kafka checkpoint file --- cmake/externals.cmake | 1 + plugins/kafka/kafka_input.go | 15 +++++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cmake/externals.cmake b/cmake/externals.cmake index 213f6c6f4..559e36b18 100644 --- a/cmake/externals.cmake +++ b/cmake/externals.cmake @@ -161,6 +161,7 @@ git_clone(https://github.com/rafrombrc/gospec 2e46585948f47047b0c217d00fa24bbc4e git_clone(https://github.com/crankycoder/xmlpath 670b185b686fd11aa115291fb2f6dc3ed7ebb488) git_clone(https://github.com/thoj/go-ircevent 90dc7f966b95d133f1c65531c6959b52effd5e40) git_clone(https://github.com/cactus/gostrftime d329f83c5ce9c416f8983f0a0044734db54ee24d) +git_clone(https://github.com/edsrzf/mmap-go 935e0e8a636ca4ba70b713f3e38a19e1b77739e8) git_clone(https://github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a) git_clone(https://github.com/eapache/go-resiliency v1.0.0) diff --git a/plugins/kafka/kafka_input.go b/plugins/kafka/kafka_input.go index 7bb713721..39d4e6cf7 100644 --- a/plugins/kafka/kafka_input.go +++ b/plugins/kafka/kafka_input.go @@ -26,6 +26,7 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/edsrzf/mmap-go" "github.com/mozilla-services/heka/message" "github.com/mozilla-services/heka/pipeline" "github.com/mozilla-services/heka/plugins/tcp" @@ -74,6 +75,7 @@ type KafkaInput struct { pConfig *pipeline.PipelineConfig ir pipeline.InputRunner checkpointFile *os.File + checkpoint mmap.MMap stopChan chan bool name string checkpointFilename string @@ -109,13 +111,18 @@ func fileExists(path string) bool { func (k *KafkaInput) writeCheckpoint(offset int64) (err error) { if k.checkpointFile == nil { - if k.checkpointFile, err = os.OpenFile(k.checkpointFilename, - os.O_WRONLY|os.O_SYNC|os.O_CREATE|os.O_TRUNC, 0644); err != nil { + if k.checkpointFile, err = os.Create(k.checkpointFilename); err != nil { + return + } + + binary.Write(k.checkpointFile, binary.LittleEndian, int64(0)) + if k.checkpoint, err = mmap.Map(k.checkpointFile, mmap.RDWR, 0); err != nil { return } } - k.checkpointFile.Seek(0, 0) - err = binary.Write(k.checkpointFile, binary.LittleEndian, &offset) + + binary.LittleEndian.PutUint64(k.checkpoint, uint64(offset)) + return }