Skip to content

Commit 1e8ff89

Browse files
committed
fix updater, deployer, waiter to support new architecture
1 parent c9e343e commit 1e8ff89

File tree

6 files changed

+118
-44
lines changed

6 files changed

+118
-44
lines changed

cwm_worker_operator/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,7 @@
150150
THROTTLER_THROTTLE_MAX_REQUESTS = int(os.environ.get('THROTTLER_THROTTLE_MAX_REQUESTS') or '10000000')
151151
# period of time during which the maximum number of requests should be reached to throttle
152152
THROTTLER_CHECK_TTL_SECONDS = int(os.environ.get('THROTTLER_CHECK_TTL_SECONDS') or '60')
153+
154+
MINIO_TENANT_ENDPOINT = os.environ.get('MINIO_TENANT_ENDPOINT')
155+
MINIO_TENANT_ADMIN_USER = os.environ.get('MINIO_TENANT_ADMIN_USER')
156+
MINIO_TENANT_ADMIN_PASSWORD = os.environ.get('MINIO_TENANT_ADMIN_PASSWORD')

cwm_worker_operator/deployments_manager.py

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
import requests
1414
from ruamel import yaml
15+
from minio.minioadmin import MinioAdmin
16+
from minio.credentials.providers import StaticProvider
17+
from minio import Minio
1518

1619
from cwm_worker_operator import logs
1720
from cwm_worker_operator import config
@@ -48,6 +51,17 @@ def parse_datetime_from_kubelet_log_line(line):
4851
return datetime.datetime.strptime('{}{} {}+00:00'.format(datetime.datetime.now().year, datepart, timepart), '%YI%m%d %H:%M:%S.%f%z')
4952

5053

54+
def get_minio_admin():
    """Return a MinioAdmin client for the configured MinIO tenant.

    The endpoint and the admin user/password are read from
    config.MINIO_TENANT_ENDPOINT, config.MINIO_TENANT_ADMIN_USER and
    config.MINIO_TENANT_ADMIN_PASSWORD.
    """
    admin_credentials = StaticProvider(
        config.MINIO_TENANT_ADMIN_USER,
        config.MINIO_TENANT_ADMIN_PASSWORD,
    )
    return MinioAdmin(config.MINIO_TENANT_ENDPOINT, admin_credentials)
59+
60+
61+
def get_minio():
    """Return a Minio client for the configured tenant, authenticated as the tenant admin.

    Endpoint, access key and secret key come from config.MINIO_TENANT_ENDPOINT,
    config.MINIO_TENANT_ADMIN_USER and config.MINIO_TENANT_ADMIN_PASSWORD.
    """
    endpoint = config.MINIO_TENANT_ENDPOINT
    access_key = config.MINIO_TENANT_ADMIN_USER
    secret_key = config.MINIO_TENANT_ADMIN_PASSWORD
    return Minio(endpoint, access_key, secret_key)
63+
64+
5165
class NodeCleanupPod:
5266

5367
def __init__(self, namespace_name, pod_name, node_name):
@@ -151,22 +165,65 @@ def init_cache(self):
151165
print("Failed to initialize chart cache for version {}".format(version))
152166

153167
def init(self, deployment_config):
    """Do nothing; kept so existing callers can still invoke it.

    This used to delegate to cwm_worker_deployment.deployment.init(deployment_config);
    that call is disabled, so the argument is ignored and None is returned.
    """
    return None
155170

156171
def deploy_external_service(self, deployment_config):
    """Do nothing; kept so existing callers can still invoke it.

    This used to delegate to
    cwm_worker_deployment.deployment.deploy_external_service(deployment_config);
    that call is disabled, so the argument is ignored and None is returned.
    """
    return None
158174

159175
def deploy_extra_objects(self, deployment_config, extra_objects):
    """Do nothing; kept so existing callers can still invoke it.

    This used to delegate to
    cwm_worker_deployment.deployment.deploy_extra_objects(deployment_config, extra_objects);
    that call is disabled, so both arguments are ignored and None is returned.
    """
    return None
161178

162179
def deploy_preprocess_specs(self, specs):
    """Return a preprocess result for every key in ``specs``.

    No preprocessing is performed at present. Every key is still mapped to
    True so that the specs are treated as already preprocessed and are not
    preprocessed again.

    :param specs: mapping of spec key -> spec; only the keys are used
    :return: dict mapping each key of ``specs`` to True
    """
    # Only the keys matter, so iterate the mapping directly instead of .items().
    return {key: True for key in specs}
185+
186+
def deploy(self, deployment_config, dry_run=False, **kwargs):
    """Create the MinIO user, access policy and bucket for a deployment.

    The namespace from ``deployment_config['cwm-worker-deployment']['namespace']``
    is used as the MinIO user name, the policy name and the bucket name. The
    user's secret is taken from ``deployment_config['minio']['access_key']``.

    :param deployment_config: deployment configuration dict (keys as above)
    :param dry_run: when True, only read the configuration and return a summary
        string without contacting MinIO
    :param kwargs: accepted for backward compatibility with callers; ignored
    :return: a human-readable status string
    :raises KeyError: if the namespace or access key is missing from the config
    """
    username = deployment_config['cwm-worker-deployment']['namespace']
    # Read the secret before the dry-run check so a dry run also validates
    # that the configuration contains it.
    password = deployment_config['minio']['access_key']
    if dry_run:
        return f"username: {username}"

    # Full access to the user's own bucket, except deleting the bucket itself,
    # plus permission to list buckets.
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": ["s3:*"],
                "Effect": "Allow",
                "Resource": [f"arn:aws:s3:::{username}", f"arn:aws:s3:::{username}/*"]
            },
            {
                "Action": ["s3:DeleteBucket"],
                "Effect": "Deny",
                "Resource": [f"arn:aws:s3:::{username}"]
            },
            {
                "Action": ["s3:ListAllMyBuckets"],
                "Effect": "Allow",
                "Resource": ["arn:aws:s3:::*"]
            }
        ]
    }

    minio_admin = get_minio_admin()
    minio_admin.user_add(username, password)
    with tempfile.NamedTemporaryFile('w') as f:
        json.dump(policy, f)
        # policy_add is given the file name, so the buffered JSON must be
        # flushed to disk before the call.
        f.flush()
        minio_admin.policy_add(username, f.name)
    minio_admin.policy_set(username, username)

    minio = get_minio()
    if not minio.bucket_exists(username):
        minio.make_bucket(username)
    return f"OK, username/bucket: {username}"
167223

168224
def is_ready(self, namespace_name, deployment_type, minimal_check=False):
    """Report whether a bucket named ``namespace_name`` exists in the MinIO tenant.

    ``deployment_type`` and ``minimal_check`` are accepted but not used; they
    were previously forwarded to cwm_worker_deployment.deployment.is_ready.

    :return: the result of ``bucket_exists(namespace_name)`` on the tenant client
    """
    minio_client = get_minio()
    return minio_client.bucket_exists(namespace_name)
170227

171228
def get_health(self, namespace_name, deployment_type):
172229
return cwm_worker_deployment.deployment.get_health(namespace_name, deployment_type)
@@ -209,8 +266,8 @@ def delete(self, namespace_name, deployment_type, **kwargs):
209266
cwm_worker_deployment.deployment.delete(namespace_name, deployment_type, **kwargs)
210267

211268
def iterate_all_releases(self):
    """Yield each item of the JSON-decoded MinIO admin user listing.

    This replaces the previous iteration over helm "minio" releases.
    NOTE(review): if the listing decodes to a mapping, iteration yields its
    keys only -- confirm against the MinioAdmin.user_list output format.
    """
    listed_users = json.loads(get_minio_admin().user_list())
    yield from listed_users
214271

215272
def get_prometheus_metrics(self, namespace_name):
216273
metrics = {}

cwm_worker_operator/domains_config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,8 @@ def __init__(self, data, domains_config, request_hostname=None, is_data_from_cac
252252
'payload': hostname['payload']
253253
}
254254
self.primary_hostname = data.get('primary instance')
255-
self.client_id = data.get("client_id")
256-
self.secret = data.get("secret")
255+
self.secret = self.client_id = data.get("client_id")
256+
# self.secret = data.get("secret")
257257
self.cache_enabled = bool(data.get('cache'))
258258
try:
259259
self.cache_exclude_extensions = [ext.strip() for ext in data.get('cache-exclude', '').split('|') if ext.strip()]

cwm_worker_operator/updater.py

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from cwm_worker_operator.daemon import Daemon
1515
from cwm_worker_operator.domains_config import DomainsConfig
1616
from cwm_worker_operator.multiprocessor import Multiprocessor
17+
from cwm_worker_operator.deployments_manager import DeploymentsManager
1718

1819

1920
DATETIME_FORMAT = '%Y%m%dT%H%M%S%z'
@@ -49,14 +50,14 @@ def check_update_release(domains_config, updater_metrics, namespace_name, last_u
4950
domains_config.set_worker_force_delete(worker_id, allow_cancel=False)
5051
if updater_metrics:
5152
updater_metrics.force_delete(worker_id, start_time)
52-
elif instance_update == 'update':
53-
msg = "domain force update (from cwm updates api)"
53+
elif instance_update in ('update', 'create'):
54+
msg = f"domain force {instance_update} (from cwm updates api)"
5455
logs.debug(msg, debug_verbosity=4, worker_id=worker_id, start_time=start_time)
5556
domains_config.set_worker_force_update(worker_id)
5657
if updater_metrics:
5758
updater_metrics.not_deployed_force_update(worker_id, start_time)
5859
else:
59-
hours_since_last_update = (common.now() - last_updated).total_seconds() / 60 / 60
60+
hours_since_last_update = ((common.now() - last_updated).total_seconds() / 60 / 60)
6061
volume_config = domains_config.get_cwm_api_volume_config(worker_id=worker_id)
6162
disable_force_delete = volume_config.disable_force_delete
6263
disable_force_update = volume_config.disable_force_update
@@ -150,12 +151,14 @@ def get_instances_updates(domains_config: DomainsConfig, cwm_api_manager: CwmApi
150151

151152
def update(namespace_name, last_updated, status, revision,
152153
worker_id, instance_update, start_time,
153-
cwm_api_manager=None, domains_config=None, updater_metrics=None):
154+
cwm_api_manager=None, domains_config=None, updater_metrics=None, deployments_manager=None):
154155
if not domains_config:
155156
domains_config = DomainsConfig()
156157
if not cwm_api_manager:
157158
cwm_api_manager = CwmApiManager()
158-
last_updated = get_datetime_object(last_updated)
159+
if not deployments_manager:
160+
deployments_manager = DeploymentsManager()
161+
last_updated = get_datetime_object(last_updated) if last_updated else None
159162
start_time = get_datetime_object(start_time)
160163
check_update_release(domains_config, updater_metrics, namespace_name, last_updated, status, revision,
161164
instance_update, worker_id, start_time)
@@ -196,19 +199,27 @@ def run_single_iteration(domains_config, metrics, deployments_manager, cwm_api_m
196199
multiprocessor = UpdaterMultiprocessor(config.UPDATER_MAX_PARALLEL_DEPLOY_PROCESSES if is_async else 1)
197200
updater_metrics = metrics
198201
instances_updates = get_instances_updates(domains_config, cwm_api_manager)
199-
all_releases = {release["namespace"]: release for release in deployments_manager.iterate_all_releases()}
200-
for release in all_releases.values():
201-
namespace_name = release["namespace"]
202-
datestr, timestr, *_ = release["updated"].split(" ")
203-
last_updated = common.strptime("{}T{}".format(datestr, timestr.split(".")[0]), "%Y-%m-%dT%H:%M:%S")
204-
status = release["status"]
205-
# app_version = release["app_version"]
206-
revision = int(release["revision"])
207-
start_time = common.now()
208-
worker_id = common.get_worker_id_from_namespace_name(namespace_name)
209-
instance_update = instances_updates.get(worker_id)
210-
multiprocessor.process(domains_config, updater_metrics, namespace_name, last_updated, status, revision,
211-
worker_id, instance_update, start_time, cwm_api_manager)
202+
updated_instances = set()
203+
# all_releases = {release["namespace"]: release for release in deployments_manager.iterate_all_releases()}
204+
# for release in all_releases.values():
205+
# updated_instances.add(namespace_name)
206+
# namespace_name = release["namespace"]
207+
# datestr, timestr, *_ = release["updated"].split(" ")
208+
# last_updated = common.strptime("{}T{}".format(datestr, timestr.split(".")[0]), "%Y-%m-%dT%H:%M:%S")
209+
# status = release["status"]
210+
# # app_version = release["app_version"]
211+
# revision = int(release["revision"])
212+
# start_time = common.now()
213+
# worker_id = common.get_worker_id_from_namespace_name(namespace_name)
214+
# instance_update = instances_updates.get(worker_id)
215+
# multiprocessor.process(domains_config, updater_metrics, namespace_name, last_updated, status, revision,
216+
# worker_id, instance_update, start_time, cwm_api_manager)
217+
for namespace_name, operation in instances_updates.items():
218+
if operation == 'update' and namespace_name not in updated_instances:
219+
start_time = common.now()
220+
worker_id = common.get_worker_id_from_namespace_name(namespace_name)
221+
multiprocessor.process(domains_config, updater_metrics, namespace_name, None, None, None,
222+
worker_id, 'create', start_time, cwm_api_manager)
212223
multiprocessor.finalize()
213224

214225

cwm_worker_operator/waiter.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,22 @@
1717

1818

1919
def _check_for_deployment_complete(domains_config, deployments_manager, waiter_metrics, start_time, log_kwargs, namespace_name, flow_manager, worker_id, volume_config):
20-
has_hostnames_without_cert_but_with_challenge = False
21-
check_hostname_challenge = None
22-
for hostname in volume_config.hostnames:
23-
if hostname not in volume_config.hostname_certs and hostname in volume_config.hostname_challenges:
24-
has_hostnames_without_cert_but_with_challenge = True
25-
check_hostname_challenge = {
26-
'host': hostname,
27-
**volume_config.hostname_challenges[hostname]
28-
}
29-
break
30-
if deployments_manager.is_ready(namespace_name, "minio", minimal_check=has_hostnames_without_cert_but_with_challenge):
31-
internal_hostname = deployments_manager.get_hostname(namespace_name, "minio")
20+
# has_hostnames_without_cert_but_with_challenge = False
21+
# check_hostname_challenge = None
22+
# for hostname in volume_config.hostnames:
23+
# if hostname not in volume_config.hostname_certs and hostname in volume_config.hostname_challenges:
24+
# has_hostnames_without_cert_but_with_challenge = True
25+
# check_hostname_challenge = {
26+
# 'host': hostname,
27+
# **volume_config.hostname_challenges[hostname]
28+
# }
29+
# break
30+
if deployments_manager.is_ready(namespace_name, "minio"): # , minimal_check=has_hostnames_without_cert_but_with_challenge):
31+
# internal_hostname = deployments_manager.get_hostname(namespace_name, "minio")
32+
internal_hostname = {'http': '--', 'https': '--'}
3233
ok = True
33-
if config.WAITER_VERIFY_WORKER_ACCESS:
34-
ok = deployments_manager.verify_worker_access(internal_hostname, log_kwargs, check_hostname_challenge=check_hostname_challenge)
34+
# if config.WAITER_VERIFY_WORKER_ACCESS:
35+
# ok = deployments_manager.verify_worker_access(internal_hostname, log_kwargs, check_hostname_challenge=check_hostname_challenge)
3536
if ok:
3637
flow_manager.set_worker_available(worker_id, internal_hostname)
3738
if waiter_metrics:

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ click==7.1.2
77
uvicorn[standard]==0.27.0.post1
88
fastapi==0.109.1
99
gunicorn==21.2.0
10+
minio==7.2.3

0 commit comments

Comments
 (0)