4
4
"""
5
5
import os
6
6
import json
7
+ import functools
7
8
import subprocess
8
9
from textwrap import dedent
9
10
@@ -33,34 +34,56 @@ def get_request_type(name):
33
34
return 'misc'
34
35
35
36
36
- def process_minio_tenant_main_audit_logs (data , agg_data ):
37
def process_minio_tenant_main_audit_logs_update_agg_data(agg_data, namespace_name, request_type, tx, rx):
    """Add a single request's traffic to the running totals of one namespace.

    Args:
        agg_data: dict keyed by namespace name, mutated in place. A namespace
            seen for the first time is initialised from a copy of
            DEPLOYMENT_API_METRICS_BASE_DATA.
        namespace_name: key of the namespace whose counters are updated.
        request_type: suffix selecting which ``num_requests_<request_type>``
            counter is incremented.
        tx: value added to ``bytes_out``; coerced with ``int()``.
        rx: value added to ``bytes_in``; coerced with ``int()``.

    Raises:
        ValueError / TypeError: if ``tx`` or ``rx`` cannot be converted by ``int()``.
        KeyError: if the namespace entry has no counter for ``request_type``.
    """
    if namespace_name not in agg_data:
        logs.debug(f"process_minio_tenant_main_audit_logs: {namespace_name}", 10)
        agg_data[namespace_name] = DEPLOYMENT_API_METRICS_BASE_DATA.copy()
    namespace_metrics = agg_data[namespace_name]
    # int() because callers may hand over numeric strings as well as numbers.
    namespace_metrics['bytes_in'] += int(rx)
    namespace_metrics['bytes_out'] += int(tx)
    namespace_metrics[f'num_requests_{request_type}'] += 1
44
+
45
+
46
def process_minio_tenant_main_audit_logs(data, agg_data, domains_config):
    """Fold one record from the audit-logs topic into ``agg_data``.

    Two record shapes are recognised:

    * a record whose ``api`` object carries a ``bucket`` - handled as a MinIO
      audit entry;
    * a record with a ``message`` and an ``ident`` starting with ``nginx-`` -
      ``message`` is parsed as JSON and handled as an nginx access-log entry.

    Any other record is ignored.

    Args:
        data: decoded record (dict).
        agg_data: dict of per-namespace metrics, mutated in place.
        domains_config: object used to resolve an nginx ``host`` to a worker
            via ``get_cwm_api_volume_config(hostname=...)``.

    Raises:
        json.JSONDecodeError: if an nginx record's ``message`` is not valid JSON.
    """
    # `or {}` also covers a record where the "api" key is present but null.
    data_api = data.get('api') or {}
    bucket = data_api.get('bucket')
    if bucket:
        _aggregate_minio_audit_record(data_api, bucket, agg_data)
    elif data.get('message') and (data.get('ident') or '').startswith('nginx-'):
        _aggregate_nginx_cache_hit(json.loads(data['message']), agg_data, domains_config)


def _aggregate_minio_audit_record(data_api, bucket, agg_data):
    """Account a MinIO audit entry to the namespace derived from its bucket name."""
    namespace_name = common.get_namespace_name_from_bucket_name(bucket)
    if not namespace_name:
        return
    tx = data_api.get('tx') or 0
    rx = data_api.get('rx') or 0
    request_type = get_request_type(data_api.get('name'))
    process_minio_tenant_main_audit_logs_update_agg_data(agg_data, namespace_name, request_type, tx, rx)
    logs.debug('process_minio_tenant_main_audit_logs (minio)', 10, data_api=data_api)


def _aggregate_nginx_cache_hit(message, agg_data, domains_config):
    """Account an nginx access-log entry, but only when it was a cache HIT for a known host."""
    host = message.get('host')
    # Only cache hits are counted here.
    # NOTE(review): presumably non-HIT requests reach MinIO and are already
    # counted through its own audit entries - confirm.
    if not host or message.get('upstream_cache_status') != 'HIT':
        return
    try:
        worker_id = domains_config.get_cwm_api_volume_config(hostname=host).id
    except Exception:
        # Best effort: an unresolvable host is simply skipped. Catching
        # Exception (not a bare except) lets KeyboardInterrupt / SystemExit
        # propagate to the caller.
        worker_id = None
    if not worker_id:
        return
    namespace_name = common.get_namespace_name_from_worker_id(worker_id)
    if not namespace_name:
        return
    request = message.get('request') or ''
    # GET requests count as outbound traffic; everything else is "misc".
    request_type = 'out' if request.startswith('GET ') else 'misc'
    tx = message.get('bytes_sent') or 0
    rx = message.get('request_length') or 0
    process_minio_tenant_main_audit_logs_update_agg_data(agg_data, namespace_name, request_type, tx, rx)
    logs.debug('process_minio_tenant_main_audit_logs (cdn)', 10, message=message)
52
75
53
76
54
77
def commit_minio_tenant_main_audit_logs(domains_config, agg_data):
    """Push the aggregated metrics of every namespace through ``domains_config``.

    For each namespace in ``agg_data`` the metrics are handed to
    ``update_deployment_api_metrics`` and then ``set_deployment_last_action``
    is called for that namespace.

    Args:
        domains_config: object exposing ``update_deployment_api_metrics`` and
            ``set_deployment_last_action``.
        agg_data: dict mapping namespace name to its aggregated metrics.
    """
    logs.debug(f"commit_minio_tenant_main_audit_logs: {agg_data}", 10)
    for namespace_name, namespace_metrics in agg_data.items():
        domains_config.update_deployment_api_metrics(namespace_name, namespace_metrics)
        domains_config.set_deployment_last_action(namespace_name)
59
82
60
83
61
def process_data(topic, data, agg_data, domains_config):
    """Dispatch one decoded record to the handler registered for its topic.

    Args:
        topic: name of the topic the record was read from.
        data: decoded record.
        agg_data: aggregate dict passed to the handler, which mutates it in place.
        domains_config: passed through to the handler.

    Raises:
        NotImplementedError: if ``topic`` is not the audit-logs topic.
    """
    # Only a single topic is handled; reject anything else up front.
    if topic != MINIO_TENANT_MAIN_AUDIT_LOGS_TOPIC:
        raise NotImplementedError(f"topic {topic} is not supported")
    process_minio_tenant_main_audit_logs(data, agg_data, domains_config)
66
89
@@ -81,7 +104,7 @@ def delete_records(topic, latest_partition_offset):
81
104
]
82
105
if len (partitions ) > 0 :
83
106
offset_json = json .dumps ({'partitions' : partitions , 'version' : 1 })
84
- logs .debug (f"Deleting records: { offset_json } " , 8 )
107
+ logs .debug (f"Deleting records: { offset_json } " , 10 )
85
108
subprocess .check_call ([
86
109
'kubectl' , 'exec' , '-n' , config .KAFKA_STREAMER_POD_NAMESPACE , config .KAFKA_STREAMER_POD_NAME , '--' , 'bash' , '-c' , dedent (f'''
87
110
TMPFILE=$(mktemp) &&\
@@ -96,7 +119,7 @@ def run_single_iteration(domains_config: DomainsConfig, topic, daemon, no_kafka_
96
119
start_time = common .now ()
97
120
assert topic , "topic is required"
98
121
assert config .KAFKA_STREAMER_BOOTSTRAP_SERVERS
99
- logs .debug (f"running iteration for topic: { topic } " , 8 )
122
+ logs .debug (f"running iteration for topic: { topic } " , 10 )
100
123
consumer = Consumer ({
101
124
'bootstrap.servers' : config .KAFKA_STREAMER_BOOTSTRAP_SERVERS ,
102
125
'group.id' : config .KAFKA_STREAMER_OPERATOR_GROUP_ID ,
@@ -106,20 +129,21 @@ def run_single_iteration(domains_config: DomainsConfig, topic, daemon, no_kafka_
106
129
latest_partition_offset = {}
107
130
try :
108
131
agg_data = {}
132
+ commit_ = functools .partial (commit , topic , consumer , domains_config , agg_data , no_kafka_commit = no_kafka_commit )
109
133
while (common .now () - start_time ).total_seconds () < config .KAFKA_STREAMER_POLL_TIME_SECONDS and not daemon .terminate_requested :
110
134
msg = consumer .poll (timeout = config .KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS )
111
135
if msg is None :
112
- # logs.debug("Waiting for messages...", 10)
113
- pass
136
+ logs .debug ("Waiting for messages..." , 10 )
137
+ commit_ ()
114
138
elif msg .error ():
115
139
raise Exception (f"Message ERROR: { msg .error ()} " )
116
140
else :
117
141
offset = msg .offset ()
118
142
partition = msg .partition ()
119
143
latest_partition_offset [partition ] = offset
120
144
data = json .loads (msg .value ())
121
- process_data (topic , data , agg_data )
122
- commit ( topic , consumer , domains_config , agg_data , no_kafka_commit = no_kafka_commit )
145
+ process_data (topic , data , agg_data , domains_config )
146
+ commit_ ( )
123
147
except KeyboardInterrupt :
124
148
pass
125
149
finally :
0 commit comments