Skip to content

Commit 3ccae83

Browse files
committed
lz4: start read from offset
1 parent a5d658a commit 3ccae83

File tree

2 files changed

+79
-25
lines changed

2 files changed

+79
-25
lines changed

Diff for: plugin/input/file/provider.go

+31-16
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ type jobProvider struct {
6060
}
6161

6262
type Job struct {
63-
file *os.File
64-
inode inodeID
65-
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
66-
filename string
67-
symlink string
68-
curOffset int64 // offset to not call Seek() everytime
69-
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
63+
file *os.File
64+
mimeType string
65+
isCompressed bool
66+
inode inodeID
67+
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
68+
filename string
69+
symlink string
70+
curOffset int64 // offset to not call Seek() everytime
71+
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
7072

7173
ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
7274
lastEventSeq uint64
@@ -83,10 +85,15 @@ type Job struct {
8385
mu *sync.Mutex
8486
}
8587

86-
func (j *Job) seek(offset int64, whence int, hint string) int64 {
87-
n, err := j.file.Seek(offset, whence)
88-
if err != nil {
89-
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
88+
func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
89+
var err error
90+
if !j.isCompressed {
91+
n, err = j.file.Seek(offset, whence)
92+
if err != nil {
93+
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
94+
}
95+
} else {
96+
n = 0
9097
}
9198
j.curOffset = n
9299

@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
354361
}
355362
}
356363

364+
func isCompressed(mimeType string) bool {
365+
return mimeType == "application/x-lz4"
366+
}
367+
357368
func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
358369
sourceID := sourceIDByStat(stat, symlink)
359370

@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
370381
}
371382

372383
inode := getInode(stat)
384+
mimeType := getMimeType(filename)
385+
373386
job := &Job{
374-
file: file,
375-
inode: inode,
376-
filename: filename,
377-
symlink: symlink,
378-
sourceID: sourceID,
387+
file: file,
388+
isCompressed: isCompressed(mimeType),
389+
mimeType: mimeType,
390+
inode: inode,
391+
filename: filename,
392+
symlink: symlink,
393+
sourceID: sourceID,
379394

380395
isVirgin: true,
381396
isDone: true,

Diff for: plugin/input/file/worker.go

+48-9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66
"mime"
77
"os"
8+
"os/exec"
89
"path/filepath"
910
"strings"
1011

@@ -89,11 +90,24 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
8990
}
9091
}
9192

92-
mimeType := getMimeType(file.Name())
9393
var reader io.Reader
94-
95-
if mimeType == "application/x-lz4" {
94+
if job.mimeType == "application/x-lz4" {
95+
if isNotFileBeingWritten(file.Name()) {
96+
logger.Error("cannot lock file", zap.String("filename", file.Name()))
97+
break
98+
}
9699
lz4Reader := lz4.NewReader(file)
100+
if len(offsets) > 0 {
101+
for lastOffset+int64(readBufferSize) < offsets[0].Offset {
102+
n, err := lz4Reader.Read(readBuf)
103+
if err != nil {
104+
if err == io.EOF {
105+
break // End of file reached
106+
}
107+
}
108+
lastOffset += int64(n)
109+
}
110+
}
97111
reader = lz4Reader
98112
} else {
99113
reader = file
@@ -196,22 +210,47 @@ func getMimeType(filename string) string {
196210
return mimeType
197211
}
198212

213+
func isNotFileBeingWritten(filePath string) bool {
214+
// Run the lsof command to check open file descriptors
215+
cmd := exec.Command("lsof", filePath)
216+
output, err := cmd.Output()
217+
if err != nil {
218+
return false // Error running lsof
219+
}
220+
221+
// Check the output for write access
222+
lines := strings.Split(string(output), "\n")
223+
for _, line := range lines {
224+
// Check if the line contains 'w' indicating write access
225+
if strings.Contains(line, "w") {
226+
return true // File is being written to
227+
}
228+
}
229+
230+
return false // File is not being written to
231+
}
232+
199233
func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error {
200234
stat, err := file.Stat()
201235
if err != nil {
202236
return err
203237
}
204238

205-
// files truncated from time to time, after logs from file was processed.
206-
// Position > stat.Size() means that data was truncated and
207-
// caret pointer must be moved to start of file.
208-
if totalOffset > stat.Size() {
209-
jobProvider.truncateJob(job)
239+
if !job.isCompressed {
240+
// files truncated from time to time, after logs from file was processed.
241+
// Position > stat.Size() means that data was truncated and
242+
// caret pointer must be moved to start of file.
243+
if totalOffset > stat.Size() {
244+
jobProvider.truncateJob(job)
245+
}
210246
}
211-
212247
// Mark job as done till new lines has appeared.
213248
jobProvider.doneJob(job)
214249

250+
if job.isCompressed {
251+
file.Close()
252+
}
253+
215254
return nil
216255
}
217256

0 commit comments

Comments
 (0)