From a736fe97fe6c986eeed2a05a51cf51eaf942ed96 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Mon, 22 Jan 2024 23:35:32 -0500 Subject: [PATCH 1/6] feat: init update async instrument --- .../instrument/asynchronous_instrument.rb | 73 +++++++++++++++ .../metrics/instrument/observable_counter.rb | 32 +++++-- .../metrics/instrument/observable_gauge.rb | 32 +++++-- .../instrument/observable_up_down_counter.rb | 32 +++++-- .../sdk/metrics/meter_provider.rb | 15 ++++ .../state/asynchronous_metric_stream.rb | 88 +++++++++++++++++++ 6 files changed, 245 insertions(+), 27 deletions(-) create mode 100644 metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb create mode 100644 metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb new file mode 100644 index 0000000000..4e3db7ded2 --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module Instrument + # {AsynchronousInstrument} contains the common functionality shared across + # the asynchronous instruments SDK instruments. + class AsynchronousInstrument + def initialize(name, unit, description, callback, instrumentation_scope, meter_provider) + @name = name + @unit = unit + @description = description + @instrumentation_scope = instrumentation_scope + @meter_provider = meter_provider + @metric_streams = [] + @callbacks = [] + + register_callback(callback) + meter_provider.register_asynchronous_instrument(self) + end + + # @api private + def register_with_new_metric_store(metric_store, aggregation: default_aggregation) + ms = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new( + @name, + @description, + @unit, + instrument_kind, + @meter_provider, + @instrumentation_scope, + aggregation, + @callbacks + ) + @metric_streams << ms + metric_store.add_metric_stream(ms) + end + + def register_callback(callback) + callbacks = [callback] unless callback.instance_of? Array + + callbacks.each do |cb| + if cb.instance_of? Proc + @callbacks << cb + else + OpenTelemetry.logger.warn "The callback registeration failed for instrument #{@name}" + end + end + @meter_provider.register_callback(self, @callbacks) # meter_provider should register list of callback + end + + def remove_callback(callback) + orig_callback_size = @callbacks.size + callbacks = [callback] unless callback.instance_of? Array + + callbacks.each { |cb| @callback.delete(cb) } + @meter_provider.register_callback(self, @callbacks) if @callback.size != orig_callback_size + end + + private + + def update(timeout, attributes) + @metric_streams.each { |ms| ms.invoke_callback(timeout, attributes) } + end + end + end + end + end +end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb index 4e5d49102c..323139e03e 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb @@ -8,16 +8,30 @@ module OpenTelemetry module SDK module Metrics module Instrument - # {ObservableCounter} is the SDK implementation of {OpenTelemetry::Metrics::ObservableCounter}. - class ObservableCounter < OpenTelemetry::Metrics::Instrument::ObservableCounter - attr_reader :name, :unit, :description + # {ObservableCounter} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}. + class ObservableCounter < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument + # Returns the instrument kind as a Symbol + # + # @return [Symbol] + def instrument_kind + :observable_counter + end + + # Observe the ObservableCounter with fixed timeout duartion. + # + # @param [int] timeout The timeout duration for callback to run, which MUST be a non-negative numeric value. + # @param [Hash{String => String, Numeric, Boolean, Array}] attributes + # Values must be non-nil and (array of) string, boolean or numeric type. + # Array values must not contain nil elements and all elements must be of + # the same basic type (string, numeric, boolean). + def observe(timeout: nil, attributes: {}) + update(timeout, attributes) + end + + private - def initialize(name, unit, description, callback, meter) - @name = name - @unit = unit - @description = description - @callback = callback - @meter = meter + def default_aggregation + OpenTelemetry::SDK::Metrics::Aggregation::Sum.new end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb index 7a122184f4..ade8a2a28b 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb @@ -8,16 +8,30 @@ module OpenTelemetry module SDK module Metrics module Instrument - # {ObservableGauge} is the SDK implementation of {OpenTelemetry::Metrics::ObservableGauge}. - class ObservableGauge < OpenTelemetry::Metrics::Instrument::ObservableGauge - attr_reader :name, :unit, :description + # {ObservableGauge} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}. + class ObservableGauge < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument + # Returns the instrument kind as a Symbol + # + # @return [Symbol] + def instrument_kind + :observable_gauge + end + + # Observe the ObservableCounter with fixed timeout duartion. + # + # @param [int] timeout The timeout duration for callback to run, which MUST be a non-negative numeric value. + # @param [Hash{String => String, Numeric, Boolean, Array}] attributes + # Values must be non-nil and (array of) string, boolean or numeric type. + # Array values must not contain nil elements and all elements must be of + # the same basic type (string, numeric, boolean). + def observe(timeout: nil, attributes: {}) + update(timeout, attributes) + end + + private - def initialize(name, unit, description, callback, meter) - @name = name - @unit = unit - @description = description - @callback = callback - @meter = meter + def default_aggregation + OpenTelemetry::SDK::Metrics::Aggregation::Sum.new end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb index 8eb812c5fb..933f514b49 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb @@ -8,16 +8,30 @@ module OpenTelemetry module SDK module Metrics module Instrument - # {ObservableUpDownCounter} is the SDK implementation of {OpenTelemetry::Metrics::ObservableUpDownCounter}. - class ObservableUpDownCounter < OpenTelemetry::Metrics::Instrument::ObservableUpDownCounter - attr_reader :name, :unit, :description + # {ObservableUpDownCounter} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}. + class ObservableUpDownCounter < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument + # Returns the instrument kind as a Symbol + # + # @return [Symbol] + def instrument_kind + :observable_up_down_counter + end + + # Observe the ObservableCounter with fixed timeout duartion. + # + # @param [int] timeout The timeout duration for callback to run, which MUST be a non-negative numeric value. + # @param [Hash{String => String, Numeric, Boolean, Array}] attributes + # Values must be non-nil and (array of) string, boolean or numeric type. + # Array values must not contain nil elements and all elements must be of + # the same basic type (string, numeric, boolean). + def observe(timeout: nil, attributes: {}) + update(timeout, attributes) + end + + private - def initialize(name, unit, description, callback, meter) - @name = name - @unit = unit - @description = description - @callback = callback - @meter = meter + def default_aggregation + OpenTelemetry::SDK::Metrics::Aggregation::Sum.new end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb index 205ff5db0d..977fea7124 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb @@ -22,6 +22,7 @@ def initialize(resource: OpenTelemetry::SDK::Resources::Resource.create) @stopped = false @metric_readers = [] @resource = resource + @registered_callback = {} end # Returns a {Meter} instance. @@ -125,6 +126,20 @@ def register_synchronous_instrument(instrument) end end + def register_asynchronous_instrument(instrument) + register_synchronous_instrument(instrument) + end + + def register_callback(instrument, callback) + instruments = [instrument] unless instrument.instance_of? Array + instruments.each { |inst| inst.register_callback(callback) } + end + + def remove_callback(instrument, callback) + instruments = [instrument] unless instrument.instance_of? Array + instruments.each { |inst| inst.remove_callback(callback) } + end + # The type of the Instrument(s) (optional). # The name of the Instrument(s). OpenTelemetry SDK authors MAY choose to support wildcard characters, with the question mark (?) matching exactly one character and the asterisk character (*) matching zero or more characters. # The name of the Meter (optional). diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb new file mode 100644 index 0000000000..7daf500a21 --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb @@ -0,0 +1,88 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module State + # @api private + # + # The MetricStream class provides SDK internal functionality that is not a part of the + # public API. + class AsynchronousMetricStream + attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points + + def initialize( + name, + description, + unit, + instrument_kind, + meter_provider, + instrumentation_scope, + aggregation, + callback + ) + @name = name + @description = description + @unit = unit + @instrument_kind = instrument_kind + @meter_provider = meter_provider + @instrumentation_scope = instrumentation_scope + @aggregation = aggregation + @callback = callback + + @mutex = Mutex.new + end + + def collect(start_time, end_time) + invoke_callback(nil, {}) # number of callbacks are finalized, collect the metric data in aggregation + + @mutex.synchronize do + MetricData.new( + @name, + @description, + @unit, + @instrument_kind, + @meter_provider.resource, + @instrumentation_scope, + @aggregation.collect(start_time, end_time), + start_time, + end_time + ) + end + end + + def invoke_callback(timeout, attributes) + timeout_ = timeout || 30 + Timeout.timeout(timeout) do + @callback.each do |cb| + value = cb.call + @mutex.synchronize { + @aggregation.update(value, attributes) + } + end + + end + end + + def to_s + instrument_info = String.new + instrument_info << "name=#{@name}" + instrument_info << " description=#{@description}" if @description + instrument_info << " unit=#{@unit}" if @unit + @data_points.map do |attributes, value| + metric_stream_string = String.new + metric_stream_string << instrument_info + metric_stream_string << " attributes=#{attributes}" if attributes + metric_stream_string << " #{value}" + metric_stream_string + end.join("\n") + end + end + end + end + end +end From 4111c98b198f1bf52051028e0acdfb9ef15594cf Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Fri, 1 Mar 2024 14:31:29 -0500 Subject: [PATCH 2/6] feat: asynchronous inst - using meter for multi instrument callback registeration --- .../opentelemetry/sdk/metrics/instrument.rb | 1 + .../instrument/asynchronous_instrument.rb | 25 ++-- .../metrics/instrument/observable_counter.rb | 1 + .../metrics/instrument/observable_gauge.rb | 1 + .../instrument/observable_up_down_counter.rb | 4 +- .../lib/opentelemetry/sdk/metrics/meter.rb | 24 ++++ .../sdk/metrics/meter_provider.rb | 11 -- .../lib/opentelemetry/sdk/metrics/state.rb | 1 + .../state/asynchronous_metric_stream.rb | 25 ++-- .../instrument/observable_counter_test.rb | 94 +++++++++++++++ .../instrument/observable_gauge_test.rb | 54 +++++++++ .../observable_up_down_counter_test.rb | 54 +++++++++ .../opentelemetry/sdk/metrics/meter_test.rb | 109 ++++++++++++++++-- 13 files changed, 359 insertions(+), 45 deletions(-) create mode 100644 metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb create mode 100644 metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb create mode 100644 metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument.rb index c440634c8c..e7306d3d43 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument.rb @@ -14,6 +14,7 @@ module Instrument end require 'opentelemetry/sdk/metrics/instrument/synchronous_instrument' +require 'opentelemetry/sdk/metrics/instrument/asynchronous_instrument' require 'opentelemetry/sdk/metrics/instrument/counter' require 'opentelemetry/sdk/metrics/instrument/histogram' require 'opentelemetry/sdk/metrics/instrument/observable_counter' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb index 4e3db7ded2..77338506b0 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb @@ -40,29 +40,24 @@ def register_with_new_metric_store(metric_store, aggregation: default_aggregatio metric_store.add_metric_stream(ms) end + # For multiple callbacks in single instrument def register_callback(callback) - callbacks = [callback] unless callback.instance_of? Array - - callbacks.each do |cb| - if cb.instance_of? Proc - @callbacks << cb - else - OpenTelemetry.logger.warn "The callback registeration failed for instrument #{@name}" - end + if callback.instance_of?(Proc) + @callbacks << callback # since @callbacks pass to ms, so no need to add it again + else + OpenTelemetry.logger.warn "Only accept single Proc for registering callback (given callback #{callback.class}" end - @meter_provider.register_callback(self, @callbacks) # meter_provider should register list of callback end - def remove_callback(callback) - orig_callback_size = @callbacks.size - callbacks = [callback] unless callback.instance_of? Array - - callbacks.each { |cb| @callback.delete(cb) } - @meter_provider.register_callback(self, @callbacks) if @callback.size != orig_callback_size + # For callback functions registered after an asynchronous instrument is created, + def unregister(callback) + @callbacks.delete(callback) end private + # update the observed value (after calling observe) + # invoke callback will execute callback and export metric_data that is observed def update(timeout, attributes) @metric_streams.each { |ms| ms.invoke_callback(timeout, attributes) } end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb index 323139e03e..da2323fd53 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb @@ -9,6 +9,7 @@ module SDK module Metrics module Instrument # {ObservableCounter} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}. + # Asynchronous Counter is an asynchronous Instrument which reports non-additive, monotonically increasing value(s) when the instrument is being observed. class ObservableCounter < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument # Returns the instrument kind as a Symbol # diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb index ade8a2a28b..7049255dcb 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_gauge.rb @@ -9,6 +9,7 @@ module SDK module Metrics module Instrument # {ObservableGauge} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}. + # Asynchronous Gauge is an asynchronous Instrument which reports non-additive value(s) (e.g. the room temperature) class ObservableGauge < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument # Returns the instrument kind as a Symbol # diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb index 933f514b49..1db04fc0bf 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb @@ -9,6 +9,7 @@ module SDK module Metrics module Instrument # {ObservableUpDownCounter} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}. + # Asynchronous UpDownCounter is an asynchronous Instrument which reports additive value(s) (e.g. the process heap size) class ObservableUpDownCounter < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument # Returns the instrument kind as a Symbol # @@ -18,6 +19,7 @@ def instrument_kind end # Observe the ObservableCounter with fixed timeout duartion. + # Everytime observe, the value should be sent to backend through exporter # # @param [int] timeout The timeout duration for callback to run, which MUST be a non-negative numeric value. # @param [Hash{String => String, Numeric, Boolean, Array}] attributes @@ -31,7 +33,7 @@ def observe(timeout: nil, attributes: {}) private def default_aggregation - OpenTelemetry::SDK::Metrics::Aggregation::Sum.new + OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality: :cumulative) end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb index faf8e9adb1..4e1d1c16e4 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb @@ -26,6 +26,30 @@ def initialize(name, version, meter_provider) @meter_provider = meter_provider end + + # Multiple-instrument callbacks + # Callbacks registered after the time of instrument creation MAY be associated with multiple instruments. + # Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#multiple-instrument-callbacks + # Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#synchronous-instrument-api + # + # @param [Array] instruments A list (or tuple, etc.) of Instruments used in the callback function. + # @param [Proc] callback A callback function + # + # It is RECOMMENDED that the API authors use one of the following forms for the callback function: + # The list (or tuple, etc.) returned by the callback function contains (Instrument, Measurement) pairs. + # the Observable Result parameter receives an additional (Instrument, Measurement) pairs + def register_callback(instruments, callback) + instruments.each do |instrument| + instrument.register_callback(callback) + end + end + + def unregister(instruments, callback) + instruments.each do |instrument| + instrument.unregister(callback) + end + end + # @api private def add_metric_reader(metric_reader) @instrument_registry.each do |_n, instrument| diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb index 977fea7124..b5ed5d86de 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb @@ -22,7 +22,6 @@ def initialize(resource: OpenTelemetry::SDK::Resources::Resource.create) @stopped = false @metric_readers = [] @resource = resource - @registered_callback = {} end # Returns a {Meter} instance. @@ -130,16 +129,6 @@ def register_asynchronous_instrument(instrument) register_synchronous_instrument(instrument) end - def register_callback(instrument, callback) - instruments = [instrument] unless instrument.instance_of? Array - instruments.each { |inst| inst.register_callback(callback) } - end - - def remove_callback(instrument, callback) - instruments = [instrument] unless instrument.instance_of? Array - instruments.each { |inst| inst.remove_callback(callback) } - end - # The type of the Instrument(s) (optional). # The name of the Instrument(s). OpenTelemetry SDK authors MAY choose to support wildcard characters, with the question mark (?) matching exactly one character and the asterisk character (*) matching zero or more characters. # The name of the Meter (optional). diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state.rb index 0512f5136f..17846c0670 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state.rb @@ -20,3 +20,4 @@ module State require 'opentelemetry/sdk/metrics/state/metric_data' require 'opentelemetry/sdk/metrics/state/metric_store' require 'opentelemetry/sdk/metrics/state/metric_stream' +require 'opentelemetry/sdk/metrics/state/asynchronous_metric_stream' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb index 7daf500a21..8247350abb 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb @@ -33,12 +33,16 @@ def initialize( @instrumentation_scope = instrumentation_scope @aggregation = aggregation @callback = callback + @start_time = now_in_nano @mutex = Mutex.new end + # When collect, if there are asynchronous SDK Instruments involved, their callback functions will be triggered. + # Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#collect + # invoke_callback will update the data_points in aggregation def collect(start_time, end_time) - invoke_callback(nil, {}) # number of callbacks are finalized, collect the metric data in aggregation + invoke_callback(nil, {}) @mutex.synchronize do MetricData.new( @@ -49,6 +53,7 @@ def collect(start_time, end_time) @meter_provider.resource, @instrumentation_scope, @aggregation.collect(start_time, end_time), + @aggregation.aggregation_temporality, start_time, end_time ) @@ -56,15 +61,13 @@ def collect(start_time, end_time) end def invoke_callback(timeout, attributes) - timeout_ = timeout || 30 - Timeout.timeout(timeout) do - @callback.each do |cb| - value = cb.call - @mutex.synchronize { - @aggregation.update(value, attributes) - } + @mutex.synchronize do + Timeout.timeout(timeout || 30) do + @callback.each do |cb| + value = cb.call + @aggregation.update(value, attributes) + end end - end end @@ -81,6 +84,10 @@ def to_s metric_stream_string end.join("\n") end + + def now_in_nano + (Time.now.to_r * 1_000_000_000).to_i + end end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb new file mode 100644 index 0000000000..3f5370408c --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Metrics::Instrument::ObservableCounter do + let(:metric_exporter) { OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new } + let(:meter) { OpenTelemetry.meter_provider.meter('test') } + + before do + reset_metrics_sdk + OpenTelemetry::SDK.configure + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + end + + it 'counts without observe' do + callback = Proc.new { 10 } + meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + # puts "last_snapshot.inspect: #{last_snapshot.inspect}" + _(last_snapshot[0].name).must_equal('counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + end + + it 'counts with observe' do + callback = Proc.new { 10 } + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) + observable_counter.observe(timeout: 10, attributes: {'foo' => 'bar'}) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal('foo' => 'bar') + + _(last_snapshot[0].data_points[1].value).must_equal(10) + _(last_snapshot[0].data_points[1].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + end + + it 'counts with observe after initialization' do + callback_1 = Proc.new { 10 } + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback_1) + _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 1 + + callback_2 = Proc.new { 20 } + observable_counter.register_callback(callback_2) + _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 2 + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(30) # two callback aggregate value to 30 + _(last_snapshot[0].data_points[0].attributes).must_equal({}) + end + + it 'remove the callback after initialization result no metrics data' do + callback_1 = Proc.new { 10 } + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback_1) + _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 1 + + observable_counter.unregister(callback_1) + _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 0 + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points.size).must_equal 0 + end + +end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb new file mode 100644 index 0000000000..d18b5661db --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Metrics::Instrument::ObservableGauge do + let(:metric_exporter) { OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new } + let(:meter) { OpenTelemetry.meter_provider.meter('test') } + + before do + reset_metrics_sdk + OpenTelemetry::SDK.configure + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + end + + it 'counts without observe' do + callback = Proc.new { 10 } + meter.create_observable_gauge('gauge', unit: 'smidgen', description: 'a small amount of something', callback: callback) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('gauge') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + end + + it 'counts with observe' do + callback = Proc.new { 10 } + observable_gauge = meter.create_observable_gauge('gauge', unit: 'smidgen', description: 'a small amount of something', callback: callback) + observable_gauge.observe(timeout: 10, attributes: {'foo' => 'bar'}) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('gauge') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal('foo' => 'bar') + + _(last_snapshot[0].data_points[1].value).must_equal(10) + _(last_snapshot[0].data_points[1].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + end +end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb new file mode 100644 index 0000000000..edc1bf1ea1 --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Metrics::Instrument::ObservableUpDownCounter do + let(:metric_exporter) { OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new } + let(:meter) { OpenTelemetry.meter_provider.meter('test') } + + before do + reset_metrics_sdk + OpenTelemetry::SDK.configure + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + end + + it 'counts without observe' do + callback = Proc.new { 10 } + meter.create_observable_up_down_counter('updown_counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('updown_counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:cumulative) + end + + it 'counts with observe' do + callback = Proc.new { 10 } + up_down_counter = meter.create_observable_up_down_counter('updown_counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) + up_down_counter.observe(timeout: 10, attributes: {'foo' => 'bar'}) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('updown_counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal('foo' => 'bar') + + _(last_snapshot[0].data_points[1].value).must_equal(10) + _(last_snapshot[0].data_points[1].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:cumulative) + end +end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb index 6a6cba2fdc..c4d547b286 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb @@ -34,28 +34,119 @@ describe '#create_observable_counter' do it 'creates a observable_counter instrument' do - # TODO: Implement observable instruments - skip - instrument = meter.create_observable_counter('a_observable_counter', unit: 'minutes', description: 'useful description', callback: nil) + instrument = meter.create_observable_counter('a_observable_counter', unit: 'minutes', description: 'useful description', callback: proc {10}) _(instrument).must_be_instance_of OpenTelemetry::SDK::Metrics::Instrument::ObservableCounter end end describe '#create_observable_gauge' do it 'creates a observable_gauge instrument' do - # TODO: Implement observable instruments - skip - instrument = meter.create_observable_gauge('a_observable_gauge', unit: 'minutes', description: 'useful description', callback: nil) + instrument = meter.create_observable_gauge('a_observable_gauge', unit: 'minutes', description: 'useful description', callback: proc {10}) _(instrument).must_be_instance_of OpenTelemetry::SDK::Metrics::Instrument::ObservableGauge end end describe '#create_observable_up_down_counter' do it 'creates a observable_up_down_counter instrument' do - # TODO: Implement observable instruments - skip - instrument = meter.create_observable_up_down_counter('a_observable_up_down_counter', unit: 'minutes', description: 'useful description', callback: nil) + instrument = meter.create_observable_up_down_counter('a_observable_up_down_counter', unit: 'minutes', description: 'useful description', callback: proc {10}) _(instrument).must_be_instance_of OpenTelemetry::SDK::Metrics::Instrument::ObservableUpDownCounter end end + + describe 'callback' do + describe '#register_callback' do + let(:metric_exporter) { OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new } + let(:meter) { OpenTelemetry.meter_provider.meter('test') } + + before do + reset_metrics_sdk + OpenTelemetry::SDK.configure + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + end + + it 'create callback with multi asychronous instrument' do + callback_1 = Proc.new { 10 } + counter_1 = meter.create_observable_counter('counter_1', unit: 'smidgen', description: '', callback: callback_1) + counter_2 = meter.create_observable_counter('counter_2', unit: 'smidgen', description: '', callback: callback_1) + + callback_2 = Proc.new { 20 } + meter.register_callback([counter_1, counter_2], callback_2) + + _(counter_1.instance_variable_get(:@callbacks).size).must_equal 2 + _(counter_2.instance_variable_get(:@callbacks).size).must_equal 2 + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('counter_1') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(30) + _(last_snapshot[0].data_points[0].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + + _(last_snapshot[1].name).must_equal('counter_2') + _(last_snapshot[1].unit).must_equal('smidgen') + _(last_snapshot[1].description).must_equal('') + _(last_snapshot[1].instrumentation_scope.name).must_equal('test') + _(last_snapshot[1].data_points[0].value).must_equal(30) + _(last_snapshot[1].data_points[0].attributes).must_equal({}) + _(last_snapshot[1].aggregation_temporality).must_equal(:delta) + end + + it 'remove callback with multi asychronous instrument' do + callback_1 = Proc.new { 10 } + counter_1 = meter.create_observable_counter('counter_1', unit: 'smidgen', description: '', callback: callback_1) + counter_2 = meter.create_observable_counter('counter_2', unit: 'smidgen', description: '', callback: callback_1) + + callback_2 = Proc.new { 20 } + meter.register_callback([counter_1, counter_2], callback_2) + + _(counter_1.instance_variable_get(:@callbacks).size).must_equal 2 + _(counter_2.instance_variable_get(:@callbacks).size).must_equal 2 + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('counter_1') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(30) + _(last_snapshot[0].data_points[0].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + + _(last_snapshot[1].name).must_equal('counter_2') + _(last_snapshot[1].unit).must_equal('smidgen') + _(last_snapshot[1].description).must_equal('') + _(last_snapshot[1].instrumentation_scope.name).must_equal('test') + _(last_snapshot[1].data_points[0].value).must_equal(30) + _(last_snapshot[1].data_points[0].attributes).must_equal({}) + _(last_snapshot[1].aggregation_temporality).must_equal(:delta) + + # unregister the callback_2 from instruments counter_1 and counter_2 + meter.unregister([counter_1, counter_2], callback_2) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('counter_1') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal({}) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + + _(last_snapshot[1].name).must_equal('counter_2') + _(last_snapshot[1].unit).must_equal('smidgen') + _(last_snapshot[1].description).must_equal('') + _(last_snapshot[1].instrumentation_scope.name).must_equal('test') + _(last_snapshot[1].data_points[0].value).must_equal(10) + _(last_snapshot[1].data_points[0].attributes).must_equal({}) + _(last_snapshot[1].aggregation_temporality).must_equal(:delta) + end + end + end end From 71eda0054a3397265a2347e1f5f98782c39d6657 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Mon, 4 Mar 2024 11:31:49 -0500 Subject: [PATCH 3/6] feat: asyc instrument - lint and allow setting default attributes for instrument --- .../instrument/asynchronous_instrument.rb | 16 +++++- .../lib/opentelemetry/sdk/metrics/meter.rb | 1 - .../state/asynchronous_metric_stream.rb | 8 ++- .../instrument/observable_counter_test.rb | 40 +++++++++++---- .../instrument/observable_gauge_test.rb | 6 +-- .../observable_up_down_counter_test.rb | 6 +-- .../opentelemetry/sdk/metrics/meter_test.rb | 50 +++++++++---------- 7 files changed, 80 insertions(+), 47 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb index 77338506b0..f1419c7f71 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb @@ -19,6 +19,8 @@ def initialize(name, unit, description, callback, instrumentation_scope, meter_p @meter_provider = meter_provider @metric_streams = [] @callbacks = [] + @timeout = nil + @attributes = {} register_callback(callback) meter_provider.register_asynchronous_instrument(self) @@ -34,7 +36,9 @@ def register_with_new_metric_store(metric_store, aggregation: default_aggregatio @meter_provider, @instrumentation_scope, aggregation, - @callbacks + @callbacks, + @timeout, + @attributes ) @metric_streams << ms metric_store.add_metric_stream(ms) @@ -43,7 +47,7 @@ def register_with_new_metric_store(metric_store, aggregation: default_aggregatio # For multiple callbacks in single instrument def register_callback(callback) if callback.instance_of?(Proc) - @callbacks << callback # since @callbacks pass to ms, so no need to add it again + @callbacks << callback else OpenTelemetry.logger.warn "Only accept single Proc for registering callback (given callback #{callback.class}" end @@ -54,6 +58,14 @@ def unregister(callback) @callbacks.delete(callback) end + def timeout(timeout) + @timeout = timeout + end + + def add_attributes(attributes) + @attributes.merge!(attributes) if attributes.instance_of?(Hash) + end + private # update the observed value (after calling observe) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb index 4e1d1c16e4..751206011e 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb @@ -26,7 +26,6 @@ def initialize(name, version, meter_provider) @meter_provider = meter_provider end - # Multiple-instrument callbacks # Callbacks registered after the time of instrument creation MAY be associated with multiple instruments. # Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#multiple-instrument-callbacks diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb index 8247350abb..f6700dbd4d 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb @@ -23,7 +23,9 @@ def initialize( meter_provider, instrumentation_scope, aggregation, - callback + callback, + timeout, + attributes ) @name = name @description = description @@ -34,6 +36,8 @@ def initialize( @aggregation = aggregation @callback = callback @start_time = now_in_nano + @timeout = timeout + @attributes = attributes @mutex = Mutex.new end @@ -42,7 +46,7 @@ def initialize( # Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#collect # invoke_callback will update the data_points in aggregation def collect(start_time, end_time) - invoke_callback(nil, {}) + invoke_callback(@timeout, @attributes) @mutex.synchronize do MetricData.new( diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb index 3f5370408c..ea66d42bbc 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb @@ -17,7 +17,7 @@ end it 'counts without observe' do - callback = Proc.new { 10 } + callback = proc { 10 } meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) metric_exporter.pull @@ -33,10 +33,29 @@ _(last_snapshot[0].aggregation_temporality).must_equal(:delta) end + it 'counts with set timeout and attributes' do + callback = proc { 10 } + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) + observable_counter.add_attributes({ 'foo' => 'bar' }) + observable_counter.timeout(10) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + # puts "last_snapshot.inspect: #{last_snapshot.inspect}" + _(last_snapshot[0].name).must_equal('counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(10) + _(last_snapshot[0].data_points[0].attributes).must_equal({ 'foo' => 'bar' }) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) + end + it 'counts with observe' do - callback = Proc.new { 10 } + callback = proc { 10 } observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) - observable_counter.observe(timeout: 10, attributes: {'foo' => 'bar'}) + observable_counter.observe(timeout: 10, attributes: { 'foo' => 'bar' }) metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots.last @@ -54,12 +73,12 @@ end it 'counts with observe after initialization' do - callback_1 = Proc.new { 10 } - observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback_1) + callback_first = proc { 10 } + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback_first) _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 1 - callback_2 = Proc.new { 20 } - observable_counter.register_callback(callback_2) + callback_second = proc { 20 } + observable_counter.register_callback(callback_second) _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 2 metric_exporter.pull @@ -74,11 +93,11 @@ end it 'remove the callback after initialization result no metrics data' do - callback_1 = Proc.new { 10 } - observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback_1) + callback_first = proc { 10 } + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback_first) _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 1 - observable_counter.unregister(callback_1) + observable_counter.unregister(callback_first) _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 0 metric_exporter.pull @@ -90,5 +109,4 @@ _(last_snapshot[0].instrumentation_scope.name).must_equal('test') _(last_snapshot[0].data_points.size).must_equal 0 end - end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb index d18b5661db..70a3a1735b 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_gauge_test.rb @@ -17,7 +17,7 @@ end it 'counts without observe' do - callback = Proc.new { 10 } + callback = proc { 10 } meter.create_observable_gauge('gauge', unit: 'smidgen', description: 'a small amount of something', callback: callback) metric_exporter.pull @@ -33,9 +33,9 @@ end it 'counts with observe' do - callback = Proc.new { 10 } + callback = proc { 10 } observable_gauge = meter.create_observable_gauge('gauge', unit: 'smidgen', description: 'a small amount of something', callback: callback) - observable_gauge.observe(timeout: 10, attributes: {'foo' => 'bar'}) + observable_gauge.observe(timeout: 10, attributes: { 'foo' => 'bar' }) metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots.last diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb index edc1bf1ea1..b5a6753130 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb @@ -17,7 +17,7 @@ end it 'counts without observe' do - callback = Proc.new { 10 } + callback = proc { 10 } meter.create_observable_up_down_counter('updown_counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) metric_exporter.pull @@ -33,9 +33,9 @@ end it 'counts with observe' do - callback = Proc.new { 10 } + callback = proc { 10 } up_down_counter = meter.create_observable_up_down_counter('updown_counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) - up_down_counter.observe(timeout: 10, attributes: {'foo' => 'bar'}) + up_down_counter.observe(timeout: 10, attributes: { 'foo' => 'bar' }) metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots.last diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb index c4d547b286..72b63ba34f 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb @@ -34,21 +34,21 @@ describe '#create_observable_counter' do it 'creates a observable_counter instrument' do - instrument = meter.create_observable_counter('a_observable_counter', unit: 'minutes', description: 'useful description', callback: proc {10}) + instrument = meter.create_observable_counter('a_observable_counter', unit: 'minutes', description: 'useful description', callback: proc { 10 }) _(instrument).must_be_instance_of OpenTelemetry::SDK::Metrics::Instrument::ObservableCounter end end describe '#create_observable_gauge' do it 'creates a observable_gauge instrument' do - instrument = meter.create_observable_gauge('a_observable_gauge', unit: 'minutes', description: 'useful description', callback: proc {10}) + instrument = meter.create_observable_gauge('a_observable_gauge', unit: 'minutes', description: 'useful description', callback: proc { 10 }) _(instrument).must_be_instance_of OpenTelemetry::SDK::Metrics::Instrument::ObservableGauge end end describe '#create_observable_up_down_counter' do it 'creates a observable_up_down_counter instrument' do - instrument = meter.create_observable_up_down_counter('a_observable_up_down_counter', unit: 'minutes', description: 'useful description', callback: proc {10}) + instrument = meter.create_observable_up_down_counter('a_observable_up_down_counter', unit: 'minutes', description: 'useful description', callback: proc { 10 }) _(instrument).must_be_instance_of OpenTelemetry::SDK::Metrics::Instrument::ObservableUpDownCounter end end @@ -65,20 +65,20 @@ end it 'create callback with multi asychronous instrument' do - callback_1 = Proc.new { 10 } - counter_1 = meter.create_observable_counter('counter_1', unit: 'smidgen', description: '', callback: callback_1) - counter_2 = meter.create_observable_counter('counter_2', unit: 'smidgen', description: '', callback: callback_1) + callback_first = proc { 10 } + counter_first = meter.create_observable_counter('counter_first', unit: 'smidgen', description: '', callback: callback_first) + counter_second = meter.create_observable_counter('counter_second', unit: 'smidgen', description: '', callback: callback_first) - callback_2 = Proc.new { 20 } - meter.register_callback([counter_1, counter_2], callback_2) + callback_second = proc { 20 } + meter.register_callback([counter_first, counter_second], callback_second) - _(counter_1.instance_variable_get(:@callbacks).size).must_equal 2 - _(counter_2.instance_variable_get(:@callbacks).size).must_equal 2 + _(counter_first.instance_variable_get(:@callbacks).size).must_equal 2 + _(counter_second.instance_variable_get(:@callbacks).size).must_equal 2 metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots.last - _(last_snapshot[0].name).must_equal('counter_1') + _(last_snapshot[0].name).must_equal('counter_first') _(last_snapshot[0].unit).must_equal('smidgen') _(last_snapshot[0].description).must_equal('') _(last_snapshot[0].instrumentation_scope.name).must_equal('test') @@ -86,7 +86,7 @@ _(last_snapshot[0].data_points[0].attributes).must_equal({}) _(last_snapshot[0].aggregation_temporality).must_equal(:delta) - _(last_snapshot[1].name).must_equal('counter_2') + _(last_snapshot[1].name).must_equal('counter_second') _(last_snapshot[1].unit).must_equal('smidgen') _(last_snapshot[1].description).must_equal('') _(last_snapshot[1].instrumentation_scope.name).must_equal('test') @@ -96,20 +96,20 @@ end it 'remove callback with multi asychronous instrument' do - callback_1 = Proc.new { 10 } - counter_1 = meter.create_observable_counter('counter_1', unit: 'smidgen', description: '', callback: callback_1) - counter_2 = meter.create_observable_counter('counter_2', unit: 'smidgen', description: '', callback: callback_1) + callback_first = proc { 10 } + counter_first = meter.create_observable_counter('counter_first', unit: 'smidgen', description: '', callback: callback_first) + counter_second = meter.create_observable_counter('counter_second', unit: 'smidgen', description: '', callback: callback_first) - callback_2 = Proc.new { 20 } - meter.register_callback([counter_1, counter_2], callback_2) + callback_second = proc { 20 } + meter.register_callback([counter_first, counter_second], callback_second) - _(counter_1.instance_variable_get(:@callbacks).size).must_equal 2 - _(counter_2.instance_variable_get(:@callbacks).size).must_equal 2 + _(counter_first.instance_variable_get(:@callbacks).size).must_equal 2 + _(counter_second.instance_variable_get(:@callbacks).size).must_equal 2 metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots.last - _(last_snapshot[0].name).must_equal('counter_1') + _(last_snapshot[0].name).must_equal('counter_first') _(last_snapshot[0].unit).must_equal('smidgen') _(last_snapshot[0].description).must_equal('') _(last_snapshot[0].instrumentation_scope.name).must_equal('test') @@ -117,7 +117,7 @@ _(last_snapshot[0].data_points[0].attributes).must_equal({}) _(last_snapshot[0].aggregation_temporality).must_equal(:delta) - _(last_snapshot[1].name).must_equal('counter_2') + _(last_snapshot[1].name).must_equal('counter_second') _(last_snapshot[1].unit).must_equal('smidgen') _(last_snapshot[1].description).must_equal('') _(last_snapshot[1].instrumentation_scope.name).must_equal('test') @@ -125,13 +125,13 @@ _(last_snapshot[1].data_points[0].attributes).must_equal({}) _(last_snapshot[1].aggregation_temporality).must_equal(:delta) - # unregister the callback_2 from instruments counter_1 and counter_2 - meter.unregister([counter_1, counter_2], callback_2) + # unregister the callback_second from instruments counter_first and counter_second + meter.unregister([counter_first, counter_second], callback_second) metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots.last - _(last_snapshot[0].name).must_equal('counter_1') + _(last_snapshot[0].name).must_equal('counter_first') _(last_snapshot[0].unit).must_equal('smidgen') _(last_snapshot[0].description).must_equal('') _(last_snapshot[0].instrumentation_scope.name).must_equal('test') @@ -139,7 +139,7 @@ _(last_snapshot[0].data_points[0].attributes).must_equal({}) _(last_snapshot[0].aggregation_temporality).must_equal(:delta) - _(last_snapshot[1].name).must_equal('counter_2') + _(last_snapshot[1].name).must_equal('counter_second') _(last_snapshot[1].unit).must_equal('smidgen') _(last_snapshot[1].description).must_equal('') _(last_snapshot[1].instrumentation_scope.name).must_equal('test') From 0f62d1545e9441ebfec1fe8b581cf66f8e263149 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Mon, 4 Mar 2024 20:01:03 -0500 Subject: [PATCH 4/6] feat: asyc instrument - separate callback from instrument init and after init --- .../instrument/asynchronous_instrument.rb | 25 ++++++++++-- .../lib/opentelemetry/sdk/metrics/meter.rb | 1 + .../instrument/observable_counter_test.rb | 39 ++++++++++++++++++- 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb index f1419c7f71..a96bf4cb1f 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb @@ -22,7 +22,7 @@ def initialize(name, unit, description, callback, instrumentation_scope, meter_p @timeout = nil @attributes = {} - register_callback(callback) + init_callback(callback) meter_provider.register_asynchronous_instrument(self) end @@ -44,16 +44,29 @@ def register_with_new_metric_store(metric_store, aggregation: default_aggregatio metric_store.add_metric_stream(ms) end - # For multiple callbacks in single instrument + # The API MUST support creation of asynchronous instruments by passing zero or more callback functions + # to be permanently registered to the newly created instrument. + def init_callback(callback) + if callback.instance_of?(Proc) + @callbacks << callback + elsif callback.instance_of?(Array) + callback.each { |cb| @callbacks << cb if cb.instance_of?(Proc) } + else + OpenTelemetry.logger.warn "Only accept single Proc or Array of Proc for initialization with callback (given callback #{callback.class}" + end + end + + # Where the API supports registration of callback functions after asynchronous instrumentation creation, + # the user MUST be able to undo registration of the specific callback after its registration by some means. def register_callback(callback) if callback.instance_of?(Proc) @callbacks << callback + callback else OpenTelemetry.logger.warn "Only accept single Proc for registering callback (given callback #{callback.class}" end end - # For callback functions registered after an asynchronous instrument is created, def unregister(callback) @callbacks.delete(callback) end @@ -78,3 +91,9 @@ def update(timeout, attributes) end end end + +def create_callback(callbacks) + return callbacks if callbacks.instance_of?(Proc) + + puts 'a' +end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb index 751206011e..0ffa1b0220 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb @@ -37,6 +37,7 @@ def initialize(name, version, meter_provider) # It is RECOMMENDED that the API authors use one of the following forms for the callback function: # The list (or tuple, etc.) returned by the callback function contains (Instrument, Measurement) pairs. # the Observable Result parameter receives an additional (Instrument, Measurement) pairs + # Here it chose the second form def register_callback(instruments, callback) instruments.each do |instrument| instrument.register_callback(callback) diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb index ea66d42bbc..4b432ca83f 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb @@ -55,7 +55,7 @@ it 'counts with observe' do callback = proc { 10 } observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback) - observable_counter.observe(timeout: 10, attributes: { 'foo' => 'bar' }) + observable_counter.observe(timeout: 10, attributes: { 'foo' => 'bar' }) # observe will make another data points modification metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots.last @@ -109,4 +109,41 @@ _(last_snapshot[0].instrumentation_scope.name).must_equal('test') _(last_snapshot[0].data_points.size).must_equal 0 end + + it 'creation of instruments with more than one callabck' do + callback_first = proc { 10 } + callback_second = proc { 20 } + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: [callback_first, callback_second]) + _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 2 + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + + _(last_snapshot[0].name).must_equal('counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + _(last_snapshot[0].data_points[0].value).must_equal(30) + end + + it 'creation of instruments with more than one invalid callabck should result no callback' do + callback_first = 'callback_first' + callback_second = 'callback_second' + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: [callback_first, callback_second]) + _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 0 + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + _(last_snapshot[0].data_points.size).must_equal 0 + end + + it 'creation of instruments with invalid argument result no callback' do + callback_first = 'callback_first' + observable_counter = meter.create_observable_counter('counter', unit: 'smidgen', description: 'a small amount of something', callback: callback_first) + _(observable_counter.instance_variable_get(:@callbacks).size).must_equal 0 + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots.last + _(last_snapshot[0].data_points.size).must_equal 0 + end end From 5fdd84aaf1ba7d403f5ffb0e8cdabd6e7a1e5613 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Wed, 29 May 2024 12:18:32 -0400 Subject: [PATCH 5/6] feat: asyc - revision --- .../sdk/metrics/instrument/asynchronous_instrument.rb | 6 ------ .../sdk/metrics/instrument/observable_counter.rb | 2 +- .../sdk/metrics/instrument/observable_up_down_counter.rb | 2 +- metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb | 5 +---- .../metrics/instrument/observable_up_down_counter_test.rb | 4 ++-- 5 files changed, 5 insertions(+), 14 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb index a96bf4cb1f..4a522ba2ad 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/asynchronous_instrument.rb @@ -91,9 +91,3 @@ def update(timeout, attributes) end end end - -def create_callback(callbacks) - return callbacks if callbacks.instance_of?(Proc) - - puts 'a' -end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb index da2323fd53..3ca7382b88 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_counter.rb @@ -9,7 +9,7 @@ module SDK module Metrics module Instrument # {ObservableCounter} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}. - # Asynchronous Counter is an asynchronous Instrument which reports non-additive, monotonically increasing value(s) when the instrument is being observed. + # Asynchronous Counter is an asynchronous Instrument which reports monotonically increasing value(s) when the instrument is being observed. class ObservableCounter < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument # Returns the instrument kind as a Symbol # diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb index 1db04fc0bf..57e3dfdbcf 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/instrument/observable_up_down_counter.rb @@ -33,7 +33,7 @@ def observe(timeout: nil, attributes: {}) private def default_aggregation - OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality: :cumulative) + OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality: :delta) end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb index b5ed5d86de..738669b2ba 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb @@ -124,10 +124,7 @@ def register_synchronous_instrument(instrument) end end end - - def register_asynchronous_instrument(instrument) - register_synchronous_instrument(instrument) - end + alias_method :register_asynchronous_instrument, :register_synchronous_instrument # The type of the Instrument(s) (optional). # The name of the Instrument(s). OpenTelemetry SDK authors MAY choose to support wildcard characters, with the question mark (?) matching exactly one character and the asterisk character (*) matching zero or more characters. diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb index b5a6753130..e194709c63 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_up_down_counter_test.rb @@ -29,7 +29,7 @@ _(last_snapshot[0].instrumentation_scope.name).must_equal('test') _(last_snapshot[0].data_points[0].value).must_equal(10) _(last_snapshot[0].data_points[0].attributes).must_equal({}) - _(last_snapshot[0].aggregation_temporality).must_equal(:cumulative) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) end it 'counts with observe' do @@ -49,6 +49,6 @@ _(last_snapshot[0].data_points[1].value).must_equal(10) _(last_snapshot[0].data_points[1].attributes).must_equal({}) - _(last_snapshot[0].aggregation_temporality).must_equal(:cumulative) + _(last_snapshot[0].aggregation_temporality).must_equal(:delta) end end From 0a924459865589d8d50490cc37537131edb74bfc Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Thu, 24 Oct 2024 15:46:28 -0400 Subject: [PATCH 6/6] lint --- metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb | 2 +- .../sdk/metrics/instrument/observable_counter_test.rb | 2 +- metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb index 7e4a8f0553..243fb64e5c 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb @@ -126,7 +126,7 @@ def register_synchronous_instrument(instrument) end end end - alias_method :register_asynchronous_instrument, :register_synchronous_instrument + alias register_asynchronous_instrument register_synchronous_instrument # A View provides SDK users with the flexibility to customize the metrics that are output by the SDK. # diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb index a76f2ea30b..ba1972084f 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/observable_counter_test.rb @@ -88,7 +88,7 @@ _(last_snapshot[0].unit).must_equal('smidgen') _(last_snapshot[0].description).must_equal('a small amount of something') _(last_snapshot[0].instrumentation_scope.name).must_equal('test') - _(last_snapshot[0].data_points[0].value).must_equal(30) # two callback aggregate value to 30 + _(last_snapshot[0].data_points[0].value).must_equal(30) # two callback aggregate value to 30 _(last_snapshot[0].data_points[0].attributes).must_equal({}) end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb index 95c3f19fa6..b9a4025add 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_test.rb @@ -128,7 +128,6 @@ # unregister the callback_second from instruments counter_first and counter_second meter.unregister([counter_first, counter_second], callback_second) - metric_exporter.reset metric_exporter.pull last_snapshot = metric_exporter.metric_snapshots