diff --git a/cmd/config-translator/translator_test.go b/cmd/config-translator/translator_test.go index 147f6fd2b9..e9298c3807 100644 --- a/cmd/config-translator/translator_test.go +++ b/cmd/config-translator/translator_test.go @@ -68,6 +68,8 @@ func TestTracesConfig(t *testing.T) { func TestJMXConfig(t *testing.T) { checkIfSchemaValidateAsExpected(t, "../../translator/config/sampleSchema/validJMX.json", true, map[string]int{}) expectedErrorMap := map[string]int{} + expectedErrorMap["additional_property_not_allowed"] = 1 + expectedErrorMap["number_any_of"] = 1 expectedErrorMap["number_one_of"] = 1 expectedErrorMap["required"] = 1 checkIfSchemaValidateAsExpected(t, "../../translator/config/sampleSchema/invalidJMX.json", false, expectedErrorMap) diff --git a/translator/config/sampleSchema/invalidJMX.json b/translator/config/sampleSchema/invalidJMX.json index 2fdb7bf045..4e8514de0d 100644 --- a/translator/config/sampleSchema/invalidJMX.json +++ b/translator/config/sampleSchema/invalidJMX.json @@ -2,9 +2,7 @@ "metrics": { "metrics_collected": { "jmx": { - "jvm": { - "measurement": [] - } + "measurement": [] } } } diff --git a/translator/config/schema.json b/translator/config/schema.json index 4569df27a8..f09e369519 100644 --- a/translator/config/schema.json +++ b/translator/config/schema.json @@ -1320,9 +1320,6 @@ } }, "additionalProperties": false, - "required": [ - "endpoint" - ], "anyOf": [ { "required": [ diff --git a/translator/tocwconfig/sampleConfig/complete_linux_config.yaml b/translator/tocwconfig/sampleConfig/complete_linux_config.yaml index 227f00e973..83b9cb1a57 100644 --- a/translator/tocwconfig/sampleConfig/complete_linux_config.yaml +++ b/translator/tocwconfig/sampleConfig/complete_linux_config.yaml @@ -388,7 +388,7 @@ service: receivers: - telegraf_net - telegraf_diskio - metrics/jmx/0: + metrics/jmx/cloudwatch/0: exporters: - awscloudwatch processors: @@ -399,15 +399,15 @@ service: - ec2tagger receivers: - jmx/0 - metrics/jmx/1: + metrics/jmx/cloudwatch/1: exporters: - awscloudwatch processors: - filter/jmx/1 - resource/jmx - - cumulativetodelta/jmx - transform/jmx/1 - ec2tagger + - cumulativetodelta/jmx receivers: - jmx/1 traces/xray: diff --git a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml index b91a54d180..7bf6c91798 100644 --- a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml @@ -75,7 +75,7 @@ processors: send_batch_max_size: 0 send_batch_size: 8192 timeout: 1m0s - batch/jmx: + batch/jmx/amp: metadata_cardinality_limit: 1000 send_batch_max_size: 0 send_batch_size: 8192 @@ -178,18 +178,26 @@ service: receivers: - telegraf_cpu - telegraf_disk - metrics/jmx: + metrics/jmx/amp: exporters: - - awscloudwatch - prometheusremotewrite/amp processors: - filter/jmx - resource/jmx - - cumulativetodelta/jmx - - batch/jmx + - batch/jmx/amp - transform/jmx receivers: - jmx + metrics/jmx/cloudwatch: + exporters: + - awscloudwatch + processors: + - filter/jmx + - resource/jmx + - transform/jmx + - cumulativetodelta/jmx + receivers: + - jmx telemetry: logs: development: false diff --git a/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.conf b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.conf new file mode 100644 index 0000000000..9c37f8b52d --- /dev/null +++ b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.conf @@ -0,0 +1,19 @@ +[agent] + collection_jitter = "0s" + debug = false + flush_interval = "1s" + flush_jitter = "0s" + hostname = "" + interval = "60s" + logfile = "" + logtarget = "lumberjack" + metric_batch_size = 1000 + metric_buffer_limit = 10000 + omit_hostname = false + precision = "" + quiet = false + round_interval = false + +[outputs] + + [[outputs.cloudwatch]] diff --git a/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.json b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.json new file mode 100644 index 0000000000..06c225e7a5 --- /dev/null +++ b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.json @@ -0,0 +1,43 @@ +{ + "metrics": { + "metrics_destinations": { + "amp": { + "workspace_id": "ws-12345" + }, + "cloudwatch": {} + }, + "metrics_collected": { + "jmx": [ + { + "jvm": { + "measurement": [ + "jvm.memory.heap.init", + { + "name": "jvm.memory.heap.used", + "rename": "JVM_MEM_HEAP_USED", + "unit": "unit" + }, + "jvm.memory.nonheap.init" + ] + }, + "append_dimensions": { + "k1": "v1" + } + }, + { + "kafka-consumer": { + "measurement": [ + { + "name": "kafka.consumer.fetch-rate", + "rename": "kafka.fetch-rate" + } + ] + }, + "append_dimensions": { + "k2": "v2" + } + } + ] + } + } +} \ No newline at end of file diff --git a/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.yaml b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.yaml new file mode 100644 index 0000000000..66759372ba --- /dev/null +++ b/translator/tocwconfig/sampleConfig/jmx_eks_config_linux.yaml @@ -0,0 +1,216 @@ +exporters: + awscloudwatch: + force_flush_interval: 1m0s + max_datums_per_call: 1000 + max_values_per_datum: 150 + middleware: agenthealth/metrics + namespace: CWAgent + region: us-west-2 + resource_to_telemetry_conversion: + enabled: true + prometheusremotewrite/amp: + add_metric_suffixes: true + auth: + authenticator: sigv4auth + compression: "" + disable_keep_alives: false + endpoint: https://aps-workspaces.us-west-2.amazonaws.com/workspaces/ws-12345/api/v1/remote_write + export_created_metric: + enabled: false + http2_ping_timeout: 0s + http2_read_idle_timeout: 0s + max_batch_size_bytes: 3000000 + namespace: "" + proxy_url: "" + read_buffer_size: 0 + remote_write_queue: + enabled: true + num_consumers: 5 + queue_size: 10000 + resource_to_telemetry_conversion: + clear_after_copy: true + enabled: true + retry_on_failure: + enabled: true + initial_interval: 50ms + max_elapsed_time: 5m0s + max_interval: 30s + multiplier: 1.5 + randomization_factor: 0.5 + send_metadata: false + target_info: + enabled: true + timeout: 5s + tls: + ca_file: "" + cert_file: "" + include_system_ca_certs_pool: false + insecure: false + insecure_skip_verify: false + key_file: "" + max_version: "" + min_version: "" + reload_interval: 0s + server_name_override: "" + write_buffer_size: 524288 +extensions: + agenthealth/metrics: + is_usage_data_enabled: true + stats: + operations: + - PutMetricData + usage_flags: + mode: EC2 + region_type: ACJ + sigv4auth: + assume_role: + sts_region: us-west-2 + region: us-west-2 +processors: + batch/jmx/amp/0: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 1m0s + batch/jmx/amp/1: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 1m0s + cumulativetodelta/jmx: + exclude: + match_type: "" + include: + match_type: "" + initial_value: 2 + max_staleness: 0s + filter/jmx/0: + error_mode: propagate + logs: {} + metrics: + include: + match_type: strict + metric_names: + - jvm.memory.heap.init + - jvm.memory.heap.used + - jvm.memory.nonheap.init + spans: {} + traces: {} + filter/jmx/1: + error_mode: propagate + logs: {} + metrics: + include: + match_type: strict + metric_names: + - kafka.consumer.fetch-rate + spans: {} + traces: {} + resource/jmx/0: + attributes: + - action: upsert + converted_type: "" + from_attribute: "" + from_context: "" + key: k1 + pattern: "" + value: v1 + resource/jmx/1: + attributes: + - action: upsert + converted_type: "" + from_attribute: "" + from_context: "" + key: k2 + pattern: "" + value: v2 + transform/jmx/0: + error_mode: propagate + flatten_data: false + log_statements: [] + metric_statements: + - context: metric + statements: + - set(unit, "unit") where name == "jvm.memory.heap.used" + - set(name, "JVM_MEM_HEAP_USED") where name == "jvm.memory.heap.used" + trace_statements: [] + transform/jmx/1: + error_mode: propagate + flatten_data: false + log_statements: [] + metric_statements: + - context: metric + statements: + - set(name, "kafka.fetch-rate") where name == "kafka.consumer.fetch-rate" + trace_statements: [] +receivers: + otlp/jmx: + protocols: + http: + endpoint: 0.0.0.0:4314 + include_metadata: false + logs_url_path: /v1/logs + max_request_body_size: 0 + metrics_url_path: /v1/metrics + traces_url_path: /v1/traces +service: + extensions: + - agenthealth/metrics + - sigv4auth + pipelines: + metrics/jmx/amp/0: + exporters: + - prometheusremotewrite/amp + processors: + - filter/jmx/0 + - resource/jmx/0 + - transform/jmx/0 + - batch/jmx/amp/0 + receivers: + - otlp/jmx + metrics/jmx/amp/1: + exporters: + - prometheusremotewrite/amp + processors: + - filter/jmx/1 + - resource/jmx/1 + - transform/jmx/1 + - batch/jmx/amp/1 + receivers: + - otlp/jmx + metrics/jmx/cloudwatch/0: + exporters: + - awscloudwatch + processors: + - filter/jmx/0 + - resource/jmx/0 + - transform/jmx/0 + - cumulativetodelta/jmx + receivers: + - otlp/jmx + metrics/jmx/cloudwatch/1: + exporters: + - awscloudwatch + processors: + - filter/jmx/1 + - resource/jmx/1 + - transform/jmx/1 + - cumulativetodelta/jmx + receivers: + - otlp/jmx + telemetry: + logs: + development: false + disable_caller: false + disable_stacktrace: false + encoding: console + level: info + sampling: + enabled: true + initial: 2 + thereafter: 500 + tick: 10s + metrics: + address: "" + level: None + traces: {} diff --git a/translator/tocwconfig/tocwconfig_unix_test.go b/translator/tocwconfig/tocwconfig_unix_test.go index a4ab547371..f8206e3a17 100644 --- a/translator/tocwconfig/tocwconfig_unix_test.go +++ b/translator/tocwconfig/tocwconfig_unix_test.go @@ -45,11 +45,19 @@ func TestJMXConfigLinux(t *testing.T) { context.CurrentContext().SetMode(config.ModeEC2) testutil.SetPrometheusRemoteWriteTestingEnv(t) t.Setenv("JMX_JAR_PATH", "../../packaging/opentelemetry-jmx-metrics.jar") - context.CurrentContext().SetMode(config.ModeEC2) expectedEnvVars := map[string]string{} checkTranslation(t, "jmx_config_linux", "linux", expectedEnvVars, "") } +func TestJMXConfigEKS(t *testing.T) { + resetContext(t) + testutil.SetPrometheusRemoteWriteTestingEnv(t) + context.CurrentContext().SetMode(config.ModeEC2) + context.CurrentContext().SetRunInContainer(true) + expectedEnvVars := map[string]string{} + checkTranslation(t, "jmx_eks_config_linux", "linux", expectedEnvVars, "") +} + func TestDeltaConfigLinux(t *testing.T) { resetContext(t) context.CurrentContext().SetMode(config.ModeEC2) diff --git a/translator/translate/otel/common/options.go b/translator/translate/otel/common/options.go index fc58bd9203..dd3d7edb74 100644 --- a/translator/translate/otel/common/options.go +++ b/translator/translate/otel/common/options.go @@ -28,3 +28,27 @@ func (p *NameProvider) Name() string { func (p *NameProvider) SetName(name string) { p.name = name } + +type IndexSetter interface { + SetIndex(int) +} + +func WithIndex(index int) TranslatorOption { + return func(target any) { + if setter, ok := target.(IndexSetter); ok { + setter.SetIndex(index) + } + } +} + +type IndexProvider struct { + index int +} + +func (p *IndexProvider) Index() int { + return p.index +} + +func (p *IndexProvider) SetIndex(index int) { + p.index = index +} diff --git a/translator/translate/otel/common/options_test.go b/translator/translate/otel/common/options_test.go index 1ed2534217..4aeca24f08 100644 --- a/translator/translate/otel/common/options_test.go +++ b/translator/translate/otel/common/options_test.go @@ -15,3 +15,10 @@ func TestWithName(t *testing.T) { opt(p) assert.Equal(t, "b", p.Name()) } + +func TestWithIndex(t *testing.T) { + p := &IndexProvider{index: -1} + opt := WithIndex(1) + opt(p) + assert.Equal(t, 1, p.Index()) +} diff --git a/translator/translate/otel/exporter/debug/translator.go b/translator/translate/otel/exporter/debug/translator.go index d5fe791af9..34a4831ba1 100644 --- a/translator/translate/otel/exporter/debug/translator.go +++ b/translator/translate/otel/exporter/debug/translator.go @@ -16,19 +16,22 @@ import ( const defaultSamplingThereafter = 500 type translator struct { - name string + common.NameProvider factory exporter.Factory } var _ common.Translator[component.Config] = (*translator)(nil) -func NewTranslator() common.Translator[component.Config] { +func NewTranslator(opts ...common.TranslatorOption) common.Translator[component.Config] { t := &translator{factory: debugexporter.NewFactory()} + for _, opt := range opts { + opt(t) + } return t } func (t *translator) ID() component.ID { - return component.NewIDWithName(t.factory.Type(), common.AppSignals) + return component.NewIDWithName(t.factory.Type(), t.Name()) } func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { diff --git a/translator/translate/otel/exporter/debug/translator_test.go b/translator/translate/otel/exporter/debug/translator_test.go index 101e47cc22..535a0670d6 100644 --- a/translator/translate/otel/exporter/debug/translator_test.go +++ b/translator/translate/otel/exporter/debug/translator_test.go @@ -16,8 +16,8 @@ import ( ) func TestTranslator(t *testing.T) { - tt := NewTranslator() - assert.EqualValues(t, "debug/application_signals", tt.ID().String()) + tt := NewTranslator(common.WithName("test")) + assert.EqualValues(t, "debug/test", tt.ID().String()) got, err := tt.Translate(confmap.New()) assert.Error(t, err) assert.Nil(t, got) diff --git a/translator/translate/otel/pipeline/applicationsignals/translator.go b/translator/translate/otel/pipeline/applicationsignals/translator.go index bc922faab9..45d247d3ab 100644 --- a/translator/translate/otel/pipeline/applicationsignals/translator.go +++ b/translator/translate/otel/pipeline/applicationsignals/translator.go @@ -46,7 +46,7 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators } translators := &common.ComponentTranslators{ - Receivers: common.NewTranslatorMap(otlp.NewTranslatorWithName(common.AppSignals, otlp.WithDataType(t.dataType))), + Receivers: common.NewTranslatorMap(otlp.NewTranslator(common.WithName(common.AppSignals), otlp.WithDataType(t.dataType))), Processors: common.NewTranslatorMap[component.Config](), Exporters: common.NewTranslatorMap[component.Config](), Extensions: common.NewTranslatorMap[component.Config](), @@ -56,7 +56,7 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators translators.Processors.Set(awsapplicationsignals.NewTranslator(awsapplicationsignals.WithDataType(t.dataType))) if enabled, _ := common.GetBool(conf, common.AgentDebugConfigKey); enabled { - translators.Exporters.Set(debug.NewTranslator()) + translators.Exporters.Set(debug.NewTranslator(common.WithName(common.AppSignals))) } if t.dataType == component.DataTypeTraces { diff --git a/translator/translate/otel/pipeline/host/translators.go b/translator/translate/otel/pipeline/host/translators.go index 88effe5e9f..93caea2676 100644 --- a/translator/translate/otel/pipeline/host/translators.go +++ b/translator/translate/otel/pipeline/host/translators.go @@ -45,7 +45,7 @@ func NewTranslators(conf *confmap.Conf, os string) (pipeline.TranslatorMap, erro for index := range v { deltaReceivers.Set(otlpreceiver.NewTranslator( otlpreceiver.WithDataType(component.DataTypeMetrics), - otlpreceiver.WithIndex(index), + common.WithIndex(index), )) } case map[string]any: diff --git a/translator/translate/otel/pipeline/jmx/translator.go b/translator/translate/otel/pipeline/jmx/translator.go index 4fb455e078..441e0e5405 100644 --- a/translator/translate/otel/pipeline/jmx/translator.go +++ b/translator/translate/otel/pipeline/jmx/translator.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" + "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/prometheusremotewrite" @@ -23,6 +24,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/resourceprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/jmx" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/otlp" ) const ( @@ -34,29 +36,32 @@ var ( ) type translator struct { - name string - index int + name string + common.IndexProvider + destination string } -type Option func(any) - -func WithIndex(index int) Option { +func WithDestination(destination string) common.TranslatorOption { return func(a any) { if t, ok := a.(*translator); ok { - t.index = index + t.destination = destination } } } var _ common.Translator[*common.ComponentTranslators] = (*translator)(nil) -func NewTranslator(opts ...Option) common.Translator[*common.ComponentTranslators] { - t := &translator{name: common.PipelineNameJmx, index: -1} +func NewTranslator(opts ...common.TranslatorOption) common.Translator[*common.ComponentTranslators] { + t := &translator{name: common.PipelineNameJmx} + t.SetIndex(-1) for _, opt := range opts { opt(t) } - if t.index != -1 { - t.name += "/" + strconv.Itoa(t.index) + if t.destination != "" { + t.name += "/" + t.destination + } + if t.Index() != -1 { + t.name += "/" + strconv.Itoa(t.Index()) } return t } @@ -72,41 +77,36 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.JmxConfigKey} } - if !hasMeasurements(conf, t.index) { + if !hasMeasurements(conf, t.Index()) { baseKey := common.JmxConfigKey - if t.index != -1 { - baseKey = fmt.Sprintf("%s[%d]", baseKey, t.index) + if t.Index() != -1 { + baseKey = fmt.Sprintf("%s[%d]", baseKey, t.Index()) } return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.ConfigKey(baseKey, placeholderTarget, common.MeasurementKey)} } translators := common.ComponentTranslators{ - Receivers: common.NewTranslatorMap(jmx.NewTranslator(jmx.WithIndex(t.index))), + Receivers: common.NewTranslatorMap[component.Config](), Processors: common.NewTranslatorMap( - filterprocessor.NewTranslator(filterprocessor.WithName(common.PipelineNameJmx), filterprocessor.WithIndex(t.index)), - resourceprocessor.NewTranslator(resourceprocessor.WithName(common.PipelineNameJmx)), - cumulativetodeltaprocessor.NewTranslator(common.WithName(common.PipelineNameJmx), cumulativetodeltaprocessor.WithConfigKeys(common.JmxConfigKey)), + filterprocessor.NewTranslator(common.WithName(common.PipelineNameJmx), common.WithIndex(t.Index())), ), Exporters: common.NewTranslatorMap[component.Config](), Extensions: common.NewTranslatorMap[component.Config](), } - if !conf.IsSet(metricsDestinationsKey) || conf.IsSet(common.ConfigKey(metricsDestinationsKey, "cloudwatch")) { - translators.Exporters.Set(awscloudwatch.NewTranslator()) - translators.Extensions.Set(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData})) - } - if conf.IsSet(metricsDestinationsKey) && conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.AMPKey)) { - translators.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey)) - translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey)) - if conf.IsSet(common.MetricsAggregationDimensionsKey) { - translators.Processors.Set(rollupprocessor.NewTranslator()) + if context.CurrentContext().RunInContainer() { + translators.Receivers.Set(otlp.NewTranslator(common.WithName(common.PipelineNameJmx))) + if hasAppendDimensions(conf, t.Index()) { + translators.Processors.Set(resourceprocessor.NewTranslator(common.WithName(common.PipelineNameJmx), common.WithIndex(t.Index()))) } - translators.Extensions.Set(sigv4auth.NewTranslator()) + } else { + translators.Receivers.Set(jmx.NewTranslator(jmx.WithIndex(t.Index()))) + translators.Processors.Set(resourceprocessor.NewTranslator(common.WithName(common.PipelineNameJmx))) } mdt := metricsdecorator.NewTranslator( metricsdecorator.WithName(common.PipelineNameJmx), - metricsdecorator.WithIndex(t.index), + metricsdecorator.WithIndex(t.Index()), metricsdecorator.WithConfigKey(common.JmxConfigKey), ) if mdt.IsSet(conf) { @@ -117,6 +117,30 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators translators.Processors.Set(ec2taggerprocessor.NewTranslator()) } + switch t.destination { + case defaultDestination, common.CloudWatchKey: + if !conf.IsSet(metricsDestinationsKey) || conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.CloudWatchKey)) { + translators.Processors.Set(cumulativetodeltaprocessor.NewTranslator(common.WithName(common.PipelineNameJmx), cumulativetodeltaprocessor.WithConfigKeys(common.JmxConfigKey))) + translators.Exporters.Set(awscloudwatch.NewTranslator()) + translators.Extensions.Set(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData})) + } else { + return nil, fmt.Errorf("pipeline (%s) does not have destination (%s) in configuration", t.name, t.destination) + } + case common.AMPKey: + if conf.IsSet(metricsDestinationsKey) && conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.AMPKey)) { + translators.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey)) + translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey)) + if conf.IsSet(common.MetricsAggregationDimensionsKey) { + translators.Processors.Set(rollupprocessor.NewTranslator()) + } + translators.Extensions.Set(sigv4auth.NewTranslator()) + } else { + return nil, fmt.Errorf("pipeline (%s) does not have destination (%s) in configuration", t.name, t.destination) + } + default: + return nil, fmt.Errorf("pipeline (%s) does not support destination (%s) in configuration", t.name, t.destination) + } + return &translators, nil } @@ -136,3 +160,15 @@ func hasMeasurements(conf *confmap.Conf, index int) bool { } return result } + +func hasAppendDimensions(conf *confmap.Conf, index int) bool { + jmxMap := common.GetIndexedMap(conf, common.JmxConfigKey, index) + if len(jmxMap) == 0 { + return false + } + appendDimensions, ok := jmxMap[common.AppendDimensionsKey].(map[string]any) + if !ok { + return false + } + return len(appendDimensions) > 0 +} diff --git a/translator/translate/otel/pipeline/jmx/translator_test.go b/translator/translate/otel/pipeline/jmx/translator_test.go index c8078a41b9..99e558e3f4 100644 --- a/translator/translate/otel/pipeline/jmx/translator_test.go +++ b/translator/translate/otel/pipeline/jmx/translator_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/confmap" "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" + "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" ) @@ -24,10 +25,12 @@ func TestTranslator(t *testing.T) { extensions []string } testCases := map[string]struct { - input map[string]any - index int - want *want - wantErr error + input map[string]any + index int + destination string + isContainer bool + want *want + wantErr error }{ "WithoutJMX": { input: map[string]any{}, @@ -128,6 +131,57 @@ func TestTranslator(t *testing.T) { extensions: []string{"agenthealth/metrics"}, }, }, + "WithValidJMX/Object/EKS": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "jmx": map[string]any{ + "jvm": map[string]any{ + "measurement": []any{ + "jvm.memory.heap.init", + }, + }, + }, + }, + }, + }, + index: -1, + isContainer: true, + want: &want{ + pipelineID: "metrics/jmx", + receivers: []string{"otlp/jmx"}, + processors: []string{"filter/jmx", "cumulativetodelta/jmx"}, + exporters: []string{"awscloudwatch"}, + extensions: []string{"agenthealth/metrics"}, + }, + }, + "WithValidJMX/Object/EKS/AppendDimensions": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "jmx": map[string]any{ + "jvm": map[string]any{ + "measurement": []any{ + "jvm.memory.heap.init", + }, + }, + "append_dimensions": map[string]any{ + "key": "value", + }, + }, + }, + }, + }, + index: -1, + isContainer: true, + want: &want{ + pipelineID: "metrics/jmx", + receivers: []string{"otlp/jmx"}, + processors: []string{"filter/jmx", "resource/jmx", "cumulativetodelta/jmx"}, + exporters: []string{"awscloudwatch"}, + extensions: []string{"agenthealth/metrics"}, + }, + }, "WithValidJMX/Object/AMP": { input: map[string]any{ "metrics": map[string]any{ @@ -148,11 +202,42 @@ func TestTranslator(t *testing.T) { }, }, }, - index: -1, + index: -1, + destination: "amp", want: &want{ - pipelineID: "metrics/jmx", + pipelineID: "metrics/jmx/amp", receivers: []string{"jmx"}, - processors: []string{"filter/jmx", "resource/jmx", "cumulativetodelta/jmx", "batch/jmx"}, + processors: []string{"filter/jmx", "resource/jmx", "batch/jmx/amp"}, + exporters: []string{"prometheusremotewrite/amp"}, + extensions: []string{"sigv4auth"}, + }, + }, + "WithValidJMX/Object/AMP/EKS": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + }, + "metrics_collected": map[string]any{ + "jmx": map[string]any{ + "jvm": map[string]any{ + "measurement": []any{ + "jvm.memory.heap.init", + }, + }, + }, + }, + }, + }, + index: -1, + destination: "amp", + isContainer: true, + want: &want{ + pipelineID: "metrics/jmx/amp", + receivers: []string{"otlp/jmx"}, + processors: []string{"filter/jmx", "batch/jmx/amp"}, exporters: []string{"prometheusremotewrite/amp"}, extensions: []string{"sigv4auth"}, }, @@ -176,11 +261,12 @@ func TestTranslator(t *testing.T) { }, }, }, - index: -1, + index: -1, + destination: "cloudwatch", want: &want{ - pipelineID: "metrics/jmx", + pipelineID: "metrics/jmx/cloudwatch", receivers: []string{"jmx"}, - processors: []string{"filter/jmx", "resource/jmx", "cumulativetodelta/jmx", "transform/jmx"}, + processors: []string{"filter/jmx", "resource/jmx", "transform/jmx", "cumulativetodelta/jmx"}, exporters: []string{"awscloudwatch"}, extensions: []string{"agenthealth/metrics"}, }, @@ -214,15 +300,53 @@ func TestTranslator(t *testing.T) { want: &want{ pipelineID: "metrics/jmx/0", receivers: []string{"jmx/0"}, - processors: []string{"filter/jmx/0", "resource/jmx", "cumulativetodelta/jmx", "transform/jmx/0", "ec2tagger"}, + processors: []string{"filter/jmx/0", "resource/jmx", "transform/jmx/0", "ec2tagger", "cumulativetodelta/jmx"}, exporters: []string{"awscloudwatch"}, extensions: []string{"agenthealth/metrics"}, }, }, + "WithValidJMX/Array/AMP/EKS": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + }, + "metrics_collected": map[string]any{ + "jmx": []any{ + map[string]any{ + "jvm": map[string]any{ + "measurement": []any{ + "jvm.memory.heap.init", + map[string]any{ + "name": "jvm.classes.loaded", + "rename": "JVM.CLASSES.LOADED", + "unit": "Count", + }, + }, + }, + }, + }, + }, + }, + }, + index: 0, + destination: "amp", + isContainer: true, + want: &want{ + pipelineID: "metrics/jmx/amp/0", + receivers: []string{"otlp/jmx"}, + processors: []string{"filter/jmx/0", "transform/jmx/0", "batch/jmx/amp/0"}, + exporters: []string{"prometheusremotewrite/amp"}, + extensions: []string{"sigv4auth"}, + }, + }, } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - tt := NewTranslator(WithIndex(testCase.index)) + context.CurrentContext().SetRunInContainer(testCase.isContainer) + tt := NewTranslator(common.WithIndex(testCase.index), WithDestination(testCase.destination)) conf := confmap.NewFromStringMap(testCase.input) got, err := tt.Translate(conf) require.Equal(t, testCase.wantErr, err) diff --git a/translator/translate/otel/pipeline/jmx/translators.go b/translator/translate/otel/pipeline/jmx/translators.go index 8e5b310aa1..0fac9bbba4 100644 --- a/translator/translate/otel/pipeline/jmx/translators.go +++ b/translator/translate/otel/pipeline/jmx/translators.go @@ -10,15 +10,34 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline" ) +const ( + // defaultDestination if not defined + defaultDestination = "" +) + func NewTranslators(conf *confmap.Conf) pipeline.TranslatorMap { translators := common.NewTranslatorMap[*common.ComponentTranslators]() + var destinations []string + if conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.CloudWatchKey)) { + destinations = append(destinations, common.CloudWatchKey) + } + if conf.IsSet(common.ConfigKey(metricsDestinationsKey, common.AMPKey)) { + destinations = append(destinations, common.AMPKey) + } + if len(destinations) == 0 { + destinations = append(destinations, defaultDestination) + } switch v := conf.Get(common.JmxConfigKey).(type) { case []any: for index := range v { - translators.Set(NewTranslator(WithIndex(index))) + for _, destination := range destinations { + translators.Set(NewTranslator(common.WithIndex(index), WithDestination(destination))) + } } case map[string]any: - translators.Set(NewTranslator()) + for _, destination := range destinations { + translators.Set(NewTranslator(WithDestination(destination))) + } } return translators } diff --git a/translator/translate/otel/pipeline/jmx/translators_test.go b/translator/translate/otel/pipeline/jmx/translators_test.go index 5c02d6aa49..5f793b9496 100644 --- a/translator/translate/otel/pipeline/jmx/translators_test.go +++ b/translator/translate/otel/pipeline/jmx/translators_test.go @@ -35,6 +35,23 @@ func TestTranslators(t *testing.T) { component.MustNewIDWithName("metrics", "jmx"), }, }, + "WithSingle/Destinations": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + }, + "metrics_collected": map[string]any{ + "jmx": map[string]any{}, + }, + }, + }, + want: []component.ID{ + component.MustNewIDWithName("metrics", "jmx/amp"), + }, + }, "WithMultiple": { input: map[string]any{ "metrics": map[string]any{ @@ -51,6 +68,30 @@ func TestTranslators(t *testing.T) { component.MustNewIDWithName("metrics", "jmx/1"), }, }, + "WithMultiple/Destinations": { + input: map[string]any{ + "metrics": map[string]any{ + "metrics_destinations": map[string]any{ + "cloudwatch": map[string]any{}, + "amp": map[string]any{ + "workspace_id": "ws-12345", + }, + }, + "metrics_collected": map[string]any{ + "jmx": []any{ + map[string]any{}, + map[string]any{}, + }, + }, + }, + }, + want: []component.ID{ + component.MustNewIDWithName("metrics", "jmx/cloudwatch/0"), + component.MustNewIDWithName("metrics", "jmx/amp/0"), + component.MustNewIDWithName("metrics", "jmx/cloudwatch/1"), + component.MustNewIDWithName("metrics", "jmx/amp/1"), + }, + }, } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { diff --git a/translator/translate/otel/processor/filterprocessor/translator.go b/translator/translate/otel/processor/filterprocessor/translator.go index 26d5c5811e..f5bf2d8707 100644 --- a/translator/translate/otel/processor/filterprocessor/translator.go +++ b/translator/translate/otel/processor/filterprocessor/translator.go @@ -20,38 +20,21 @@ const ( ) type translator struct { - name string - index int + common.NameProvider + common.IndexProvider factory processor.Factory } -type Option func(any) - -func WithName(name string) Option { - return func(a any) { - if t, ok := a.(*translator); ok { - t.name = name - } - } -} - -func WithIndex(index int) Option { - return func(a any) { - if t, ok := a.(*translator); ok { - t.index = index - } - } -} - var _ common.Translator[component.Config] = (*translator)(nil) -func NewTranslator(opts ...Option) common.Translator[component.Config] { - t := &translator{index: -1, factory: filterprocessor.NewFactory()} +func NewTranslator(opts ...common.TranslatorOption) common.Translator[component.Config] { + t := &translator{factory: filterprocessor.NewFactory()} + t.SetIndex(-1) for _, opt := range opts { opt(t) } - if t.index != -1 { - t.name += "/" + strconv.Itoa(t.index) + if t.Index() != -1 { + t.SetName(t.Name() + "/" + strconv.Itoa(t.Index())) } return t } @@ -59,7 +42,7 @@ func NewTranslator(opts ...Option) common.Translator[component.Config] { var _ common.Translator[component.Config] = (*translator)(nil) func (t *translator) ID() component.ID { - return component.NewIDWithName(t.factory.Type(), t.name) + return component.NewIDWithName(t.factory.Type(), t.Name()) } // Translate creates a processor config based on the fields in the @@ -71,7 +54,7 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { cfg := t.factory.CreateDefaultConfig().(*filterprocessor.Config) - jmxMap := common.GetIndexedMap(conf, common.JmxConfigKey, t.index) + jmxMap := common.GetIndexedMap(conf, common.JmxConfigKey, t.Index()) var includeMetricNames []string for _, jmxTarget := range common.JmxTargets { diff --git a/translator/translate/otel/processor/filterprocessor/translator_test.go b/translator/translate/otel/processor/filterprocessor/translator_test.go index dd16f11ce4..f069782296 100644 --- a/translator/translate/otel/processor/filterprocessor/translator_test.go +++ b/translator/translate/otel/processor/filterprocessor/translator_test.go @@ -119,7 +119,7 @@ func TestTranslator(t *testing.T) { } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - tt := NewTranslator(WithName("jmx"), WithIndex(testCase.index)) + tt := NewTranslator(common.WithName("jmx"), common.WithIndex(testCase.index)) require.EqualValues(t, testCase.wantID, tt.ID().String()) conf := confmap.NewFromStringMap(testCase.input) got, err := tt.Translate(conf) diff --git a/translator/translate/otel/processor/resourceprocessor/translator.go b/translator/translate/otel/processor/resourceprocessor/translator.go index 9dfb46d8a6..d8b77a5c37 100644 --- a/translator/translate/otel/processor/resourceprocessor/translator.go +++ b/translator/translate/otel/processor/resourceprocessor/translator.go @@ -5,44 +5,42 @@ package resourceprocessor import ( "fmt" + "strconv" + "strings" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/processor" + "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" ) type translator struct { - name string + common.NameProvider + common.IndexProvider factory processor.Factory } -type Option func(any) - -func WithName(name string) Option { - return func(a any) { - if t, ok := a.(*translator); ok { - t.name = name - } - } -} - var _ common.Translator[component.Config] = (*translator)(nil) -func NewTranslator(opts ...Option) common.Translator[component.Config] { +func NewTranslator(opts ...common.TranslatorOption) common.Translator[component.Config] { t := &translator{factory: resourceprocessor.NewFactory()} + t.SetIndex(-1) for _, opt := range opts { opt(t) } + if t.Index() != -1 { + t.SetName(t.Name() + "/" + strconv.Itoa(t.Index())) + } return t } var _ common.Translator[component.Config] = (*translator)(nil) func (t *translator) ID() component.ID { - return component.NewIDWithName(t.factory.Type(), t.name) + return component.NewIDWithName(t.factory.Type(), t.Name()) } // Translate creates a processor config based on the fields in the @@ -53,9 +51,30 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { } cfg := t.factory.CreateDefaultConfig().(*resourceprocessor.Config) - + var attributes []any + if strings.HasPrefix(t.Name(), common.PipelineNameJmx) { + attributes = t.getJMXAttributes(conf) + } + if len(attributes) == 0 { + baseKey := common.JmxConfigKey + if t.Index() != -1 { + baseKey = fmt.Sprintf("%s[%d]", baseKey, t.Index()) + } + return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.ConfigKey(baseKey, common.AppendDimensionsKey)} + } c := confmap.NewFromStringMap(map[string]any{ - "attributes": []any{ + "attributes": attributes, + }) + if err := c.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("unable to unmarshal resource processor: %w", err) + } + + return cfg, nil +} + +func (t *translator) getJMXAttributes(conf *confmap.Conf) []any { + if !context.CurrentContext().RunInContainer() { + return []any{ map[string]any{ "action": "delete", "pattern": "telemetry.sdk.*", @@ -65,12 +84,20 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { "key": "service.name", "value": "unknown_service:java", }, - }, - }) - - if err := c.Unmarshal(&cfg); err != nil { - return nil, fmt.Errorf("unable to unmarshal resource processor: %w", err) + } } - - return cfg, nil + jmxMap := common.GetIndexedMap(conf, common.JmxConfigKey, t.Index()) + appendDimensions, ok := jmxMap[common.AppendDimensionsKey].(map[string]any) + if !ok { + return nil + } + var attributes []any + for key, value := range appendDimensions { + attributes = append(attributes, map[string]any{ + "action": "upsert", + "key": key, + "value": value, + }) + } + return attributes } diff --git a/translator/translate/otel/processor/resourceprocessor/translator_test.go b/translator/translate/otel/processor/resourceprocessor/translator_test.go index 6137269dcd..5a76b2feb8 100644 --- a/translator/translate/otel/processor/resourceprocessor/translator_test.go +++ b/translator/translate/otel/processor/resourceprocessor/translator_test.go @@ -11,18 +11,21 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" + "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" ) func TestTranslator(t *testing.T) { testCases := map[string]struct { - name string - input map[string]any - wantID string - want *confmap.Conf - wantErr error + name string + index int + isContainer bool + input map[string]any + wantID string + want *confmap.Conf + wantErr error }{ - "ConfigWithNoJmxSet": { + "WithoutJMX": { name: common.PipelineNameJmx, input: map[string]any{ "metrics": map[string]any{ @@ -31,18 +34,24 @@ func TestTranslator(t *testing.T) { }, }, }, + index: -1, wantID: "resource/jmx", wantErr: &common.MissingKeyError{ID: component.MustNewIDWithName("resource", "jmx"), JsonKey: common.JmxConfigKey}, }, - "ConfigWithJmx": { + "WithJMX": { name: common.PipelineNameJmx, input: map[string]any{ "metrics": map[string]any{ "metrics_collected": map[string]any{ - "jmx": map[string]any{}, + "jmx": map[string]any{ + "append_dimensions": map[string]any{ + "unused": "by resource processor", + }, + }, }, }, }, + index: -1, wantID: "resource/jmx", want: confmap.NewFromStringMap(map[string]any{ "attributes": []any{ @@ -58,15 +67,115 @@ func TestTranslator(t *testing.T) { }, }), }, + "WithJMX/EKS/NoAppendDimensions": { + name: common.PipelineNameJmx, + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "jmx": map[string]any{}, + }, + }, + }, + index: -1, + isContainer: true, + wantID: "resource/jmx", + wantErr: &common.MissingKeyError{ + ID: component.MustNewIDWithName("resource", "jmx"), + JsonKey: "metrics::metrics_collected::jmx::append_dimensions", + }, + }, + "WithJMX/EKS/AppendDimensions": { + name: common.PipelineNameJmx, + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "jmx": map[string]any{ + "append_dimensions": map[string]any{ + "k1": "v1", + }, + }, + }, + }, + }, + index: -1, + isContainer: true, + wantID: "resource/jmx", + want: confmap.NewFromStringMap(map[string]any{ + "attributes": []any{ + map[string]any{ + "action": "upsert", + "key": "k1", + "value": "v1", + }, + }, + }), + }, + "WithJMX/Array/EKS/InvalidAppendDimensions": { + name: common.PipelineNameJmx, + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "jmx": []any{ + map[string]any{ + "append_dimensions": []any{ + "invalid", + }, + }, + }, + }, + }, + }, + index: 0, + isContainer: true, + wantID: "resource/jmx/0", + wantErr: &common.MissingKeyError{ + ID: component.MustNewIDWithName("resource", "jmx/0"), + JsonKey: "metrics::metrics_collected::jmx[0]::append_dimensions", + }, + }, + "WithJMX/Array/EKS/AppendDimensions": { + name: common.PipelineNameJmx, + input: map[string]any{ + "metrics": map[string]any{ + "metrics_collected": map[string]any{ + "jmx": []any{ + map[string]any{ + "append_dimensions": map[string]any{ + "k1": "v1", + }, + }, + map[string]any{ + "append_dimensions": map[string]any{ + "k2": "v2", + }, + }, + }, + }, + }, + }, + index: 1, + isContainer: true, + wantID: "resource/jmx/1", + want: confmap.NewFromStringMap(map[string]any{ + "attributes": []any{ + map[string]any{ + "action": "upsert", + "key": "k2", + "value": "v2", + }, + }, + }), + }, } factory := resourceprocessor.NewFactory() for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - tt := NewTranslator(WithName(testCase.name)) + context.CurrentContext().SetRunInContainer(testCase.isContainer) + tt := NewTranslator(common.WithName(testCase.name), common.WithIndex(testCase.index)) conf := confmap.NewFromStringMap(testCase.input) got, err := tt.Translate(conf) assert.EqualValues(t, testCase.wantID, tt.ID().String()) - assert.Equal(t, err, testCase.wantErr) + assert.Equal(t, testCase.wantErr, err) if err == nil { assert.NotNil(t, got) gotCfg, ok := got.(*resourceprocessor.Config) diff --git a/translator/translate/otel/receiver/otlp/translator.go b/translator/translate/otel/receiver/otlp/translator.go index 0c2d76b180..ba39e06db3 100644 --- a/translator/translate/otel/receiver/otlp/translator.go +++ b/translator/translate/otel/receiver/otlp/translator.go @@ -22,6 +22,7 @@ const ( defaultHttpEndpoint = "127.0.0.1:4318" defaultAppSignalsGrpcEndpoint = "0.0.0.0:4315" defaultAppSignalsHttpEndpoint = "0.0.0.0:4316" + defaultJMXHttpEndpoint = "0.0.0.0:4314" ) var ( @@ -32,62 +33,51 @@ var ( ) type translator struct { - name string + common.NameProvider + common.IndexProvider dataType component.DataType - index int factory receiver.Factory } -type Option interface { - apply(t *translator) -} - -type optionFunc func(t *translator) - -func (o optionFunc) apply(t *translator) { - o(t) -} - // WithDataType determines where the translator should look to find // the configuration. -func WithDataType(dataType component.DataType) Option { - return optionFunc(func(t *translator) { - t.dataType = dataType - }) -} -func WithIndex(index int) Option { - return optionFunc(func(t *translator) { - t.index = index - }) +func WithDataType(dataType component.DataType) common.TranslatorOption { + return func(target any) { + if t, ok := target.(*translator); ok { + t.dataType = dataType + } + } } var _ common.Translator[component.Config] = (*translator)(nil) -func NewTranslator(opts ...Option) common.Translator[component.Config] { - return NewTranslatorWithName("", opts...) -} - -func NewTranslatorWithName(name string, opts ...Option) common.Translator[component.Config] { - t := &translator{name: name, index: -1, factory: otlpreceiver.NewFactory()} +func NewTranslator(opts ...common.TranslatorOption) common.Translator[component.Config] { + t := &translator{factory: otlpreceiver.NewFactory()} + t.SetIndex(-1) for _, opt := range opts { - opt.apply(t) + opt(t) } - if name == "" && t.dataType.String() != "" { - t.name = t.dataType.String() - if t.index != -1 { - t.name += strconv.Itoa(t.index) + if t.Name() == "" && t.dataType.String() != "" { + t.SetName(t.dataType.String()) + if t.Index() != -1 { + t.SetName(t.Name() + strconv.Itoa(t.Index())) } } return t } func (t *translator) ID() component.ID { - return component.NewIDWithName(t.factory.Type(), t.name) + return component.NewIDWithName(t.factory.Type(), t.Name()) } func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { cfg := t.factory.CreateDefaultConfig().(*otlpreceiver.Config) - + if t.Name() == common.PipelineNameJmx { + cfg.GRPC = nil + cfg.HTTP.Endpoint = defaultJMXHttpEndpoint + return cfg, nil + } + // init default configuration configKey, ok := configKeys[t.dataType] if !ok { return nil, fmt.Errorf("no config key defined for data type: %s", t.dataType) @@ -95,7 +85,7 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { cfg.GRPC.NetAddr.Endpoint = defaultGrpcEndpoint cfg.HTTP.Endpoint = defaultHttpEndpoint - if t.name == common.AppSignals { + if t.Name() == common.AppSignals { appSignalsConfigKeys, ok := common.AppSignalsConfigKeys[t.dataType] if !ok { return nil, fmt.Errorf("no application_signals config key defined for data type: %s", t.dataType) @@ -114,8 +104,8 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { } var otlpKeyMap map[string]interface{} - if otlpSlice := common.GetArray[any](conf, configKey); t.index != -1 && len(otlpSlice) > t.index { - otlpKeyMap = otlpSlice[t.index].(map[string]interface{}) + if otlpSlice := common.GetArray[any](conf, configKey); t.Index() != -1 && len(otlpSlice) > t.Index() { + otlpKeyMap = otlpSlice[t.Index()].(map[string]interface{}) } else { otlpKeyMap = conf.Get(configKey).(map[string]interface{}) } diff --git a/translator/translate/otel/receiver/otlp/translator_test.go b/translator/translate/otel/receiver/otlp/translator_test.go index b9209385ce..c301ffd176 100644 --- a/translator/translate/otel/receiver/otlp/translator_test.go +++ b/translator/translate/otel/receiver/otlp/translator_test.go @@ -175,7 +175,7 @@ func TestMetricsTranslator(t *testing.T) { conf := confmap.NewFromStringMap(testCase.input) tt := NewTranslator(WithDataType(component.DataTypeMetrics)) if testCase.index != -1 { - tt = NewTranslator(WithDataType(component.DataTypeMetrics), WithIndex(testCase.index)) + tt = NewTranslator(WithDataType(component.DataTypeMetrics), common.WithIndex(testCase.index)) } got, err := tt.Translate(conf) assert.Equal(t, testCase.wantErr, err) @@ -192,7 +192,7 @@ func TestMetricsTranslator(t *testing.T) { } func TestTranslateAppSignals(t *testing.T) { - tt := NewTranslatorWithName(common.AppSignals, WithDataType(component.DataTypeTraces)) + tt := NewTranslator(common.WithName(common.AppSignals), WithDataType(component.DataTypeTraces)) testCases := map[string]struct { input map[string]interface{} want *confmap.Conf @@ -314,3 +314,15 @@ func TestTranslateAppSignals(t *testing.T) { }) } } + +func TestTranslateJMX(t *testing.T) { + tt := NewTranslator(common.WithName(common.PipelineNameJmx)) + got, err := tt.Translate(nil) + assert.NoError(t, err) + assert.NotNil(t, got) + gotCfg, ok := got.(*otlpreceiver.Config) + require.True(t, ok) + assert.Nil(t, gotCfg.GRPC) + assert.NotNil(t, gotCfg.HTTP) + assert.Equal(t, "0.0.0.0:4314", gotCfg.HTTP.Endpoint) +}