|
5 | 5 | "io"
|
6 | 6 | "mime"
|
7 | 7 | "os"
|
| 8 | + "os/exec" |
8 | 9 | "path/filepath"
|
9 | 10 | "strings"
|
10 | 11 |
|
@@ -89,11 +90,24 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
|
89 | 90 | }
|
90 | 91 | }
|
91 | 92 |
|
92 |
| - mimeType := getMimeType(file.Name()) |
93 | 93 | var reader io.Reader
|
94 |
| - |
95 |
| - if mimeType == "application/x-lz4" { |
| 94 | + if job.mimeType == "application/x-lz4" { |
| 95 | + if isNotFileBeingWritten(file.Name()) { |
| 96 | + logger.Error("cannot lock file", zap.String("filename", file.Name())) |
| 97 | + break |
| 98 | + } |
96 | 99 | lz4Reader := lz4.NewReader(file)
|
| 100 | + if len(offsets) > 0 { |
| 101 | + for lastOffset+int64(readBufferSize) < offsets[0].Offset { |
| 102 | + n, err := lz4Reader.Read(readBuf) |
| 103 | + if err != nil { |
| 104 | + if err == io.EOF { |
| 105 | + break // End of file reached |
| 106 | + } |
| 107 | + } |
| 108 | + lastOffset += int64(n) |
| 109 | + } |
| 110 | + } |
97 | 111 | reader = lz4Reader
|
98 | 112 | } else {
|
99 | 113 | reader = file
|
@@ -196,17 +210,39 @@ func getMimeType(filename string) string {
|
196 | 210 | return mimeType
|
197 | 211 | }
|
198 | 212 |
|
| 213 | +func isNotFileBeingWritten(filePath string) bool { |
| 214 | + // Run the lsof command to check open file descriptors |
| 215 | + cmd := exec.Command("lsof", filePath) |
| 216 | + output, err := cmd.Output() |
| 217 | + if err != nil { |
| 218 | + return false // Error running lsof |
| 219 | + } |
| 220 | + |
| 221 | + // Check the output for write access |
| 222 | + lines := strings.Split(string(output), "\n") |
| 223 | + for _, line := range lines { |
| 224 | + // Check if the line contains 'w' indicating write access |
| 225 | + if strings.Contains(line, "w") { |
| 226 | + return true // File is being written to |
| 227 | + } |
| 228 | + } |
| 229 | + |
| 230 | + return false // File is not being written to |
| 231 | +} |
| 232 | + |
199 | 233 | func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error {
|
200 | 234 | stat, err := file.Stat()
|
201 | 235 | if err != nil {
|
202 | 236 | return err
|
203 | 237 | }
|
204 | 238 |
|
205 |
| - // files truncated from time to time, after logs from file was processed. |
206 |
| - // Position > stat.Size() means that data was truncated and |
207 |
| - // caret pointer must be moved to start of file. |
208 |
| - if totalOffset > stat.Size() { |
209 |
| - jobProvider.truncateJob(job) |
| 239 | + if !job.isCompressed { |
| 240 | + // files truncated from time to time, after logs from file was processed. |
| 241 | + // Position > stat.Size() means that data was truncated and |
| 242 | + // caret pointer must be moved to start of file. |
| 243 | + if totalOffset > stat.Size() { |
| 244 | + jobProvider.truncateJob(job) |
| 245 | + } |
210 | 246 | }
|
211 | 247 |
|
212 | 248 | // Mark job as done till new lines has appeared.
|
|
0 commit comments