1
- use futures :: future ;
1
+ use chrono :: Utc ;
2
2
use kube:: { api:: ListParams , Api , Client , Resource , ResourceExt } ;
3
3
use prometheus:: { opts, IntCounterVec , Registry } ;
4
- use std:: { collections:: HashMap , sync:: Arc } ;
5
- use tracing:: { error, info, instrument} ;
4
+ use serde:: { Deserialize , Deserializer } ;
5
+ use std:: sync:: Arc ;
6
+ use tracing:: { error, info, instrument, warn} ;
6
7
7
- use crate :: {
8
- get_config,
9
- postgres:: { Postgres , UserStatements } ,
10
- DbSyncPort , Error , State ,
11
- } ;
8
+ use crate :: { get_config, Config , DbSyncPort , Error , State } ;
12
9
13
10
#[ derive( Clone ) ]
14
11
pub struct Metrics {
@@ -135,13 +132,14 @@ pub async fn run_metrics_collector(state: Arc<State>) {
135
132
. await
136
133
. expect ( "failed to create kube client" ) ;
137
134
138
- let config = get_config ( ) ;
139
-
140
- let mut metrics_state: HashMap < String , HashMap < String , UserStatements > > = HashMap :: new ( ) ;
141
-
142
135
let crds_api = Api :: < DbSyncPort > :: all ( client. clone ( ) ) ;
143
136
137
+ let config = get_config ( ) ;
138
+ let mut last_execution = Utc :: now ( ) ;
139
+
144
140
loop {
141
+ tokio:: time:: sleep ( config. metrics_delay ) . await ;
142
+
145
143
let crds_result = crds_api. list ( & ListParams :: default ( ) ) . await ;
146
144
if let Err ( error) = crds_result {
147
145
error ! ( error = error. to_string( ) , "error to get k8s resources" ) ;
@@ -150,97 +148,115 @@ pub async fn run_metrics_collector(state: Arc<State>) {
150
148
}
151
149
let crds = crds_result. unwrap ( ) ;
152
150
153
- for crd in crds . items . iter ( ) . filter ( |i| i . status . is_some ( ) ) {
154
- let status = crd . status . as_ref ( ) . unwrap ( ) ;
151
+ let end = Utc :: now ( ) ;
152
+ let interval = ( end - last_execution ) . num_seconds ( ) ;
155
153
156
- let pg_connections_result = state. get_pg_by_network ( & crd. spec . network ) ;
157
- if let Err ( error) = pg_connections_result {
158
- error ! ( error = error. to_string( ) ) ;
159
- state. metrics . metrics_failure ( & error) ;
154
+ last_execution = end;
155
+
156
+ let query = format ! (
157
+ "sum by (user) (avg_over_time(pgbouncer_pools_client_active_connections{{user=~\" dmtr_.*\" }}[{interval}s] @ {})) > 0" ,
158
+ end. timestamp_millis( ) / 1000
159
+ ) ;
160
+ dbg ! ( & query) ;
161
+
162
+ let response = collect_prometheus_metrics ( config, query) . await ;
163
+ if let Err ( err) = response {
164
+ error ! ( error = err. to_string( ) , "error to make prometheus request" ) ;
165
+ state. metrics . metrics_failure ( & err) ;
166
+ continue ;
167
+ }
168
+ let response = response. unwrap ( ) ;
169
+
170
+ for result in response. data . result {
171
+ let crd = crds
172
+ . iter ( )
173
+ . filter ( |c| c. status . is_some ( ) )
174
+ . find ( |c| c. status . as_ref ( ) . unwrap ( ) . username . eq ( & result. metric . user ) ) ;
175
+
176
+ if crd. is_none ( ) {
177
+ warn ! ( user = result. metric. user, "username doesnt have a crd" ) ;
160
178
continue ;
161
179
}
162
180
163
- let user_statements_result =
164
- get_user_statements ( & status. username , pg_connections_result. unwrap ( ) ) . await ;
165
- if let Err ( error) = user_statements_result {
166
- error ! ( error = error. to_string( ) , "error get user statements" ) ;
181
+ let crd = crd. unwrap ( ) ;
182
+
183
+ let dcu_per_second = config. dcu_per_second . get ( & crd. spec . network ) ;
184
+ if dcu_per_second. is_none ( ) {
185
+ let error = Error :: ConfigError ( format ! (
186
+ "dcu_per_second not configured to {} network" ,
187
+ & crd. spec. network
188
+ ) ) ;
189
+ error ! ( error = error. to_string( ) ) ;
167
190
state. metrics . metrics_failure ( & error) ;
168
191
continue ;
169
192
}
170
193
171
- let user_statements = user_statements_result. unwrap ( ) ;
172
-
173
- let latest_user_statement = metrics_state
174
- . entry ( crd. spec . network . clone ( ) )
175
- . or_default ( )
176
- . get ( & user_statements. usename ) ;
177
-
178
- if let Some ( latest_user_statement) = latest_user_statement {
179
- let total_exec_time =
180
- user_statements. total_exec_time - latest_user_statement. total_exec_time ;
181
-
182
- if total_exec_time == 0.0 {
183
- continue ;
184
- }
185
-
186
- let dcu_per_second = config. dcu_per_second . get ( & crd. spec . network ) ;
187
- if dcu_per_second. is_none ( ) {
188
- let error = Error :: ConfigError ( format ! (
189
- "dcu_per_second not configured to {} network" ,
190
- & crd. spec. network
191
- ) ) ;
192
- error ! ( error = error. to_string( ) ) ;
193
- state. metrics . metrics_failure ( & error) ;
194
- continue ;
195
- }
196
-
197
- let dcu_per_second = dcu_per_second. unwrap ( ) ;
198
-
199
- let dcu = ( total_exec_time / 1000. ) * dcu_per_second;
200
- state. metrics . count_dcu_consumed (
201
- & crd. namespace ( ) . unwrap ( ) ,
202
- & crd. spec . network ,
203
- dcu,
204
- ) ;
205
- }
194
+ let dcu_per_second = dcu_per_second. unwrap ( ) ;
195
+ let total_exec_time = result. value * ( interval as f64 ) ;
206
196
207
- metrics_state
208
- . entry ( crd. spec . network . clone ( ) )
209
- . and_modify ( |statements| {
210
- statements. insert ( user_statements. usename . clone ( ) , user_statements) ;
211
- } ) ;
197
+ let dcu = total_exec_time * dcu_per_second;
198
+ state
199
+ . metrics
200
+ . count_dcu_consumed ( & crd. namespace ( ) . unwrap ( ) , & crd. spec . network , dcu) ;
212
201
}
213
-
214
- tokio:: time:: sleep ( config. metrics_delay ) . await ;
215
202
}
216
203
} ) ;
217
204
}
218
205
219
- async fn get_user_statements (
220
- username : & str ,
221
- pg_connections : & [ Postgres ] ,
222
- ) -> Result < UserStatements , Error > {
223
- let tasks = future:: join_all (
224
- pg_connections
225
- . iter ( )
226
- . map ( |pg| pg. find_metrics_by_user ( username) ) ,
227
- )
228
- . await ;
229
-
230
- let mut user_statements_all_host = UserStatements {
231
- usename : username. into ( ) ,
232
- total_exec_time : 0. ,
233
- } ;
234
-
235
- for user_statements_by_host_result in tasks. into_iter ( ) {
236
- let user_statements_by_host = user_statements_by_host_result?;
237
- if user_statements_by_host. is_none ( ) {
238
- continue ;
239
- }
240
-
241
- let user_statements_by_host = user_statements_by_host. unwrap ( ) ;
242
- user_statements_all_host. total_exec_time += user_statements_by_host. total_exec_time ;
206
+ async fn collect_prometheus_metrics (
207
+ config : & Config ,
208
+ query : String ,
209
+ ) -> Result < PrometheusResponse , Error > {
210
+ let client = reqwest:: Client :: builder ( ) . build ( ) . unwrap ( ) ;
211
+
212
+ let response = client
213
+ . get ( format ! ( "{}/query?query={query}" , config. prometheus_url) )
214
+ . send ( )
215
+ . await ?;
216
+
217
+ let status = response. status ( ) ;
218
+ if status. is_client_error ( ) || status. is_server_error ( ) {
219
+ error ! ( status = status. to_string( ) , "request status code fail" ) ;
220
+ return Err ( Error :: HttpError ( format ! (
221
+ "Prometheus request error. Status: {} Query: {}" ,
222
+ status, query
223
+ ) ) ) ;
243
224
}
244
225
245
- Ok ( user_statements_all_host)
226
+ Ok ( response. json ( ) . await . unwrap ( ) )
227
+ }
228
+
229
+ #[ derive( Debug , Deserialize ) ]
230
+ struct PrometheusDataResultMetric {
231
+ user : String ,
232
+ }
233
+
234
+ #[ derive( Debug , Deserialize ) ]
235
+ struct PrometheusDataResult {
236
+ metric : PrometheusDataResultMetric ,
237
+ #[ serde( deserialize_with = "deserialize_value" ) ]
238
+ value : f64 ,
239
+ }
240
+
241
+ #[ derive( Debug , Deserialize ) ]
242
+ #[ serde( rename_all = "camelCase" ) ]
243
+ struct PrometheusData {
244
+ result : Vec < PrometheusDataResult > ,
245
+ }
246
+
247
+ #[ derive( Debug , Deserialize ) ]
248
+ struct PrometheusResponse {
249
+ data : PrometheusData ,
250
+ }
251
+
252
+ fn deserialize_value < ' de , D > ( deserializer : D ) -> Result < f64 , D :: Error >
253
+ where
254
+ D : Deserializer < ' de > ,
255
+ {
256
+ let value: Vec < serde_json:: Value > = Deserialize :: deserialize ( deserializer) ?;
257
+ Ok ( value. into_iter ( ) . as_slice ( ) [ 1 ]
258
+ . as_str ( )
259
+ . unwrap ( )
260
+ . parse :: < f64 > ( )
261
+ . unwrap ( ) )
246
262
}
0 commit comments