77from contextlib import contextmanager
88from importlib .metadata import version
99from os import environ
10- from typing import Any , Dict , List , Optional , Tuple
10+ from typing import Any , Optional , Tuple
1111
1212OP_TYPE_READ , OP_TYPE_WRITE = "read" , "write"
1313OP_STATUS_SUCCESS , OP_STATUS_FAILURE = "success" , "err"
@@ -49,7 +49,13 @@ def start(self, labels) -> float:
4949 pass
5050
5151 @abstractmethod
52- def stop (self , labels , start_time : float , attempts : int = 1 , error : Exception = None ) -> None :
52+ def stop (
53+ self ,
54+ labels ,
55+ start_time : float ,
56+ attempts : int = 1 ,
57+ error : Optional [Exception ] = None ,
58+ ) -> None :
5359 pass
5460
5561 @abstractmethod
@@ -77,7 +83,13 @@ class DummyMetrics(BaseMetrics):
7783 def start (self , labels ) -> float :
7884 return 0.0
7985
80- def stop (self , labels , start_time : float , attempts : int = 1 , error : Exception = None ) -> None :
86+ def stop (
87+ self ,
88+ labels ,
89+ start_time : float ,
90+ attempts : int = 1 ,
91+ error : Optional [Exception ] = None ,
92+ ) -> None :
8193 return None
8294
8395 def reset (self ) -> None :
@@ -87,72 +99,31 @@ def push(self) -> None:
8799 return None
88100
89101
90- class _GaugeStore :
91- """
92- Minimal state store for ObservableGauge.
93-
94- OpenTelemetry Python implements "gauge" as an ObservableGauge (async instrument).
95- We keep the latest value per label-set and expose it via a callback.
96- """
97-
98- def __init__ (self , labelnames : Tuple [str , ...]):
99- self ._labelnames = labelnames
100- self ._values : Dict [Tuple [Any , ...], float ] = {}
101-
102- def _key (self , labelvalues : Tuple [Any , ...]) -> Tuple [Any , ...]:
103- if len (labelvalues ) != len (self ._labelnames ):
104- raise ValueError (
105- f"Expected { len (self ._labelnames )} labels { self ._labelnames } , got { len (labelvalues )} : { labelvalues } "
106- )
107- return labelvalues
108-
109- def set (self , labelvalues : Tuple [Any , ...], value : float ) -> None :
110- self ._values [self ._key (labelvalues )] = float (value )
111-
112- def inc (self , labelvalues : Tuple [Any , ...], amount : float = 1.0 ) -> None :
113- k = self ._key (labelvalues )
114- self ._values [k ] = float (self ._values .get (k , 0.0 ) + amount )
115-
116- def dec (self , labelvalues : Tuple [Any , ...], amount : float = 1.0 ) -> None :
117- k = self ._key (labelvalues )
118- self ._values [k ] = float (self ._values .get (k , 0.0 ) - amount )
119-
120- def clear (self ) -> None :
121- self ._values .clear ()
122-
123- def observations (self , Observation ) -> List [Any ]:
124- # Observation type is imported lazily from opentelemetry.metrics
125- out = []
126- for labelvalues , value in self ._values .items ():
127- attrs = dict (zip (self ._labelnames , labelvalues ))
128- out .append (Observation (value , attributes = attrs ))
129- return out
130-
131-
132102class OtlpMetrics (BaseMetrics ):
133103 """
134104 Canonical OpenTelemetry metrics implementation.
135105
136106 This exports metrics via OTLP/HTTP to a Prometheus server with the OTLP receiver enabled:
137107 POST http(s)://<host>:<port>/api/v1/otlp/v1/metrics
138108
139- Naming notes (to preserve existing Prometheus series names as much as possible) :
140- - Counters are created WITHOUT `_total` suffix. Prometheus OTLP translation adds `_total` .
141- - Histogram is created WITHOUT `_seconds` suffix but with unit="s". Prometheus translation
142- typically results in `*_seconds_*` series (depending on Prometheus translation settings) .
109+ Naming notes:
110+ - Metric names follow OpenTelemetry conventions (dot-separated namespaces, e.g. `sdk.operations.total`).
111+ - Prometheus OTLP translation typically converts dots to underscores and may add suffixes such as
112+ `_total` for counters and `_bucket`/`_sum`/`_count` for histograms.
143113 """
144114
145115 def __init__ (self , otlp_metrics_endpoint : str ):
146116 from opentelemetry .exporter .otlp .proto .http .metric_exporter import (
147117 OTLPMetricExporter ,
148118 )
149- from opentelemetry .metrics import Observation
150119 from opentelemetry .sdk .metrics import MeterProvider
151120 from opentelemetry .sdk .metrics .export import PeriodicExportingMetricReader
121+ from opentelemetry .sdk .metrics .view import (
122+ ExplicitBucketHistogramAggregation ,
123+ View ,
124+ )
152125 from opentelemetry .sdk .resources import Resource
153126
154- self ._Observation = Observation
155-
156127 # Resource attributes: Prometheus maps service.name -> job, service.instance.id -> instance.
157128 resource = Resource .create (
158129 {
@@ -169,91 +140,126 @@ def __init__(self, otlp_metrics_endpoint: str):
169140 exporter = OTLPMetricExporter (endpoint = otlp_metrics_endpoint )
170141 reader = PeriodicExportingMetricReader (exporter ) # we force_flush() explicitly in push()
171142
172- self ._provider = MeterProvider (resource = resource , metric_readers = [reader ])
143+ latency_view = View (
144+ instrument_name = "sdk.operation.latency" ,
145+ aggregation = ExplicitBucketHistogramAggregation (
146+ boundaries = (
147+ 0.001 ,
148+ 0.002 ,
149+ 0.003 ,
150+ 0.004 ,
151+ 0.005 ,
152+ 0.0075 ,
153+ 0.010 ,
154+ 0.020 ,
155+ 0.050 ,
156+ 0.100 ,
157+ 0.200 ,
158+ 0.500 ,
159+ 1.000 ,
160+ )
161+ ),
162+ )
163+
164+ self ._provider = MeterProvider (
165+ resource = resource ,
166+ metric_readers = [reader ],
167+ views = [latency_view ],
168+ )
173169 self ._meter = self ._provider .get_meter ("ydb-slo" )
174170
175171 # Instruments (sync)
176172 self ._errors = self ._meter .create_counter (
177- name = "sdk_errors " ,
173+ name = "sdk.errors.total " ,
178174 description = "Total number of errors encountered, categorized by error type." ,
179175 )
180176 self ._operations_total = self ._meter .create_counter (
181- name = "sdk_operations " ,
177+ name = "sdk.operations.total " ,
182178 description = "Total number of operations, categorized by type attempted by the SDK." ,
183179 )
184180 self ._operations_success_total = self ._meter .create_counter (
185- name = "sdk_operations_success " ,
181+ name = "sdk.operations.success.total " ,
186182 description = "Total number of successful operations, categorized by type." ,
187183 )
188184 self ._operations_failure_total = self ._meter .create_counter (
189- name = "sdk_operations_failure " ,
185+ name = "sdk.operations.failure.total " ,
190186 description = "Total number of failed operations, categorized by type." ,
191187 )
192188 self ._latency = self ._meter .create_histogram (
193- name = "sdk_operation_latency " ,
189+ name = "sdk.operation.latency " ,
194190 unit = "s" ,
195191 description = "Latency of operations performed by the SDK in seconds, categorized by type and status." ,
196192 )
197193
198- # Pending operations: sync UpDownCounter (canonical for "in flight" style metrics).
199194 self ._pending = self ._meter .create_up_down_counter (
200- name = "sdk_pending_operations " ,
195+ name = "sdk.pending.operations " ,
201196 description = "Current number of pending operations, categorized by type." ,
202197 )
203198
204- # Retry attempts: ObservableGauge (we keep last value per label set and expose it via callback).
205- self ._retry_attempts_store = _GaugeStore (labelnames = ("operation_type" ,))
206-
207- def retry_attempts_cb (options = None ):
208- return self ._retry_attempts_store .observations (self ._Observation )
209-
210- self ._meter .create_observable_gauge (
211- name = "sdk_retry_attempts" ,
212- description = "Current retry attempts, categorized by operation type." ,
213- callbacks = [retry_attempts_cb ],
199+ self ._retry_attempts_total = self ._meter .create_counter (
200+ name = "sdk.retry.attempts.total" ,
201+ description = "Total number of retry attempts, categorized by ref and operation type." ,
214202 )
215203
216204 self .reset ()
217205
218206 def start (self , labels ) -> float :
219207 labels_t = _normalize_labels (labels )
220- self ._pending .add (1 , attributes = {"operation_type" : labels_t [0 ]})
208+ self ._pending .add (
209+ 1 ,
210+ attributes = {
211+ "ref" : REF ,
212+ "operation_type" : labels_t [0 ],
213+ },
214+ )
221215 return time .time ()
222216
223- def stop (self , labels , start_time : float , attempts : int = 1 , error : Exception = None ) -> None :
217+ def stop (
218+ self ,
219+ labels ,
220+ start_time : float ,
221+ attempts : int = 1 ,
222+ error : Optional [Exception ] = None ,
223+ ) -> None :
224224 labels_t = _normalize_labels (labels )
225225 duration = time .time () - start_time
226226
227+ op_type = labels_t [0 ]
228+ base_attrs = {
229+ "ref" : REF ,
230+ "operation_type" : op_type ,
231+ }
232+
227233 # Update instruments
228- self ._retry_attempts_store . set ( labels_t , float (attempts ))
229- self ._pending .add (- 1 , attributes = { "operation_type" : labels_t [ 0 ]} )
234+ self ._retry_attempts_total . add ( int (attempts ), attributes = base_attrs )
235+ self ._pending .add (- 1 , attributes = base_attrs )
230236
231237 # Counters + latency
232- self ._operations_total .add (1 , attributes = { "operation_type" : labels_t [ 0 ]} )
238+ self ._operations_total .add (1 , attributes = base_attrs )
233239
234240 if error is not None :
235241 self ._errors .add (
236242 1 ,
237243 attributes = {
238- "operation_type" : labels_t [ 0 ] ,
244+ ** base_attrs ,
239245 "error_type" : type (error ).__name__ ,
240246 },
241247 )
242- self ._operations_failure_total .add (1 , attributes = { "operation_type" : labels_t [ 0 ]} )
248+ self ._operations_failure_total .add (1 , attributes = base_attrs )
243249 self ._latency .record (
244250 duration ,
245251 attributes = {
246- "operation_type" : labels_t [ 0 ] ,
252+ ** base_attrs ,
247253 "operation_status" : OP_STATUS_FAILURE ,
248254 },
249255 )
250256 return
251257
252- self ._operations_success_total .add (1 , attributes = { "operation_type" : labels_t [ 0 ]} )
258+ self ._operations_success_total .add (1 , attributes = base_attrs )
253259 self ._latency .record (
254260 duration ,
255261 attributes = {
256- "operation_type" : labels_t [ 0 ] ,
262+ ** base_attrs ,
257263 "operation_status" : OP_STATUS_SUCCESS ,
258264 },
259265 )
@@ -265,8 +271,7 @@ def push(self) -> None:
265271
266272 def reset (self ) -> None :
267273 # OpenTelemetry counters/histograms are cumulative and cannot be reset.
268- # Reset only affects the ObservableGauge-backed state.
269- self ._retry_attempts_store .clear ()
274+ # Reset is implemented as an immediate push/flush.
270275 self .push ()
271276
272277
0 commit comments