|  | 
| 10 | 10 | """ | 
| 11 | 11 | import logging.config | 
| 12 | 12 | from collections import defaultdict | 
|  | 13 | +from copy import deepcopy | 
| 13 | 14 | from dataclasses import asdict | 
| 14 | 15 | from typing import Dict, Optional | 
| 15 | 16 | 
 | 
| @@ -98,6 +99,9 @@ def __pre_check( | 
| 98 | 99 |         else: | 
| 99 | 100 |             if len(master_ips) != group_num: | 
| 100 | 101 |                 raise Exception("new machine num != group_num.") | 
|  | 102 | +        # shard_num must be greater than or equal to group_num | 
|  | 103 | +        if new_shard_num < group_num: | 
|  | 104 | +            raise Exception("shard_num:{} < group_num:{}".format(new_shard_num, group_num)) | 
| 101 | 105 |         if old_shard_num != new_shard_num: | 
| 102 | 106 |             raise Exception("old_shard_num {} != new_shard_num {}.".format(old_shard_num, new_shard_num)) | 
| 103 | 107 |         # 2024-05: even division is no longer required; machines may host differing numbers of instances | 
| @@ -401,6 +405,42 @@ def get_redis_install_sub_pipelines( | 
| 401 | 405 |             redis_install_sub_pipelines.append(RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)) | 
| 402 | 406 |         return redis_install_sub_pipelines | 
| 403 | 407 | 
 | 
|  | 408 | +    # Major versions >= 4 need the slave rebuilt once; otherwise data may be lost | 
|  | 409 | +    # The rediscluster architecture needs special consideration | 
|  | 410 | +    def redis_local_redo_dr(self, act_kwargs, sync_relations) -> list: | 
|  | 411 | +        sub_pipelines, resync_args = [], deepcopy(act_kwargs) | 
|  | 412 | +        resync_args.cluster = { | 
|  | 413 | +            "bk_biz_id": int(act_kwargs.cluster["bk_biz_id"]), | 
|  | 414 | +            "cluster_id": int(act_kwargs.cluster["cluster_id"]), | 
|  | 415 | +            "cluster_type": act_kwargs.cluster["cluster_type"], | 
|  | 416 | +            "immute_domain": act_kwargs.cluster["immute_domain"], | 
|  | 417 | +            "bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]), | 
|  | 418 | +            "instances": [], | 
|  | 419 | +        } | 
|  | 420 | +        for sync_link in sync_relations: | 
|  | 421 | +            one_resync_args = deepcopy(resync_args) | 
|  | 422 | +            new_master, new_slave = sync_link["sync_dst1"], sync_link["sync_dst2"] | 
|  | 423 | +            for instances in sync_link["ins_link"]: | 
|  | 424 | +                master_port, slave_port = instances["sync_dst1"], instances["sync_dst2"] | 
|  | 425 | +                one_resync_args.cluster["instances"].append( | 
|  | 426 | +                    { | 
|  | 427 | +                        "master_ip": new_master, | 
|  | 428 | +                        "master_port": int(master_port), | 
|  | 429 | +                        "slave_ip": new_slave, | 
|  | 430 | +                        "slave_port": int(slave_port), | 
|  | 431 | +                    } | 
|  | 432 | +                ) | 
|  | 433 | +            one_resync_args.exec_ip = new_slave | 
|  | 434 | +            one_resync_args.get_redis_payload_func = RedisActPayload.redis_local_redo_dr.__name__ | 
|  | 435 | +            sub_pipelines.append( | 
|  | 436 | +                { | 
|  | 437 | +                    "act_name": _("{}-本地重建Slave").format(new_slave), | 
|  | 438 | +                    "act_component_code": ExecuteDBActuatorScriptComponent.code, | 
|  | 439 | +                    "kwargs": asdict(one_resync_args), | 
|  | 440 | +                } | 
|  | 441 | +            ) | 
|  | 442 | +        return sub_pipelines | 
|  | 443 | + | 
| 404 | 444 |     def redis_backend_scale_flow(self): | 
| 405 | 445 |         """ | 
| 406 | 446 |         redis 扩缩容流程: | 
| @@ -549,6 +589,18 @@ def redis_backend_scale_flow(self): | 
| 549 | 589 |                         kwargs={**asdict(act_kwargs), **asdict(dns_kwargs)}, | 
| 550 | 590 |                     ) | 
| 551 | 591 | 
 | 
|  | 592 | +            # Major versions >= 4 need the slave rebuilt once; otherwise data may be lost | 
|  | 593 | +            big_version = int(str.split(act_kwargs.cluster["db_version"], "-")[1]) | 
|  | 594 | +            if ( | 
|  | 595 | +                act_kwargs.cluster["cluster_type"] | 
|  | 596 | +                in [ | 
|  | 597 | +                    ClusterType.TendisRedisInstance.value, | 
|  | 598 | +                    ClusterType.TendisTwemproxyRedisInstance.value, | 
|  | 599 | +                ] | 
|  | 600 | +                and big_version >= 4 | 
|  | 601 | +            ): | 
|  | 602 | +                sub_pipeline.add_parallel_acts(acts_list=self.redis_local_redo_dr(act_kwargs, sync_relations)) | 
|  | 603 | + | 
| 552 | 604 |             sub_pipeline.add_act(act_name=_("Redis-人工确认"), act_component_code=PauseComponent.code, kwargs={}) | 
| 553 | 605 | 
 | 
| 554 | 606 |             # 删除老实例的nodes域名 | 
|  | 
0 commit comments