Skip to content

Commit

Permalink
feedback 3/n
Browse files Browse the repository at this point in the history
  • Loading branch information
BominRahmani committed Feb 3, 2025
1 parent 19ee6ed commit ce33e15
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 58 deletions.
46 changes: 36 additions & 10 deletions connector/throughputanomalyconnector/anomaly.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func calculateStatistics(counts []int64) Statistics {
for _, count := range counts {
sum += float64(count)
}

mean := sum / float64(len(counts))

// Calculate standard deviation
Expand All @@ -77,18 +76,34 @@ func calculateStatistics(counts []int64) Statistics {
// Calculate median
sortedCounts := make([]int64, len(counts))
copy(sortedCounts, counts)
sort.Slice(sortedCounts, func(i int, j int) bool {
sort.Slice(sortedCounts, func(i, j int) bool {
return sortedCounts[i] < sortedCounts[j]
})
median := float64(sortedCounts[len(sortedCounts)/2])

// Calculate MAD
var median float64
if len(sortedCounts)%2 == 0 {
// If even number of samples, average the two middle values
mid := len(sortedCounts) / 2
median = (float64(sortedCounts[mid-1]) + float64(sortedCounts[mid])) / 2
} else {
// If odd number of samples, take the middle value
median = float64(sortedCounts[len(sortedCounts)/2])
}

// Calculate MAD (Median Absolute Deviation)
deviations := make([]float64, len(counts))
for i, count := range counts {
deviations[i] = math.Abs(float64(count) - median)
}
sort.Float64s(deviations)
mad := deviations[len(deviations)/2] * 1.4826

var mad float64
if len(deviations)%2 == 0 {
mid := len(deviations) / 2
mad = (deviations[mid-1] + deviations[mid]) / 2 * 1.4826
} else {
mad = deviations[len(deviations)/2] * 1.4826
}

return Statistics{
mean: mean,
Expand All @@ -111,9 +126,20 @@ func (d *Detector) checkForAnomaly() *AnomalyStat {

stats := calculateStatistics(historicalCounts)

if stats.stdDev == 0 {
var percentageDiff float64
if stats.mean == 0 {
if float64(currentCount) == 0 {
percentageDiff = 0
} else {
percentageDiff = 100 // handle division by zero by allowing percentage diff to be 100%
}
} else {
percentageDiff = ((float64(currentCount) - stats.mean) / stats.mean) * 100
}
percentageDiff = math.Abs(percentageDiff)

if stats.stdDev == 0 || stats.mad == 0 {
if float64(currentCount) != stats.mean {
percentageDiff := ((float64(currentCount) - stats.mean) / stats.mean) * 100
anomalyType := "Drop"
if float64(currentCount) > stats.mean {
anomalyType = "Spike"
Expand All @@ -125,15 +151,15 @@ func (d *Detector) checkForAnomaly() *AnomalyStat {
currentCount: currentCount,
zScore: 0, // Not meaningful when stdDev is 0
madScore: 0, // Not meaningful when MAD is 0
percentageDiff: math.Abs(percentageDiff),
percentageDiff: percentageDiff,
timestamp: d.lastWindowEndTime,
}
}
return nil
}

zScore := (float64(currentCount) - stats.mean) / stats.stdDev
madScore := (float64(currentCount) - stats.median) / stats.mad
percentageDiff := ((float64(currentCount) - stats.mean) / stats.mean) * 100

// Check for anomaly using both Z-score and MAD
if math.Abs(zScore) > d.config.ZScoreThreshold || math.Abs(madScore) > d.config.MADThreshold {
Expand All @@ -148,7 +174,7 @@ func (d *Detector) checkForAnomaly() *AnomalyStat {
currentCount: currentCount,
zScore: zScore,
madScore: madScore,
percentageDiff: math.Abs(percentageDiff),
percentageDiff: percentageDiff,
timestamp: d.lastWindowEndTime,
}
}
Expand Down
58 changes: 34 additions & 24 deletions connector/throughputanomalyconnector/anomaly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,40 @@ func TestConfig_Validate(t *testing.T) {
{
name: "valid config",
config: Config{
SampleInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
AnalysisInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
},
expectError: false,
},
{
name: "invalid sample interval - too small",
config: Config{
SampleInterval: time.Second * 30,
MaxWindowAge: time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
AnalysisInterval: time.Second * 30,
MaxWindowAge: time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
},
expectError: true,
},
{
name: "invalid max window age - too small relative to sample interval",
config: Config{
SampleInterval: time.Minute,
MaxWindowAge: time.Minute * 5,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
AnalysisInterval: time.Minute,
MaxWindowAge: time.Minute * 5,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
},
expectError: true,
},
{
name: "invalid threshold - negative z-score",
config: Config{
SampleInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: -1.0,
MADThreshold: 3.0,
AnalysisInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: -1.0,
MADThreshold: 3.0,
},
expectError: true,
},
Expand Down Expand Up @@ -160,10 +160,10 @@ func (m *mockConsumer) Capabilities() consumer.Capabilities {

func TestDetector_AnomalyDetection(t *testing.T) {
config := &Config{
SampleInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: 2.0,
MADThreshold: 2.0,
AnalysisInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: 2.0,
MADThreshold: 2.0,
}

logger := zap.NewNop()
Expand Down Expand Up @@ -254,20 +254,26 @@ func TestDetector_AnomalyDetection(t *testing.T) {

func TestDetector_Shutdown(t *testing.T) {
config := &Config{
SampleInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
AnalysisInterval: time.Minute,
MaxWindowAge: time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.0,
}

logger := zap.NewNop()
mockConsumer := &mockConsumer{}
detector := newDetector(config, logger, mockConsumer)

// Initialize the log channel with proper capacity
detector.logChan = make(chan logBatch, 1)

// Start the detector
err := detector.Start(context.Background(), nil)
require.NoError(t, err)

// Allow some time for goroutine to start
time.Sleep(100 * time.Millisecond)

// Test clean shutdown
ctx := context.Background()
err = detector.Shutdown(ctx)
Expand All @@ -283,9 +289,13 @@ func TestDetector_Shutdown(t *testing.T) {

// Test shutdown with canceled context
detector = newDetector(config, logger, mockConsumer)
detector.logChan = make(chan logBatch, 1) // Initialize channel for new detector
err = detector.Start(context.Background(), nil)
require.NoError(t, err)

// Allow some time for goroutine to start
time.Sleep(100 * time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
cancel()
err = detector.Shutdown(ctx)
Expand Down
20 changes: 10 additions & 10 deletions connector/throughputanomalyconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var _ component.Config = (*Config)(nil)
// Config defines the configuration parameters for the log anomaly detector connector.
type Config struct {
// How often to take measurements
SampleInterval time.Duration `mapstructure:"sample_interval"`
AnalysisInterval time.Duration `mapstructure:"analysis_interval"`
// MaxWindowAge defines the maximum age of samples to retain in the detection window.
// Samples older than this duration are pruned from the analysis window.
// This duration determines how far back the detector looks when establishing baseline behavior.
Expand All @@ -46,14 +46,14 @@ type Config struct {
// Validate checks whether the input configuration has all of the required fields for the processor.
// An error is returned if there are any invalid inputs.
func (config *Config) Validate() error {
if config.SampleInterval <= 0 {
return fmt.Errorf("sample_interval must be positive, got %v", config.SampleInterval)
if config.AnalysisInterval <= 0 {
return fmt.Errorf("analysis_interval must be positive, got %v", config.AnalysisInterval)
}
if config.SampleInterval < time.Minute {
return fmt.Errorf("sample_interval must be at least 1 minute, got %v", config.SampleInterval)
if config.AnalysisInterval < time.Minute {
return fmt.Errorf("analysis_interval must be at least 1 minute, got %v", config.AnalysisInterval)
}
if config.SampleInterval > time.Hour {
return fmt.Errorf("sample_interval must not exceed 1 hour, got %v", config.SampleInterval)
if config.AnalysisInterval > time.Hour {
return fmt.Errorf("analysis_interval must not exceed 1 hour, got %v", config.AnalysisInterval)
}
if config.MaxWindowAge <= 0 {
return fmt.Errorf("max_window_age must be positive, got %v", config.MaxWindowAge)
Expand All @@ -62,9 +62,9 @@ func (config *Config) Validate() error {
return fmt.Errorf("max_window_age must be at least 1 hour, got %v", config.MaxWindowAge)
}

if config.MaxWindowAge < config.SampleInterval*10 {
return fmt.Errorf("max_window_age (%v) must be at least 10 times larger than sample_interval (%v)",
config.MaxWindowAge, config.SampleInterval)
if config.MaxWindowAge < config.AnalysisInterval*10 {
return fmt.Errorf("max_window_age (%v) must be at least 10 times larger than analysis_interval (%v)",
config.MaxWindowAge, config.AnalysisInterval)
}

if config.ZScoreThreshold <= 0 {
Expand Down
25 changes: 15 additions & 10 deletions connector/throughputanomalyconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func newDetector(config *Config, logger *zap.Logger, nextConsumer consumer.Logs)
stateLock: sync.Mutex{},
nextConsumer: nextConsumer,
logChan: make(chan logBatch, 1000),
counts: make([]int64, numWindows),
lastWindowEndTime: time.Now().Truncate(time.Minute),
counts: make([]int64, 0, numWindows),
lastWindowEndTime: time.Now().Truncate(config.AnalysisInterval),
}
}

Expand All @@ -73,7 +73,7 @@ func newDetector(config *Config, logger *zap.Logger, nextConsumer consumer.Logs)
func (d *Detector) Start(ctx context.Context, _ component.Host) error {
d.ctx, d.cancel = context.WithCancel(ctx)

ticker := time.NewTicker(d.config.SampleInterval)
ticker := time.NewTicker(d.config.AnalysisInterval)

d.wg.Add(1)

Expand Down Expand Up @@ -135,18 +135,15 @@ func (d *Detector) analyzeTimeWindow() {
now := time.Now()
currentCount := d.currentCount.Swap(0)

if len(d.counts) > 0 {
d.counts[len(d.counts)-1] = currentCount
}

// drop any windows that are too old
maxAge := d.config.MaxWindowAge
cutoffTime := now.Add(-maxAge)
windowDuration := d.config.AnalysisInterval

// find the first window that is not too old
var keepIndex int
for i := range d.counts {
windowTime := d.lastWindowEndTime.Add(-time.Duration(len(d.counts)-1-i) * time.Minute)
windowTime := d.lastWindowEndTime.Add(-time.Duration(len(d.counts)-1-i) * windowDuration)
if windowTime.After(cutoffTime) {
keepIndex = i
break
Expand All @@ -158,9 +155,17 @@ func (d *Detector) analyzeTimeWindow() {
}

// add windows until we reach current time
for d.lastWindowEndTime.Add(time.Minute).Before(now) {
for d.lastWindowEndTime.Add(windowDuration).Before(now) {
d.counts = append(d.counts, 0)
d.lastWindowEndTime = d.lastWindowEndTime.Add(time.Minute)
d.lastWindowEndTime = d.lastWindowEndTime.Add(windowDuration)
}

if len(d.counts) > 0 {
d.counts[len(d.counts)-1] = currentCount
} else {
// Initialize first window
d.counts = append(d.counts, currentCount)
d.lastWindowEndTime = now.Truncate(windowDuration)
}

// Check for anomalies using the fixed window counts
Expand Down
8 changes: 4 additions & 4 deletions connector/throughputanomalyconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func NewFactory() connector.Factory {

func createDefaultConfig() component.Config {
return &Config{
SampleInterval: 1 * time.Minute,
MaxWindowAge: 1 * time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.5,
AnalysisInterval: 1 * time.Minute,
MaxWindowAge: 1 * time.Hour,
ZScoreThreshold: 3.0,
MADThreshold: 3.5,
}
}

Expand Down

0 comments on commit ce33e15

Please sign in to comment.