Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ The component periodically scans the filesystem based on `sync_period` and autom

## Debug metrics

* `loki_source_file_duplicate_files` (gauge): Number of files being tailed multiple times due to targets with different label sets. Non-zero values indicate duplicate log line ingestion. Check component logs for details on which files and labels are affected.
* `loki_source_file_encoding_failures_total` (counter): Number of encoding failures.
* `loki_source_file_file_bytes_total` (gauge): Total size, in bytes, of the files being tailed.
* `loki_source_file_files_active_total` (gauge): Number of active files.
Expand Down
149 changes: 149 additions & 0 deletions internal/component/loki/source/file/duplicates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package file

import (
"os"
"slices"
"strings"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext"
"github.com/grafana/alloy/internal/runtime/logging/level"
)

// duplicateDetector detects when the same file is being tailed multiple times
// with different label sets, which causes duplicate log lines.
//
// NOTE: the internal map is unguarded, so a detector instance is not safe for
// concurrent use.
//
// Usage:
// 1. Create a new instance for each reconciliation cycle
// 2. Call Track() for each file that will actually be tailed
// 3. Call Report() after reconciliation to detect and log duplicates
type duplicateDetector struct {
	logger log.Logger
	metric prometheus.Gauge // gauge set to the number of duplicated files on Report

	// fileKeyToTargets accumulates target info during reconciliation.
	// It maps file key (inode+device or path) to the list of targets for that file.
	fileKeyToTargets map[fileKey][]targetInfo
}

// fileKey is a unique identifier for a file, using inode+device on POSIX
// systems or the file path on Windows. See getFileKey for how it is derived.
type fileKey string

// targetInfo holds information about a target for duplicate detection.
type targetInfo struct {
	path   string         // path of the file the target tails
	labels model.LabelSet // target's label set, compared to spot duplicates
}

// newDuplicateDetector creates a duplicate detector that logs warnings
// through logger and publishes the duplicate-file count on metric.
func newDuplicateDetector(logger log.Logger, metric prometheus.Gauge) *duplicateDetector {
	d := &duplicateDetector{
		logger: logger,
		metric: metric,
	}
	d.fileKeyToTargets = map[fileKey][]targetInfo{}
	return d
}

// Track records a file that will be tailed for duplicate detection.
// Call this during reconciliation for every file that passes validation and
// will actually be tailed.
func (d *duplicateDetector) Track(path string, labels model.LabelSet, fi os.FileInfo) {
	info := targetInfo{path: path, labels: labels}
	key := getFileKey(path, fi)
	d.fileKeyToTargets[key] = append(d.fileKeyToTargets[key], info)
}

// Report processes the state accumulated by Track and reports any duplicates
// found. Call this once, after reconciliation is complete.
func (d *duplicateDetector) Report() {
	tracked := d.fileKeyToTargets
	d.detectDuplicates(tracked)
}

// detectDuplicates walks the grouped targets, warns about every file that is
// tailed by more than one target with differing label sets, and publishes the
// count of such files on the gauge metric.
func (d *duplicateDetector) detectDuplicates(fileKeyToTargets map[fileKey][]targetInfo) {
	var duplicates int

	for key, targets := range fileKeyToTargets {
		if len(targets) < 2 {
			continue
		}

		// Gather the label sets and paths of every target tailing this file.
		labelSets := make([]model.LabelSet, len(targets))
		paths := make([]string, len(targets))
		for i, t := range targets {
			labelSets[i] = t.labels
			paths[i] = t.path
		}

		// If every label set is identical there is no duplication problem.
		diff := findDifferingLabels(labelSets)
		if len(diff) == 0 {
			continue
		}

		duplicates++
		level.Warn(d.logger).Log(
			"msg", "file has multiple targets with different labels which will cause duplicate log lines",
			"file_id", key,
			"paths", strings.Join(paths, ", "),
			"target_count", len(targets),
			"differing_labels", strings.Join(diff, ", "),
		)
	}

	d.metric.Set(float64(duplicates))
}

// getFileKey derives a stable identity for a file: inode+device where the
// platform exposes it (POSIX systems), otherwise the file path (e.g. Windows).
func getFileKey(path string, fi os.FileInfo) fileKey {
	fileID, ok := fileext.NewFileID(fi)
	if !ok {
		// No OS-level file identity available; the path is the best we have.
		return fileKey(path)
	}
	return fileKey(fileID.String())
}

// findDifferingLabels returns the sorted names of labels whose values are not
// identical across all of the provided label sets. A label missing from one
// set but present (even empty) in another counts as differing.
func findDifferingLabels(labelSets []model.LabelSet) []string {
	if len(labelSets) < 2 {
		return nil
	}

	// Build the union of label names that appear in any set.
	names := make(map[model.LabelName]struct{})
	for _, set := range labelSets {
		for n := range set {
			names[n] = struct{}{}
		}
	}

	// A label differs iff some set disagrees with the first set about it.
	// Comparing everything against labelSets[0] is sufficient because value
	// equality is transitive, giving O(n) comparisons instead of O(n²).
	var differing []string
	for n := range names {
		refValue, refPresent := labelSets[0][n]
		for _, set := range labelSets[1:] {
			value, present := set[n]
			if present != refPresent || value != refValue {
				differing = append(differing, string(n))
				break
			}
		}
	}

	slices.Sort(differing)
	return differing
}
180 changes: 180 additions & 0 deletions internal/component/loki/source/file/duplicates_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package file

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/discovery"
"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/util/syncbuffer"
)

// TestDuplicateDetection runs the loki.source.file component against targets
// that do or do not collide on the same file, and asserts that the duplicate
// warning is emitted exactly when targets with differing non-internal labels
// tail the same file.
func TestDuplicateDetection(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

	tests := []struct {
		name string
		// targetsFn receives two file paths and returns the targets.
		targetsFn          func(file1, file2 string) []discovery.Target
		useSameFile        bool
		expectDuplicate    bool
		expectedDiffLabels string
	}{
		{
			name: "unique_files_no_duplicates",
			targetsFn: func(file1, file2 string) []discovery.Target {
				return []discovery.Target{
					discovery.NewTargetFromMap(map[string]string{
						"__path__": file1,
						"app":      "myapp",
						"env":      "prod",
					}),
					discovery.NewTargetFromMap(map[string]string{
						"__path__": file2,
						"app":      "myapp",
						"env":      "staging",
					}),
				}
			},
			useSameFile:     false,
			expectDuplicate: false,
		},
		{
			name: "unique_files_with_differing_internal_labels_no_duplicates",
			targetsFn: func(file1, file2 string) []discovery.Target {
				// Internal labels (__meta_*, __internal) have different values but
				// should be filtered out by NonReservedLabelSet(), so these targets
				// point to different files and should not be duplicates.
				return []discovery.Target{
					discovery.NewTargetFromMap(map[string]string{
						"__path__":      file1,
						"__meta_source": "source1",
						"__internal":    "a",
						"app":           "myapp",
					}),
					discovery.NewTargetFromMap(map[string]string{
						"__path__":      file2,
						"__meta_source": "source2",
						"__internal":    "b",
						"app":           "myapp",
					}),
				}
			},
			useSameFile:     false,
			expectDuplicate: false,
		},
		{
			name: "same_file_differing_labels_duplicates",
			targetsFn: func(file1, _ string) []discovery.Target {
				// Both targets point to the same file but have different labels.
				// Also includes differing internal labels to verify they don't
				// appear in the differing_labels output.
				return []discovery.Target{
					discovery.NewTargetFromMap(map[string]string{
						"__path__":      file1,
						"__meta_source": "source1",
						"__internal":    "a",
						"app":           "myapp",
						"port":          "8080",
						"container":     "main",
					}),
					discovery.NewTargetFromMap(map[string]string{
						"__path__":      file1,
						"__meta_source": "source2",
						"__internal":    "b",
						"app":           "myapp",
						"port":          "9090",
						"container":     "sidecar",
					}),
				}
			},
			useSameFile:        true,
			expectDuplicate:    true,
			expectedDiffLabels: "container, port", // Only the non-internal labels should be included.
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithCancel(componenttest.TestContext(t))
			defer cancel()

			// Create file(s) for the test.
			f1, err := os.CreateTemp(t.TempDir(), "test_file1")
			require.NoError(t, err)
			defer f1.Close()

			file2Path := f1.Name()
			if !tc.useSameFile {
				f2, err := os.CreateTemp(t.TempDir(), "test_file2")
				require.NoError(t, err)
				defer f2.Close()
				file2Path = f2.Name()
			}

			var logBuf syncbuffer.Buffer
			testLogger, err := logging.New(&logBuf, logging.DefaultOptions)
			require.NoError(t, err)

			ctrl, err := componenttest.NewControllerFromID(testLogger, "loki.source.file")
			require.NoError(t, err)

			ch := loki.NewLogsReceiver()

			var args Arguments
			args.SetToDefault()
			args.Targets = tc.targetsFn(f1.Name(), file2Path)
			args.ForwardTo = []loki.LogsReceiver{ch}

			go func() {
				// Don't use require here: require.NoError calls t.FailNow on
				// failure, which must only be called from the test goroutine.
				// t.Errorf is safe to call from other goroutines.
				if err := ctrl.Run(ctx, args); err != nil {
					t.Errorf("running controller: %v", err)
				}
			}()

			// WaitRunning returns an error on timeout; don't ignore it.
			require.NoError(t, ctrl.WaitRunning(time.Minute))

			if tc.expectDuplicate {
				// Wait for duplicate warning to appear in logs.
				require.Eventually(t, func() bool {
					return strings.Contains(logBuf.String(), "file has multiple targets with different labels which will cause duplicate log lines")
				}, 5*time.Second, 10*time.Millisecond, "expected duplicate warning in log")

				logOutput := logBuf.String()

				// The log output may have escaped quotes depending on the logger format.
				require.True(t,
					strings.Contains(logOutput, fmt.Sprintf(`differing_labels="%s"`, tc.expectedDiffLabels)) ||
						strings.Contains(logOutput, fmt.Sprintf(`differing_labels=\"%s\"`, tc.expectedDiffLabels)),
					"expected differing labels %q in log, got: %s", tc.expectedDiffLabels, logOutput)

				// Internal labels should NOT appear in differing labels.
				require.NotContains(t, logOutput, "__meta_source",
					"internal labels should not appear in differing labels")
				require.NotContains(t, logOutput, "__internal",
					"internal labels should not appear in differing labels")
			} else {
				// Wait for tailing to start (indicates targets have been processed).
				require.Eventually(t, func() bool {
					return strings.Contains(logBuf.String(), "start tailing file")
				}, 5*time.Second, 10*time.Millisecond, "expected tailing to start")

				// Wait a bit to ensure duplicate detection has run after tailing started.
				time.Sleep(500 * time.Millisecond)

				// Verify no duplicate warning was logged.
				require.NotContains(t, logBuf.String(),
					"file has multiple targets with different labels",
					"unexpected duplicate warning in log")
			}
		})
	}
}
13 changes: 12 additions & 1 deletion internal/component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,10 @@ func New(o component.Options, args Arguments) (*Component, error) {
return nil, err
}

m := newMetrics(o.Registerer)
c := &Component{
opts: o,
metrics: newMetrics(o.Registerer),
metrics: m,
handler: loki.NewLogsReceiver(),
fanout: loki.NewFanout(args.ForwardTo),
posFile: positionsFile,
Expand Down Expand Up @@ -324,6 +325,9 @@ func (c *Component) Update(args component.Arguments) error {
// match the desired state.
// Caller must hold write lock on c.mut before calling this function.
func (c *Component) scheduleSources() {
// Create duplicate detector for this reconciliation cycle.
duplicateDetector := newDuplicateDetector(c.opts.Logger, c.metrics.duplicateFilesTally)

source.Reconcile(
c.opts.Logger,
c.scheduler,
Expand All @@ -347,6 +351,10 @@ func (c *Component) scheduleSources() {
return nil, source.ErrSkip
}

// Track this file for duplicate detection. We only track files that
// pass all validation checks and will actually be tailed.
duplicateDetector.Track(target.Path, target.Labels, fi)

c.metrics.totalBytes.WithLabelValues(target.Path).Set(float64(fi.Size()))

return c.newSource(sourceOptions{
Expand All @@ -361,6 +369,9 @@ func (c *Component) scheduleSources() {
})
},
)

// Report duplicates after reconciliation is complete, so that we don't slow it down.
duplicateDetector.Report()
}

type debugInfo struct {
Expand Down
Loading
Loading