Skip to content

Commit 290fe0b

Browse files
authored
Merge pull request #44 from ansakharov/files-watcher
[fix-filewatcher]
2 parents 26bb487 + d9d6112 commit 290fe0b

File tree

14 files changed

+67
-46
lines changed

14 files changed

+67
-46
lines changed

Makefile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,16 @@ deps:
1111

1212
.PHONY: cover
1313
cover:
14-
go test -coverprofile=coverage.out ./...
14+
go test -short -coverprofile=coverage.out ./...
1515
go tool cover -html=coverage.out
1616
rm coverage.out
1717

18+
.PHONY: test-short
19+
test-short:
20+
go test ./fd/ -v -count 1 -short
21+
go test ./pipeline/ -v -count 1 -short
22+
go test ./plugin/... -v -count 1 -short
23+
1824
.PHONY: test
1925
test:
2026
go test ./fd/ -v -count 1

fd/file.d.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,3 @@ func (f *FileD) serveFreeOsMem(_ http.ResponseWriter, _ *http.Request) {
277277
func (f *FileD) serveLiveReady(_ http.ResponseWriter, _ *http.Request) {
278278
logger.Infof("live/ready OK")
279279
}
280-
281-
func (f *FileD) servePipelines(_ http.ResponseWriter, _ *http.Request) {
282-
logger.Infof("pipelines OK")
283-
}

pipeline/batch_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ func (b *batcherTail) Error(err string) {
2222
logger.Panic(err)
2323
}
2424

25-
func (b *batcherTail) WaitOrPanic(string) {}
26-
func (b *batcherTail) RecoverFromPanic() {}
25+
func (b *batcherTail) RecoverFromPanic() {}
2726

2827
func TestBatcher(t *testing.T) {
2928
eventCount := 10000000

pipeline/pipeline.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (p *Pipeline) GetOutput() OutputPlugin {
264264
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) uint64 {
265265
length := len(bytes)
266266

267-
// don't process shit
267+
// don't process shit.
268268
isEmpty := length == 0 || (bytes[0] == '\n' && length == 1)
269269
isSpam := p.antispamer.isSpam(sourceID, sourceName, isNewSource)
270270
isLong := p.settings.MaxEventSize != 0 && length > p.settings.MaxEventSize
@@ -273,7 +273,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
273273
}
274274

275275
event := p.eventPool.get()
276-
dec := decoder.NO
276+
var dec decoder.DecoderType
277277
if p.decoder == decoder.AUTO {
278278
dec = p.suggestedDecoder
279279
} else {

pipeline/processor.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ type processor struct {
6161
busyActions []bool
6262
busyActionsTotal int
6363
actionWatcher *actionWatcher
64-
waitOrPanic func(msgStr string)
6564
recoverFromPanic func()
6665

6766
heartbeatCh chan *stream

plugin/action/debug/debug.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@ import (
99
/*{ introduction
1010
It logs event to stdout. Useful for debugging.
1111
}*/
12-
type Plugin struct {
13-
paths [][]string
14-
names []string
15-
preserveFields bool
16-
}
12+
type Plugin struct{}
1713

1814
type Config map[string]interface{}
1915

plugin/action/parse_es/parse_es.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ It parses HTTP input using Elasticsearch `/_bulk` API format. It converts source
1212
}*/
1313
type Plugin struct {
1414
logger *zap.SugaredLogger
15-
config *Config
1615
passNext bool
1716
discardNext bool
1817
isStrict bool

plugin/input/file/offset_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package file
22

33
import (
4+
"go.uber.org/atomic"
45
"os"
56
"sync"
67
"testing"
@@ -77,7 +78,7 @@ func TestParallel(t *testing.T) {
7778
lastEventSeq: 0,
7879
isVirgin: false,
7980
isDone: false,
80-
shouldSkip: false,
81+
shouldSkip: *atomic.NewBool(false),
8182
offsets: offsets,
8283
mu: &sync.Mutex{},
8384
}
@@ -91,7 +92,7 @@ func TestParallel(t *testing.T) {
9192
lastEventSeq: 0,
9293
isVirgin: false,
9394
isDone: false,
94-
shouldSkip: false,
95+
shouldSkip: *atomic.NewBool(false),
9596
offsets: offsets,
9697
mu: &sync.Mutex{},
9798
}

plugin/input/file/provider.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type Job struct {
6666

6767
isVirgin bool // it should be set to false if job hits isDone=true at the first time
6868
isDone bool
69-
shouldSkip bool
69+
shouldSkip atomic.Bool
7070

7171
// offsets is a sliceMap of streamName to offset.
7272
// Unlike map[string]int, sliceMap can work with mutable strings when using unsafe conversion from []byte.
@@ -305,7 +305,7 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
305305

306306
isVirgin: true,
307307
isDone: true,
308-
shouldSkip: false,
308+
shouldSkip: *atomic.NewBool(false),
309309

310310
offsets: nil,
311311

@@ -368,7 +368,7 @@ func (jp *jobProvider) initJobOffset(operation offsetsOp, job *Job) {
368368
if err != nil {
369369
jp.logger.Panicf("can't make job, can't seek file %d:%s: %s", job.sourceID, job.filename, err.Error())
370370
}
371-
job.shouldSkip = true
371+
job.shouldSkip.Store(true)
372372

373373
case offsetsOpReset:
374374
_, err := job.file.Seek(0, io.SeekStart)

plugin/input/file/watcher.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@ func (w *watcher) start() {
5656
eventsCh := make(chan notify.EventInfo, 128)
5757
w.watcherCh = eventsCh
5858

59-
events := []notify.Event{notify.Create}
59+
events := []notify.Event{notify.Create, notify.Rename, notify.Remove}
6060
if w.shouldWatchWrites {
6161
events = append(events, notify.Write)
6262
}
63-
64-
// watch recursivly
63+
// watch recursively.
6564
err := notify.Watch(filepath.Join(w.path, "..."), eventsCh, events...)
6665
if err != nil {
6766
w.logger.Warnf("can't create fs watcher: %s", err.Error())

0 commit comments

Comments
 (0)