Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow "Kubernetes-Style" labels for aggregation #35

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/bucket/bucket_aggregation_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (monitor *bucketAggregationMonitor) processAggregations(
monitor.updateCounter(hits, labels)
return
}
expectedAggregations = getOriginalAggregationKeys(expectedAggregations)
if buckets, ok := container.Terms(expectedAggregations[0]); !ok {
log.Printf("Missing terms aggregation %s in response %v\n", expectedAggregations[0], container)
} else {
Expand All @@ -67,7 +68,7 @@ func (monitor *bucketAggregationMonitor) updateCounter(newCount int64, labels pr

func withLabel(labels prometheus.Labels, key string, value string) prometheus.Labels {
newLabels := prometheus.Labels{
key: value,
getAllowedPrometheusLabel(key): value,
}
for k, v := range labels {
newLabels[k] = v
Expand Down
32 changes: 30 additions & 2 deletions plugins/bucket/bucket_aggregation_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package main
import (
"github.com/MaibornWolff/elcep/main/config"
"github.com/olivere/elastic"
"github.com/patrickmn/go-cache"
"log"
"regexp"
"time"
)

Expand Down Expand Up @@ -31,7 +33,8 @@ func Create(query config.Query, timeKey string) *bucketAggregationQuery {
if !ok2 {
log.Fatalf("Malformed query %v, %s should be a string", query, _field)
}
aggregations[index] = field

aggregations[index] = getAllowedPrometheusLabel(field)
}

return &bucketAggregationQuery{
Expand All @@ -52,7 +55,8 @@ func (query *bucketAggregationQuery) build(elasticClient *elastic.Client) *elast
Format("yyyy-MM-dd HH:mm:ss"))).
FilterPath("hits.total,aggregations")
if len(query.aggregations) > 0 {
service = service.Aggregation(createAggregations(query.aggregations))
originAggregations := getOriginalAggregationKeys(query.aggregations)
service = service.Aggregation(createAggregations(originAggregations))
}
return service
}
Expand All @@ -69,3 +73,27 @@ func createAggregations(aggregationKeys []string) (string, elastic.Aggregation)
SubAggregation(createAggregations(aggregationKeys[1:]))
}
}

// Retrieve the origin label name from the cache
func getOriginalAggregationKeys(s []string) []string{
strSlice := make([]string, 0)
for _ , str := range s {
if x, found := bucketCache.Get(str); found {
value := x.(string)
strSlice = append(strSlice, value)
}
}
return strSlice
}


// Replace given string special characters and return '_' instead.
// save the origin label to cache.
// https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
func getAllowedPrometheusLabel(s string) string {
var re = regexp.MustCompile(`[!@#$%^&*(),./\\?":{}|<>]`)
r := re.ReplaceAllString(s, `${1}_${2}`)
bucketCache.Set(r, s, cache.NoExpiration) // set origin label to the cache

return r
}
1 change: 1 addition & 0 deletions plugins/bucket/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ require (
github.com/MaibornWolff/elcep/main/plugin v1.2.0
github.com/mitchellh/hashstructure v1.0.0
github.com/olivere/elastic v6.2.19+incompatible
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.0.0
)
2 changes: 2 additions & 0 deletions plugins/bucket/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/olivere/elastic v6.2.19+incompatible h1:FYiohzobKD2tib3sHgEHNBSzcw1onqApaumIBBstchQ=
github.com/olivere/elastic v6.2.19+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
4 changes: 4 additions & 0 deletions plugins/bucket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"github.com/MaibornWolff/elcep/main/config"
"github.com/MaibornWolff/elcep/main/plugin"
"github.com/olivere/elastic"
"github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
"log"
"time"
)

var bucketCache = cache.New(cache.NoExpiration, 60*time.Minute)

// The factory method for the plugin
// noinspection GoUnusedExportedFunction
func NewPlugin(options config.Options, _ interface{}) plugin.Plugin {
Expand Down