
Commit a902ba9

add kafka streamer
1 parent eb5cbd4 commit a902ba9

7 files changed: +189 -2 lines changed

cwm_worker_operator/cli.py
+10 -1

@@ -69,6 +69,14 @@ def _callback(*args, **kwargs):
         ], 'help': 'Check and update a single worker, used by workers-checker to run async operations'}
     }},
     {'name': 'throttler'},
+    {
+        'name': 'kafka_streamer',
+        'extra_params': [
+            click.Option(['--topic']),
+            click.Option(['--no-kafka-commit'], is_flag=True),
+            click.Option(['--no-kafka-delete'], is_flag=True),
+        ]
+    },
 ]:
     try:
         daemon_module = importlib.import_module('cwm_worker_operator.{}'.format(daemon['name'].replace('-', '_')))
@@ -81,7 +89,8 @@ def _callback(*args, **kwargs):
             name='start_daemon',
             callback=daemon_module.start_daemon,
             params=[
-                *([click.Option(['--once'], is_flag=True)] if daemon.get('with_once') != False else [])
+                *([click.Option(['--once'], is_flag=True)] if daemon.get('with_once') != False else []),
+                *(daemon['extra_params'] if 'extra_params' in daemon else []),
             ]
         ),
         **{
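
Note: the registration loop above builds one click group per daemon and now appends each daemon's optional 'extra_params' to the generated start_daemon command. A minimal standalone sketch of the same pattern (the callback and daemon list here are stand-ins, not the operator's actual module layout):

    import click

    def build_cli(daemons):
        # Mirror of the registration pattern: one group per daemon entry,
        # each with a generated 'start_daemon' command whose options come
        # from the entry itself.
        cli = click.Group()
        for daemon in daemons:
            cmd = click.Command(
                name='start_daemon',
                callback=lambda **kwargs: print(kwargs),  # stand-in for daemon_module.start_daemon
                params=[
                    *([click.Option(['--once'], is_flag=True)] if daemon.get('with_once') != False else []),
                    *(daemon.get('extra_params', [])),
                ],
            )
            group = click.Group(name=daemon['name'])
            group.add_command(cmd)
            cli.add_command(group)
        return cli

    cli = build_cli([{
        'name': 'kafka_streamer',
        'extra_params': [
            click.Option(['--topic']),
            click.Option(['--no-kafka-commit'], is_flag=True),
        ],
    }])
    # e.g.: cli(['kafka_streamer', 'start_daemon', '--topic', 'minio-tenant-main-audit-logs', '--once'])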

cwm_worker_operator/common.py
+9

@@ -66,6 +66,14 @@ def get_worker_id_from_namespace_name(namespace_name):
     return worker_id


+@lru_cache(maxsize=9999)
+def get_namespace_name_from_bucket_name(bucket_name):
+    if is_worker_namespace(bucket_name):
+        return bucket_name
+    else:
+        return None
+
+
 def is_worker_namespace(namespace_name):
     return (
         namespace_name.startswith('cwm-worker-')
@@ -75,6 +83,7 @@ def is_worker_namespace(namespace_name):
         ]
     )

+
 def is_hostnames_match(full_hostname, partial_hostname):
     if full_hostname.lower() == partial_hostname.lower():
         return True
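
Note: worker bucket names are expected to equal their namespace names, so the new helper simply validates the bucket name through is_worker_namespace and caches the result. A hedged usage sketch (the worker id below is made up, and this assumes it is not in the excluded-namespaces list elided from the hunk above):

    from cwm_worker_operator import common

    # A bucket named like a worker namespace maps to that namespace...
    assert common.get_namespace_name_from_bucket_name('cwm-worker-abc123') == 'cwm-worker-abc123'
    # ...while any other bucket is ignored by the streamer.
    assert common.get_namespace_name_from_bucket_name('some-other-bucket') is None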

cwm_worker_operator/config.py
+14

@@ -154,3 +154,17 @@
 MINIO_TENANT_ENDPOINT = os.environ.get('MINIO_TENANT_ENDPOINT')
 MINIO_TENANT_ADMIN_USER = os.environ.get('MINIO_TENANT_ADMIN_USER')
 MINIO_TENANT_ADMIN_PASSWORD = os.environ.get('MINIO_TENANT_ADMIN_PASSWORD')
+
+KAFKA_STREAMER_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS')
+KAFKA_STREAMER_TOPIC = os.environ.get('KAFKA_STREAMER_TOPIC') or 'minio-tenant-main-audit-logs'
+KAFKA_STREAMER_POD_NAMESPACE = os.environ.get('KAFKA_MINIO_AUDIT_POD_NAMESPACE') or 'strimzi'
+KAFKA_STREAMER_POD_NAME = os.environ.get('KAFKA_MINIO_AUDIT_POD_NAME') or 'minio-audit-kafka-0'
+KAFKA_STREAMER_OPERATOR_GROUP_ID = os.environ.get('KAFKA_STREAMER_OPERATOR_GROUP_ID') or f'cwm_worker_operator_{KAFKA_STREAMER_TOPIC}'
+KAFKA_STREAMER_CONSUMER_CONFIG = json.loads(os.environ.get('KAFKA_STREAMER_CONSUMER_CONFIG_JSON') or '''{
+    "auto.offset.reset": "earliest",
+    "enable.auto.commit": false,
+    "api.version.request.timeout.ms": 25000
+}''')
+KAFKA_STREAMER_POLL_TIME_SECONDS = int(os.environ.get('KAFKA_STREAMER_POLL_TIME_SECONDS') or '60')
+KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS = float(os.environ.get('KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS') or '1.0')
+KAFKA_STREAMER_SLEEP_TIME_BETWEEN_ITERATIONS_SECONDS = int(os.environ.get('KAFKA_STREAMER_SLEEP_TIME_BETWEEN_ITERATIONS_SECONDS') or '1')
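
Note: KAFKA_STREAMER_CONSUMER_CONFIG is parsed from JSON at import time, so overrides must be in place before the config module is first imported. A sketch of overriding it from a test or wrapper script (the bootstrap address is a placeholder):

    import os, json

    os.environ['KAFKA_BOOTSTRAP_SERVERS'] = 'my-cluster-kafka-bootstrap.strimzi:9092'  # placeholder
    os.environ['KAFKA_STREAMER_CONSUMER_CONFIG_JSON'] = json.dumps({
        "auto.offset.reset": "latest",           # start from new messages instead of 'earliest'
        "enable.auto.commit": False,             # keep commits manual, as the streamer expects
        "api.version.request.timeout.ms": 10000,
    })

    from cwm_worker_operator import config
    assert config.KAFKA_STREAMER_CONSUMER_CONFIG['auto.offset.reset'] == 'latest'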

cwm_worker_operator/deployments_manager.py
-1

@@ -275,7 +275,6 @@ def deploy_preprocess_specs(self, specs):
             res[key] = True
         return res

-
     def deploy_minio(self, deployment_config, dry_run=False):
         username = deployment_config['cwm-worker-deployment']['namespace']
         password = deployment_config['minio']['access_key']

cwm_worker_operator/domains_config.py
+12

@@ -756,6 +756,15 @@ def get_deployment_api_metrics(self, namespace_name):
             for key in r.keys(base_key + "*")
         }

+    def update_deployment_api_metrics(self, namespace_name, data):
+        from .kafka_streamer import DEPLOYMENT_API_METRICS_BASE_DATA
+        assert data.keys() == DEPLOYMENT_API_METRICS_BASE_DATA.keys()
+        base_key = "{}:".format(self.keys.deployment_api_metric._(namespace_name))
+        with self.keys.deployment_api_metric.get_redis() as r:
+            for metric_key, value in data.items():
+                key = f'{base_key}{metric_key}'
+                r.incrby(key, value)
+
     def set_worker_aggregated_metrics(self, worker_id, agg_metrics):
         self.keys.worker_aggregated_metrics.set(worker_id, json.dumps(agg_metrics))

@@ -786,6 +795,9 @@ def get_deployment_last_action(self, namespace_name):
                 latest_value = value
         return latest_value if latest_value else None

+    def set_deployment_last_action(self, namespace_name):
+        self.keys.deployment_last_action.set(namespace_name, common.now().strftime("%Y%m%dT%H%M%S"))
+
     def get_key_summary_single_multi_domain(self, r, key_name, key, max_keys_per_summary, is_api=False):
         if isinstance(key, DomainsConfigKeyStatic):
             match = key._()
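
Note: update_deployment_api_metrics accumulates rather than overwrites, using one Redis key per metric and INCRBY, so repeated commits (or parallel streamer instances) add to the running totals atomically. A minimal sketch of that underlying pattern, with a made-up key prefix standing in for the operator's keys helper:

    import redis

    r = redis.Redis()  # assumes a reachable local Redis
    base_key = 'deploymentid:api_metric:cwm-worker-abc123:'  # hypothetical key layout
    for metric_key, value in {'bytes_in': 2048, 'num_requests_in': 1}.items():
        # INCRBY is atomic, so concurrent writers cannot lose updates
        r.incrby(base_key + metric_key, value)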

cwm_worker_operator/kafka_streamer.py
+143

@@ -0,0 +1,143 @@
+"""
+Streams / aggregates data from a Kafka topic
+This daemon can run multiple instances in parallel, each instance handling a different topic.
+"""
+import os
+import json
+import subprocess
+from textwrap import dedent
+
+from confluent_kafka import Consumer
+
+from cwm_worker_operator.daemon import Daemon
+from cwm_worker_operator import config, common, logs
+from cwm_worker_operator.domains_config import DomainsConfig
+
+
+MINIO_TENANT_MAIN_AUDIT_LOGS_TOPIC = 'minio-tenant-main-audit-logs'
+DEPLOYMENT_API_METRICS_BASE_DATA = {
+    'bytes_in': 0,
+    'bytes_out': 0,
+    'num_requests_in': 0,
+    'num_requests_out': 0,
+    'num_requests_misc': 0,
+}
+
+
+def get_request_type(name):
+    if name in ['WebUpload', 'PutObject', 'DeleteObject']:
+        return 'in'
+    elif name in ['WebDownload', 'GetObject']:
+        return 'out'
+    else:
+        return 'misc'
+
+
+def process_minio_tenant_main_audit_logs(data, agg_data):
+    data_api = data.get('api', {})
+    bucket = data_api.get('bucket') or None
+    if bucket:
+        namespace_name = common.get_namespace_name_from_bucket_name(bucket)
+        if namespace_name:
+            if namespace_name not in agg_data:
+                logs.debug(f"process_minio_tenant_main_audit_logs: {namespace_name}", 8)
+                agg_data[namespace_name] = DEPLOYMENT_API_METRICS_BASE_DATA.copy()
+            logs.debug('process_minio_tenant_main_audit_logs', 10, data_api=data_api)
+            tx = data_api.get('tx') or 0
+            rx = data_api.get('rx') or 0
+            agg_data[namespace_name][f'bytes_in'] += rx
+            agg_data[namespace_name][f'bytes_out'] += tx
+            request_type = get_request_type(data_api.get('name'))
+            agg_data[namespace_name][f'num_requests_{request_type}'] += 1
+
+
+def commit_minio_tenant_main_audit_logs(domains_config, agg_data):
+    logs.debug(f"commit_minio_tenant_main_audit_logs: {agg_data}", 8)
+    for namespace_name, data in agg_data.items():
+        domains_config.update_deployment_api_metrics(namespace_name, data)
+        domains_config.set_deployment_last_action(namespace_name)
+
+
+def process_data(topic, data, agg_data):
+    if topic == MINIO_TENANT_MAIN_AUDIT_LOGS_TOPIC:
+        process_minio_tenant_main_audit_logs(data, agg_data)
+    else:
+        raise NotImplementedError(f"topic {topic} is not supported")
+
+
+def commit(topic, consumer, domains_config, agg_data, no_kafka_commit=False):
+    if topic == MINIO_TENANT_MAIN_AUDIT_LOGS_TOPIC:
+        commit_minio_tenant_main_audit_logs(domains_config, agg_data)
+    else:
+        raise NotImplementedError(f"topic {topic} is not supported")
+    if not no_kafka_commit:
+        consumer.commit()
+
+
+def delete_records(topic, latest_partition_offset):
+    partitions = [
+        {'topic': topic, 'partition': p, 'offset': o}
+        for p, o in latest_partition_offset.items()
+    ]
+    if len(partitions) > 0:
+        offset_json = json.dumps({'partitions': partitions, 'version': 1})
+        logs.debug(f"Deleting records: {offset_json}", 8)
+        subprocess.check_call([
+            'kubectl', 'exec', '-n', config.KAFKA_STREAMER_POD_NAMESPACE, config.KAFKA_STREAMER_POD_NAME, '--', 'bash', '-c', dedent(f'''
+                TMPFILE=$(mktemp) &&\\
+                echo '{offset_json}' > $TMPFILE &&\\
+                bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file $TMPFILE &&\\
+                rm $TMPFILE
+            ''').strip()
+        ], env={**os.environ, 'DEBUG': ''})
+
+
+def run_single_iteration(domains_config: DomainsConfig, topic=None, no_kafka_commit=False, no_kafka_delete=False, **_):
+    start_time = common.now()
+    if not topic:
+        topic = config.KAFKA_STREAMER_TOPIC
+    assert topic, "topic is required"
+    logs.debug(f"running iteration for topic: {topic}", 8)
+    consumer = Consumer({
+        'bootstrap.servers': config.KAFKA_STREAMER_BOOTSTRAP_SERVERS,
+        'group.id': config.KAFKA_STREAMER_OPERATOR_GROUP_ID,
+        **config.KAFKA_STREAMER_CONSUMER_CONFIG
+    })
+    consumer.subscribe([topic])
+    latest_partition_offset = {}
+    try:
+        agg_data = {}
+        while (common.now() - start_time).total_seconds() < config.KAFKA_STREAMER_POLL_TIME_SECONDS:
+            msg = consumer.poll(timeout=config.KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS)
+            if msg is None:
+                # logs.debug("Waiting for messages...", 10)
+                pass
+            elif msg.error():
+                raise Exception(f"Message ERROR: {msg.error()}")
+            else:
+                offset = msg.offset()
+                partition = msg.partition()
+                latest_partition_offset[partition] = offset
+                data = json.loads(msg.value())
+                process_data(topic, data, agg_data)
+        commit(topic, consumer, domains_config, agg_data, no_kafka_commit=no_kafka_commit)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        consumer.close()
+        if not no_kafka_delete:
+            delete_records(topic, latest_partition_offset)
+
+
+def start_daemon(once=False, domains_config=None, topic=None, no_kafka_commit=False, no_kafka_delete=False):
+    assert topic
+    Daemon(
+        name=f"kafka_streamer_{topic}",
+        sleep_time_between_iterations_seconds=config.KAFKA_STREAMER_SLEEP_TIME_BETWEEN_ITERATIONS_SECONDS,
+        domains_config=domains_config,
+        run_single_iteration_callback=run_single_iteration,
+        run_single_iteration_extra_kwargs={'topic': topic, 'no_kafka_commit': no_kafka_commit, 'no_kafka_delete': no_kafka_delete},
+    ).start(
+        once=once,
+        with_prometheus=False,
+    )
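
Note: to make the aggregation step concrete, here is a sketch feeding one trimmed audit-log record (field values invented, modeled on MinIO's audit webhook shape) through process_minio_tenant_main_audit_logs; it assumes confluent-kafka is installed so the module imports, and that 'cwm-worker-abc123' passes is_worker_namespace:

    from cwm_worker_operator.kafka_streamer import process_minio_tenant_main_audit_logs

    sample = {
        'api': {
            'name': 'PutObject',            # counted as an 'in' request by get_request_type
            'bucket': 'cwm-worker-abc123',  # hypothetical worker bucket
            'rx': 2048,                     # bytes received by the server
            'tx': 128,                      # bytes sent back to the client
        },
    }
    agg_data = {}
    process_minio_tenant_main_audit_logs(sample, agg_data)
    # agg_data == {'cwm-worker-abc123': {'bytes_in': 2048, 'bytes_out': 128,
    #              'num_requests_in': 1, 'num_requests_out': 0, 'num_requests_misc': 0}}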

requirements.txt
+1

@@ -8,3 +8,4 @@ uvicorn[standard]==0.27.0.post1
 fastapi==0.109.1
 gunicorn==21.2.0
 minio==7.2.3
+confluent-kafka==2.3.0
