Skip to content

Commit 44eb993

Browse files
zfrendoiSecloud
authored and committed
feat: restoreFlow流程代码结构优化 #11975
1 parent 91b2392 commit 44eb993

17 files changed

+427
-450
lines changed

dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py renamed to dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_remote_sub_flow.py

Lines changed: 212 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
specific language governing permissions and limitations under the License.
1010
"""
1111
import copy
12-
import logging
13-
import os.path
12+
import logging.config
13+
import os
1414
from dataclasses import asdict
1515
from datetime import datetime
1616

@@ -36,12 +36,218 @@
3636
ExecActuatorKwargs,
3737
)
3838
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
39+
from backend.flow.utils.spider.tendb_cluster_info import get_remotedb_info
3940

4041
logger = logging.getLogger("flow")
4142

4243

44+
def remote_instance_migrate_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
    """
    Build the sub flow that migrates a remote master/slave pair (tendbHa /
    tendbCluster) onto two new hosts by restoring data from remote backup files.
    The migration is done at instance level. Only the pipeline is assembled here;
    metadata must be handled by the main flow.

    Activities are added in this order:
      1. create the backup target directory on both new hosts
      2. push the db-actuator package to the old master and both new hosts
      3. download the backup files to the new master and new slave (parallel)
      4. restore the data on the new master and new slave (parallel)
      5. grant the repl account on the new master, then point the new slave at it
      6. grant the repl account on the old master, then point the new master at it

    @param root_id: root_id of the flow
    @param ticket_data: data of the related ticket
    @param cluster_info: description of the related cluster; every key listed in
        ``required_keys`` below must be present, otherwise KeyError is raised
    @return: whatever ``SubBuilder.build_sub_process`` returns for this sub flow
    """

    sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data)
    # The two new hosts are already installed; they still need the data imported
    # and replication set up:
    # push dbactor > fetch backup files > restore data > change master
    required_keys = (
        "cluster_id",
        "master_ip",
        "slave_ip",
        "master_port",
        "new_master_ip",
        "new_slave_ip",
        "new_slave_port",
        "new_master_port",
        "bk_cloud_id",
        "file_target_path",
        "change_master_force",
        "backupinfo",
        "charset",
    )
    cluster = {key: cluster_info[key] for key in required_keys}
    master_ip = cluster["master_ip"]
    new_master_ip = cluster["new_master_ip"]
    new_slave_ip = cluster["new_slave_ip"]

    # One kwargs object is reused for every actuator step: its fields are
    # re-pointed before each step and asdict() snapshots it for the activity.
    exec_act_kwargs = ExecActuatorKwargs(
        bk_cloud_id=int(cluster["bk_cloud_id"]), cluster_type=ClusterType.TenDBCluster, cluster=cluster
    )
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.mysql_mkdir_dir.__name__
    exec_act_kwargs.exec_ip = [new_slave_ip, new_master_ip]
    sub_pipeline.add_act(
        act_name=_("创建目录 {}".format(cluster["file_target_path"])),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
    )

    sub_pipeline.add_act(
        act_name=_("下发db-actor到节点"),
        act_component_code=TransFileComponent.code,
        kwargs=asdict(
            DownloadMediaKwargs(
                bk_cloud_id=int(cluster["bk_cloud_id"]),
                exec_ip=[master_ip, new_slave_ip, new_master_ip],
                file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
            )
        ),
    )

    # Download the backup media to the new master and the new slave concurrently.
    # (Original note: the download is asynchronous and its result is polled
    # periodically through an interface.)
    backup_info = cluster["backupinfo"]
    task_ids = [detail["task_id"] for detail in backup_info["file_list_details"]]
    download_acts = []
    for dest_ip in (new_master_ip, new_slave_ip):
        download_kwargs = DownloadBackupFileKwargs(
            bk_cloud_id=cluster["bk_cloud_id"],
            task_ids=task_ids,
            dest_ip=dest_ip,
            dest_dir=cluster["file_target_path"],
            reason="spider remote node sync data",
        )
        download_acts.append(
            {
                "act_name": _("下载全库备份介质到 {}".format(dest_ip)),
                "act_component_code": MySQLDownloadBackupfileComponent.code,
                "kwargs": asdict(download_kwargs),
            }
        )
    sub_pipeline.add_parallel_acts(download_acts)

    # Stage 4: restore the data on the new remote master and slave in parallel.
    # Both restores use the old master as source and do not change master.
    restore_acts = []
    cluster["restore_ip"] = new_master_ip
    cluster["restore_port"] = cluster["new_master_port"]
    cluster["source_ip"] = master_ip
    cluster["source_port"] = cluster["master_port"]
    cluster["change_master"] = False
    exec_act_kwargs.exec_ip = new_master_ip
    exec_act_kwargs.job_timeout = MYSQL_DATA_RESTORE_TIME
    exec_act_kwargs.cluster = copy.deepcopy(cluster)
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_restore_remotedb_payload.__name__
    restore_acts.append(
        {
            "act_name": _("恢复新主节点数据 {}:{}".format(new_master_ip, cluster["restore_port"])),
            "act_component_code": ExecuteDBActuatorScriptComponent.code,
            "kwargs": asdict(exec_act_kwargs),
            # the new master's restore output is stored under this variable name
            "write_payload_var": "change_master_info",
        }
    )

    # Only the restore target differs for the slave; source_* and change_master
    # keep the values set above.
    cluster["restore_ip"] = new_slave_ip
    cluster["restore_port"] = cluster["new_slave_port"]
    exec_act_kwargs.cluster = copy.deepcopy(cluster)
    exec_act_kwargs.exec_ip = new_slave_ip
    restore_acts.append(
        {
            "act_name": _("恢复新从节点数据 {}:{}".format(new_slave_ip, cluster["restore_port"])),
            "act_component_code": ExecuteDBActuatorScriptComponent.code,
            "kwargs": asdict(exec_act_kwargs),
        }
    )
    sub_pipeline.add_parallel_acts(acts_list=restore_acts)

    # Stage 5: change master, new slave -> new master.
    cluster["target_ip"] = new_master_ip
    cluster["target_port"] = cluster["new_master_port"]
    cluster["repl_ip"] = new_slave_ip
    exec_act_kwargs.cluster = copy.deepcopy(cluster)
    exec_act_kwargs.exec_ip = new_master_ip
    exec_act_kwargs.job_timeout = MYSQL_USUAL_JOB_TIME
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_grant_remotedb_repl_user.__name__
    sub_pipeline.add_act(
        act_name=_("新增repl帐户{}".format(new_master_ip)),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
        write_payload_var="show_master_status_info",
    )

    # target_* and repl_ip are unchanged from the grant step above.
    cluster["repl_port"] = cluster["new_slave_port"]
    cluster["change_master_type"] = MysqlChangeMasterType.MASTERSTATUS.value
    exec_act_kwargs.cluster = copy.deepcopy(cluster)
    exec_act_kwargs.exec_ip = new_slave_ip
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_remotedb_change_master.__name__
    sub_pipeline.add_act(
        # ip:port, in the same shape as the stage 6 act name
        act_name=_("建立主从关系:新从库指向新主库 {}:{}".format(new_slave_ip, cluster["repl_port"])),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
    )

    # Stage 6: change master, new master -> old master.
    cluster["target_ip"] = master_ip
    cluster["target_port"] = cluster["master_port"]
    cluster["repl_ip"] = new_master_ip
    exec_act_kwargs.cluster = copy.deepcopy(cluster)
    exec_act_kwargs.exec_ip = master_ip
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_grant_remotedb_repl_user.__name__
    sub_pipeline.add_act(
        act_name=_("新增repl帐户{}".format(master_ip)),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
    )

    # target_* and repl_ip are unchanged from the grant step above.
    cluster["repl_port"] = cluster["new_master_port"]
    cluster["change_master_type"] = MysqlChangeMasterType.BACKUPFILE.value
    exec_act_kwargs.cluster = copy.deepcopy(cluster)
    exec_act_kwargs.exec_ip = new_master_ip
    exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_remotedb_change_master.__name__
    sub_pipeline.add_act(
        act_name=_("建立主从关系:新主库指向旧主库 {}:{}".format(new_master_ip, cluster["repl_port"])),
        act_component_code=ExecuteDBActuatorScriptComponent.code,
        kwargs=asdict(exec_act_kwargs),
    )
    return sub_pipeline.build_sub_process(sub_name=_("RemoteDB主从节点成对迁移子流程{}".format(new_master_ip)))
214+
215+
216+
def remote_node_uninstall_sub_flow(root_id: str, ticket_data: dict, ip: str):
    """
    Build the sub flow that uninstalls every remotedb instance on the given ip.

    One uninstall activity is created per instance returned by
    ``get_remotedb_info`` and all of them are added as parallel activities.

    @param root_id: root_id of the flow
    @param ticket_data: ticket data object; its "bk_cloud_id" entry is read
    @param ip: ip of the host whose instances are uninstalled
    @return: whatever ``SubBuilder.build_sub_process`` returns for this sub flow
    """
    builder = SubBuilder(root_id=root_id, data=ticket_data)
    bk_cloud_id = ticket_data["bk_cloud_id"]
    payload_func = MysqlActPayload.get_uninstall_mysql_payload.__name__

    uninstall_acts = [
        {
            "act_name": _("卸载MySQL实例:{}:{}".format(ip, instance["port"])),
            "act_component_code": ExecuteDBActuatorScriptComponent.code,
            "kwargs": asdict(
                ExecActuatorKwargs(
                    exec_ip=ip,
                    bk_cloud_id=bk_cloud_id,
                    # each activity carries the port of the instance it uninstalls
                    cluster={"uninstall_ip": ip, "bk_cloud_id": bk_cloud_id, "backend_port": instance["port"]},
                    get_mysql_payload_func=payload_func,
                )
            ),
        }
        for instance in get_remotedb_info(ip, bk_cloud_id)
    ]

    builder.add_parallel_acts(acts_list=uninstall_acts)
    return builder.build_sub_process(sub_name=_("Remote node {} 卸载整机实例".format(ip)))
246+
247+
43248
def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
44249
"""
250+
定义 tendbHa tendbCluster 从节点恢复流程,从远程备份下载备份文件.
45251
tendb remote slave 节点 恢复。(只做流程,元数据请在主流程控制)
46252
@param root_id: flow流程的root_id
47253
@param ticket_data: 关联单据 ticket对象
@@ -175,6 +381,8 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
175381

176382
def priv_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict, ips: list):
177383
"""
384+
定义 tendbHa 从集群备份的主从节点恢复权限。这里主要用于恢复从节点权限,补充主从权限差异时slave重建直接从
385+
主节点克隆权限可能有权限不全的问题。
178386
tendb privilege recover 指定实例权限恢复。
179387
@param root_id: flow流程的root_id
180388
@param ticket_data: 关联单据 ticket对象
@@ -195,6 +403,7 @@ def priv_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict, i
195403

196404
priv_file_task_ids = [file["task_id"] for file in backup_info["file_list"].values()]
197405
priv_files = [os.path.basename(file["file_name"]) for file in backup_info["file_list"].values()]
406+
logger.info(priv_files)
198407
storages = cluster_model.storageinstance_set.filter(machine__ip__in=ips)
199408
priv_sub_pipeline_list = []
200409
for storage in storages:
@@ -246,7 +455,7 @@ def priv_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict, i
246455

247456
if len(priv_sub_pipeline_list) > 0:
248457
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=priv_sub_pipeline_list)
249-
return sub_pipeline.build_sub_process(sub_name=_("{}恢复权限".format(cluster_model.id)))
458+
return sub_pipeline.build_sub_process(sub_name=_("集群{}恢复权限".format(cluster_model.id)))
250459
else:
251460
return None
252461
# What should happen if the flow is empty?

0 commit comments

Comments
 (0)