Skip to content

Support entity for OTLP custom metrics #1529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5e1b3aa
Improve cluster detection for components during translation (#1525)
musa-asad Feb 5, 2025
c23457c
added fix in entity logic to fallback to workload name when unknown s…
nathalapooja Feb 21, 2025
20824f2
Set the service name source to Instrumentation in entity when custome…
nathalapooja Mar 3, 2025
8b26c47
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Mar 6, 2025
3c65c39
Implement Kubernetes Metadata Extension (#1583)
musa-asad Mar 17, 2025
1a6fa9b
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Mar 18, 2025
470a1e3
Gets podIp from connection context in entity processor to extract the…
nathalapooja Mar 20, 2025
c765c9a
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Mar 20, 2025
44d7deb
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Mar 24, 2025
e623536
Add Instance id entity attribute only if the application node is same…
nathalapooja Mar 26, 2025
2cf8c1f
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Mar 27, 2025
256bd35
align with main
musa-asad Mar 27, 2025
a2babd5
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Mar 27, 2025
5c6bd33
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Apr 3, 2025
448dd7b
Update Application Signals Processor to use extension if available (#…
musa-asad Apr 7, 2025
2f52708
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Apr 7, 2025
f00e7ea
Enable entity in EMF logs for OTLP
musa-asad Apr 7, 2025
1310484
Prioritize resource attributes over k8smetadata extension
musa-asad Apr 9, 2025
78a8c80
Merge branch 'main' into feature-custom-metrics-entity
musa-asad Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion extension/k8smetadata/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kubernetes Metadata

The Kubernetes Metadata utilizes a Kubernetes client to start an informer, which queries the Kubernetes API for EndpointSlices. The EndpointSlices are transformed to reduce storage and periodically updated.
The Kubernetes Metadata utilizes a Kubernetes client to start an informer, which queries the Kubernetes API for EndpointSlices and Services. The EndpointSlices and Services are transformed to reduce storage and periodically updated.

> Kubernetes' EndpointSlice API provides a way to track network endpoints within a Kubernetes cluster. (https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/)

Expand All @@ -11,3 +11,17 @@ Pod IP → {Workload, Namespace, Node} mappings are stored.
- Namespace: This is the Kubernetes namespace the application is in.
- Node: This is the Kubernetes node the application is in.

Alternatively, if Pod IP isn't provided, a Cluster IP → Service → {Workload, Namespace, Node} mapping is used instead.

## Configuration

```yaml
extensions:
k8smetadata:
objects:
- endpointslices
- services

service:
extensions: [k8smetadata]
```
26 changes: 25 additions & 1 deletion extension/k8smetadata/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,33 @@
package k8smetadata

import (
"errors"

"go.opentelemetry.io/collector/component"
"go.uber.org/multierr"
)

type Config struct{}
type Config struct {
Objects []string `mapstructure:"objects"`
}

func (c *Config) Validate() error {
var errs error
if len(c.Objects) == 0 {
errs = multierr.Append(errs, errors.New("no k8s objects passed in"))
}

allowedObjects := map[string]bool{
"endpointslices": true,
"services": true,
}

for _, obj := range c.Objects {
if !allowedObjects[obj] {
errs = multierr.Append(errs, errors.New("invalid k8s object: "+obj+". Only 'endpointslices' and 'services' are allowed"))
}
}
return errs
}

var _ component.Config = (*Config)(nil)
46 changes: 46 additions & 0 deletions extension/k8smetadata/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,49 @@ func TestUnmarshalDefaultConfig(t *testing.T) {
assert.NoError(t, confmap.New().Unmarshal(cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
}

func TestConfigValidate(t *testing.T) {
testCases := []struct {
desc string
config Config
expectedErr string
}{
{
desc: "empty objects",
config: Config{Objects: []string{}},
expectedErr: "no k8s objects passed in",
},
{
desc: "invalid object",
config: Config{Objects: []string{"pods"}},
expectedErr: "invalid k8s object: pods. Only 'endpointslices' and 'services' are allowed",
},
{
desc: "multiple invalid objects",
config: Config{Objects: []string{"pods", "deployments"}},
expectedErr: "invalid k8s object: pods. Only 'endpointslices' and 'services' are allowed; invalid k8s object: deployments. Only 'endpointslices' and 'services' are allowed",
},
{
desc: "valid single object",
config: Config{Objects: []string{"services"}},
expectedErr: "",
},
{
desc: "valid multiple objects",
config: Config{Objects: []string{"services", "endpointslices"}},
expectedErr: "",
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
err := tc.config.Validate()
if tc.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErr)
}
})
}
}
81 changes: 70 additions & 11 deletions extension/k8smetadata/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type KubernetesMetadata struct {
ready atomic.Bool
safeStopCh *k8sclient.SafeChannel
endpointSliceWatcher *k8sclient.EndpointSliceWatcher
serviceWatcher *k8sclient.ServiceWatcher
}

var _ extension.Extension = (*KubernetesMetadata)(nil)
Expand Down Expand Up @@ -57,15 +58,24 @@ func (e *KubernetesMetadata) Start(_ context.Context, _ component.Host) error {

timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)

e.endpointSliceWatcher = k8sclient.NewEndpointSliceWatcher(e.logger, sharedInformerFactory, timedDeleter)
e.safeStopCh = &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}

e.endpointSliceWatcher.Run(e.safeStopCh.Ch)

e.endpointSliceWatcher.WaitForCacheSync(e.safeStopCh.Ch)
for _, obj := range e.config.Objects {
switch obj {
case "endpointslices":
e.endpointSliceWatcher = k8sclient.NewEndpointSliceWatcher(e.logger, sharedInformerFactory, timedDeleter)
e.endpointSliceWatcher.Run(e.safeStopCh.Ch)
e.endpointSliceWatcher.WaitForCacheSync(e.safeStopCh.Ch)
e.logger.Debug("EndpointSlice cache synced")
case "services":
e.serviceWatcher = k8sclient.NewServiceWatcher(e.logger, sharedInformerFactory, timedDeleter)
e.serviceWatcher.Run(e.safeStopCh.Ch)
e.serviceWatcher.WaitForCacheSync(e.safeStopCh.Ch)
e.logger.Debug("Service cache synced")
}
}

e.logger.Debug("EndpointSlice cache synced, extension fully started")
e.logger.Debug("Cache synced, extension fully started")
e.ready.Store(true)

return nil
Expand All @@ -78,22 +88,71 @@ func (e *KubernetesMetadata) Shutdown(_ context.Context) error {
return nil
}

func (e *KubernetesMetadata) GetPodMetadata(ip string) k8sclient.PodMetadata {
func (e *KubernetesMetadata) GetPodMetadataFromPodIP(ip string) k8sclient.PodMetadata {
if e.endpointSliceWatcher == nil {
e.logger.Debug("GetPodMetadataFromPodIP: endpointslices not enabled in config")
return k8sclient.PodMetadata{}
}
if ip == "" {
e.logger.Debug("GetPodMetadata: no IP provided")
e.logger.Debug("GetPodMetadataFromPodIP: no IP provided")
return k8sclient.PodMetadata{}
}
pm, ok := e.endpointSliceWatcher.IPToPodMetadata.Load(ip)
pm, ok := e.endpointSliceWatcher.GetIPToPodMetadata().Load(ip)
if !ok {
e.logger.Debug("GetPodMetadata: no mapping found for IP", zap.String("ip", ip))
e.logger.Debug("GetPodMetadataFromPodIP: no mapping found for IP", zap.String("ip", ip))
return k8sclient.PodMetadata{}
}
metadata := pm.(k8sclient.PodMetadata)
e.logger.Debug("GetPodMetadata: found metadata",
e.logger.Debug("GetPodMetadataFromPodIP: found metadata",
zap.String("ip", ip),
zap.String("workload", metadata.Workload),
zap.String("namespace", metadata.Namespace),
zap.String("node", metadata.Node),
)
return metadata
}

func (e *KubernetesMetadata) GetPodMetadataFromServiceAndNamespace(svcAndNS string) k8sclient.PodMetadata {
if e.endpointSliceWatcher == nil {
e.logger.Debug("GetPodMetadataFromServiceAndNamespace: endpointslices not enabled in config")
return k8sclient.PodMetadata{}
}
if svcAndNS == "" {
e.logger.Debug("GetPodMetadataFromServiceAndNamespace: no service@namespace provided")
return k8sclient.PodMetadata{}
}
pm, ok := e.endpointSliceWatcher.GetServiceNamespaceToPodMetadata().Load(svcAndNS)
if !ok {
e.logger.Debug("GetPodMetadataFromServiceAndNamespace: no mapping found", zap.String("svcAndNS", svcAndNS))
return k8sclient.PodMetadata{}
}
metadata := pm.(k8sclient.PodMetadata)
e.logger.Debug("GetPodMetadataFromServiceAndNamespace: found metadata",
zap.String("serviceNameAndNamespace", svcAndNS),
zap.String("workload", metadata.Workload),
zap.String("node", metadata.Node),
)
return metadata
}

func (e *KubernetesMetadata) GetServiceAndNamespaceFromClusterIP(ip string) string {
if e.serviceWatcher == nil {
e.logger.Debug("GetServiceAndNamespaceFromClusterIP: services not enabled in config")
return ""
}
if ip == "" {
e.logger.Debug("GetServiceAndNamespaceFromClusterIP: no IP provided")
return ""
}
svcAndNS, ok := e.serviceWatcher.GetIPToServiceAndNamespace().Load(ip)
if !ok {
e.logger.Debug("GetServiceAndNamespaceFromClusterIP: no mapping found", zap.String("ip", ip))
return ""
}
svcAndNSString := svcAndNS.(string)
e.logger.Debug("GetServiceAndNamespaceFromClusterIP: found metadata",
zap.String("ip", ip),
zap.String("svcAndNS", svcAndNSString),
)
return svcAndNSString
}
117 changes: 104 additions & 13 deletions extension/k8smetadata/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package k8smetadata

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -14,51 +13,143 @@ import (
)

func TestKubernetesMetadata_GetPodMetadata(t *testing.T) {
esw := &k8sclient.EndpointSliceWatcher{
IPToPodMetadata: &sync.Map{},
}
esw := &k8sclient.EndpointSliceWatcher{}
esw.InitializeIPToPodMetadata()

const testIP = "1.2.3.4"
expected := k8sclient.PodMetadata{
Workload: "my-workload",
Namespace: "my-namespace",
Node: "my-node",
}
esw.IPToPodMetadata.Store(testIP, expected)
esw.GetIPToPodMetadata().Store(testIP, expected)

kMeta := &KubernetesMetadata{
logger: zap.NewNop(),
endpointSliceWatcher: esw,
config: &Config{
Objects: []string{"endpointslices"},
},
}

got := kMeta.GetPodMetadata(testIP)
got := kMeta.GetPodMetadataFromPodIP(testIP)
assert.Equal(t, expected, got, "GetPodMetadata should return the stored PodMetadata for %s", testIP)

unknown := kMeta.GetPodMetadata("9.9.9.9")
unknown := kMeta.GetPodMetadataFromPodIP("9.9.9.9")
assert.Equal(t, k8sclient.PodMetadata{}, unknown, "GetPodMetadata should return empty if the IP is not present")

unknown = kMeta.GetPodMetadata("")
unknown = kMeta.GetPodMetadataFromPodIP("")
assert.Equal(t, k8sclient.PodMetadata{}, unknown, "GetPodMetadata should return empty if the IP is empty")

kMetaDisabled := &KubernetesMetadata{
logger: zap.NewNop(),
config: &Config{
Objects: []string{"services"},
},
}
disabled := kMetaDisabled.GetPodMetadataFromPodIP(testIP)
assert.Equal(t, k8sclient.PodMetadata{}, disabled, "GetPodMetadata should return empty when endpointslices are disabled")
}

func TestKubernetesMetadata_GetPodMetadata_Incomplete(t *testing.T) {
esw := &k8sclient.EndpointSliceWatcher{
IPToPodMetadata: &sync.Map{},
}
esw := &k8sclient.EndpointSliceWatcher{}
esw.InitializeIPToPodMetadata()

const testIP = "2.2.2.2"
expected := k8sclient.PodMetadata{
Workload: "incomplete-workload",
Namespace: "",
Node: "",
}
esw.IPToPodMetadata.Store(testIP, expected)
esw.GetIPToPodMetadata().Store(testIP, expected)

kMeta := &KubernetesMetadata{
logger: zap.NewNop(),
endpointSliceWatcher: esw,
config: &Config{
Objects: []string{"endpointslices"},
},
}

got := kMeta.GetPodMetadata(testIP)
got := kMeta.GetPodMetadataFromPodIP(testIP)
assert.Equal(t, expected, got, "GetPodMetadata should return the stored incomplete PodMetadata for IP %s", testIP)
}

func TestKubernetesMetadata_GetPodMetadataFromServiceAndNamespace(t *testing.T) {
esw := &k8sclient.EndpointSliceWatcher{}
esw.InitializeServiceNamespaceToPodMetadata()

const svcKey = "myservice@dev"
expected := k8sclient.PodMetadata{
Workload: "my-workload",
Namespace: "dev",
Node: "node-xyz",
}
esw.GetServiceNamespaceToPodMetadata().Store(svcKey, expected)

kMeta := &KubernetesMetadata{
logger: zap.NewNop(),
endpointSliceWatcher: esw,
config: &Config{
Objects: []string{"endpointslices"},
},
}

got := kMeta.GetPodMetadataFromServiceAndNamespace(svcKey)
assert.Equal(t, expected, got, "GetPodMetadataFromService should return the stored PodMetadata for %s", svcKey)

unknown := kMeta.GetPodMetadataFromServiceAndNamespace("nosuchservice@dev")
assert.Equal(t, k8sclient.PodMetadata{}, unknown, "Expected empty result for unknown service key")

emptyVal := kMeta.GetPodMetadataFromServiceAndNamespace("")
assert.Equal(t, k8sclient.PodMetadata{}, emptyVal, "Expected empty result for empty service key")

kMetaDisabled := &KubernetesMetadata{
logger: zap.NewNop(),
config: &Config{
Objects: []string{"services"},
},
}
disabled := kMetaDisabled.GetPodMetadataFromServiceAndNamespace(svcKey)
assert.Equal(t, k8sclient.PodMetadata{}, disabled, "GetPodMetadataFromService should return empty when endpointslices are disabled")
}

func TestKubernetesMetadata_GetServiceAndNamespaceFromClusterIP(t *testing.T) {
mockSvcWatcher := &k8sclient.ServiceWatcher{}
mockSvcWatcher.InitializeIPToServiceAndNamespace()

const knownIP = "10.0.0.42"
const knownSvcNS = "myservice@mynamespace"
mockSvcWatcher.GetIPToServiceAndNamespace().Store(knownIP, knownSvcNS)

mockESWatcher := &k8sclient.EndpointSliceWatcher{}
mockESWatcher.InitializeIPToPodMetadata()
mockESWatcher.InitializeServiceNamespaceToPodMetadata()

kMeta := &KubernetesMetadata{
logger: zap.NewNop(),
endpointSliceWatcher: mockESWatcher,
serviceWatcher: mockSvcWatcher,
config: &Config{
Objects: []string{"services", "endpointslices"},
},
}

got := kMeta.GetServiceAndNamespaceFromClusterIP(knownIP)
assert.Equal(t, knownSvcNS, got, "Expected to retrieve myservice@mynamespace for IP %s", knownIP)

gotUnknown := kMeta.GetServiceAndNamespaceFromClusterIP("10.0.0.99")
assert.Equal(t, "", gotUnknown, "Expected empty string for unknown cluster IP")

gotEmpty := kMeta.GetServiceAndNamespaceFromClusterIP("")
assert.Equal(t, "", gotEmpty, "Expected empty string when IP is empty")

kMetaDisabled := &KubernetesMetadata{
logger: zap.NewNop(),
config: &Config{
Objects: []string{"endpointslices"},
},
}
disabled := kMetaDisabled.GetServiceAndNamespaceFromClusterIP(knownIP)
assert.Equal(t, "", disabled, "GetServiceAndNamespaceFromClusterIP should return empty when services are disabled")
}
Loading
Loading