-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathrouting.py
465 lines (400 loc) · 19.4 KB
/
routing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# Author: Felix Fontein <[email protected]>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or
# https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
# SPDX-FileCopyrightText: 2021, Ansible Project
"""
Functions for parsing and interpreting collection metadata.
"""
from __future__ import annotations
import asyncio
import datetime
import os
import typing as t
from collections import defaultdict
from collections.abc import Mapping, MutableMapping, Sequence
import asyncio_pool # type: ignore[import]
from antsibull_core import app_context, yaml
from antsibull_core.utils.collections import compare_all_but
from ..constants import DOCUMENTABLE_PLUGINS
from ..utils.get_pkg_data import get_antsibull_data
from . import AnsibleCollectionMetadata
from .fqcn import get_fqcn_parts
# A nested structure as follows:
# plugin_type:
# plugin_name: # FQCN
# tombstone: t.Optional[{tombstone record}]
# deprecation: t.Optional[{deprecation record}]
# redirect: t.Optional[str]
# redirect_is_symlink: t.Optional[bool]
CollectionRoutingT = Mapping[str, Mapping[str, Mapping[str, t.Any]]]
MutableCollectionRoutingT = MutableMapping[
str, MutableMapping[str, MutableMapping[str, t.Any]]
]
COLLECTIONS_WITH_FLATMAPPING = (
"community.general",
"community.network",
)
def calculate_plugin_fqcns(
collection_name: str, src_basename: str, dst_basename: str, rel_path: str
) -> tuple[str, str]:
"""
Calculate source and dest FQCNs for plugins which have been renamed via symlink.
:collection_name: FQCN of the collection
:src_basename: Alias name for the plugin
:dst_basename: Canonical name of the plugin
:rel_path: Filesystem path to the directory that the plugin lives in relative to the plugin
type directory.
:returns: Two-tuple of fqcn of the alias name of the plugin and fqcn of the canonical
plugin name.
"""
src_components = os.path.normpath(os.path.join(rel_path, src_basename)).split(
os.sep
)
dest_components = os.path.normpath(os.path.join(rel_path, dst_basename)).split(
os.sep
)
# Compose source FQCN
src_name = ".".join(src_components)
src_fqcn = f"{collection_name}.{src_name}"
# Compose destination FQCN
dst_name = ".".join(dest_components)
dst_fqcn = f"{collection_name}.{dst_name}"
return src_fqcn, dst_fqcn
def find_symlink_redirects(
collection_name: str, plugin_type: str, directory_path: str
) -> dict[str, str]:
"""
Finds plugin redirects that are defined by symlinks.
:collection_name: FQCN of the collection we're searching within
:plugin_type: Type of plugins that we're scanning
:directory_path: Full path to the directory we're scanning.
:returns: Dict mapping fqcn of the alias names to fqcn of the canonical plugin names.
"""
plugin_type_routing = {}
if os.path.isdir(directory_path):
for path, dummy, files in os.walk(directory_path):
rel_path = os.path.relpath(path, directory_path)
for filename in files:
src_basename, ext = os.path.splitext(filename)
if ext != ".py" and not (plugin_type == "module" and ext == ".ps1"):
continue
file_path = os.path.join(path, filename)
if not os.path.islink(file_path):
continue
dst_basename = os.path.splitext(os.readlink(file_path))[0]
src_fqcn, dst_fqcn = calculate_plugin_fqcns(
collection_name, src_basename, dst_basename, rel_path
)
plugin_type_routing[src_fqcn] = dst_fqcn
return plugin_type_routing
def process_dates(plugin_record: dict[str, t.Any]) -> dict[str, t.Any]:
for tlkey in ("tombstone", "deprecation"):
if tlkey in plugin_record and "removal_date" in plugin_record[tlkey]:
date = plugin_record[tlkey]["removal_date"]
if isinstance(date, datetime.datetime):
date = date.date()
if isinstance(date, datetime.date):
date = date.isoformat()
plugin_record[tlkey]["removal_date"] = date
return plugin_record
def find_flatmapping_short_long_maps(
plugin_routing_type: dict[str, dict[str, t.Any]],
) -> tuple[Mapping[str, str], Mapping[str, tuple[str, bool]]]:
"""
Collect all short and long names, and mappings between them.
Short names are FQCN like community.general.rax_facts, and long names are FQCN like
community.general.cloud.rackspace.rax_facts.
Returns two tuples. The first element maps short names to long names. The second element
maps long names to pairs (short name, is symbolic link).
"""
shortname_to_longname: dict[str, str] = {}
longname_to_shortname: dict[str, tuple[str, bool]] = {}
for plugin_name, routing_data in plugin_routing_type.items():
coll_ns, coll_name, plug_name = get_fqcn_parts(plugin_name)
if "tombstone" not in routing_data and "redirect" in routing_data:
redirect = routing_data["redirect"]
redir_coll_ns, redir_coll_name, redir_plug_name = get_fqcn_parts(redirect)
if coll_ns == redir_coll_ns and coll_name == redir_coll_name:
if "." not in plug_name and redir_plug_name.endswith(f".{plug_name}"):
is_symlink = routing_data.get("redirect_is_symlink") or False
shortname_to_longname[plugin_name] = redirect
longname_to_shortname[redirect] = (plugin_name, is_symlink)
elif "." in plug_name:
# Sometimes plugins/modules/foo_facts.py could be a link to
# plugins/modules/subdir1/subdir2/foo_info.py, and
# plugins/modules/subdir1/subdir2/foo_facts.py also links to the same
# _info module. In that case, artificially construct the shortname
# <-> longname mapping
dummy, short_name = plug_name.rsplit(".", 1)
short_fqcn = f"{coll_ns}.{coll_name}.{short_name}"
if (
plugin_routing_type.get(short_fqcn, {}).get("redirect")
== redirect
):
shortname_to_longname[short_fqcn] = plugin_name
longname_to_shortname[plugin_name] = (short_fqcn, False)
return shortname_to_longname, longname_to_shortname
def remove_flatmapping_artifacts(
plugin_routing: dict[str, dict[str, dict[str, t.Any]]]
) -> None:
"""
For collections which use flatmapping (like community.general and community.network),
there will be several redirects which look confusing, like the community.general.rax_facts
module redirects to community.general.cloud.rackspace.rax_facts, which in turn redirects to
community.general.cloud.rackspace.rax_info. Such redirects are condensed by this function
into one redirect from community.general.rax_facts to community.general.rax_info.
"""
for _, plugin_routing_type in plugin_routing.items():
# First collect all short and long names.
shortname_to_longname, longname_to_shortname = find_flatmapping_short_long_maps(
plugin_routing_type
)
# Now shorten redirects
for plugin_name, routing_data in list(plugin_routing_type.items()):
if "redirect" in routing_data:
redirect = routing_data["redirect"]
if shortname_to_longname.get(plugin_name) == redirect:
routing_data.pop("redirect")
routing_data.pop("redirect_is_symlink", None)
if (
"deprecation" not in routing_data
and "tombstone" not in routing_data
):
plugin_routing_type.pop(plugin_name, None)
elif redirect in longname_to_shortname:
routing_data["redirect"], is_symlink = longname_to_shortname[
redirect
]
if routing_data.get("redirect_is_symlink") and not is_symlink:
routing_data.pop("redirect_is_symlink")
if plugin_name in longname_to_shortname:
if (
"tombstone" not in routing_data
and "deprecation" not in routing_data
):
plugin_routing_type.pop(plugin_name, None)
def load_meta_runtime(
collection_name: str, collection_metadata: AnsibleCollectionMetadata
) -> Mapping[str, t.Any]:
"""
Load meta/runtime.yml for collections, and ansible_builtin_runtime.yml for ansible-core.
Also extracts additional metadata stored in meta/runtime.yml, like requires_ansible,
and stores it in collection_metadata
"""
if collection_name == "ansible.builtin":
meta_runtime_path = os.path.join(
collection_metadata.path, "config", "ansible_builtin_runtime.yml"
)
else:
meta_runtime_path = os.path.join(
collection_metadata.path, "meta", "runtime.yml"
)
if os.path.exists(meta_runtime_path):
meta_runtime = yaml.load_yaml_file(meta_runtime_path)
else:
meta_runtime = {}
if collection_name != "ansible.builtin":
requires_ansible = meta_runtime.get("requires_ansible")
if isinstance(requires_ansible, str):
collection_metadata.requires_ansible = requires_ansible
return meta_runtime
def _add_symlink_redirects(
collection_name: str,
collection_metadata: AnsibleCollectionMetadata,
plugin_routing_out: dict[str, dict[str, dict[str, t.Any]]],
) -> None:
for plugin_type in DOCUMENTABLE_PLUGINS:
directory_name = "modules" if plugin_type == "module" else plugin_type
directory_path = os.path.join(
collection_metadata.path, "plugins", directory_name
)
plugin_type_routing = plugin_routing_out[plugin_type]
symlink_redirects = find_symlink_redirects(
collection_name, plugin_type, directory_path
)
for redirect_name, redirect_dst in symlink_redirects.items():
if redirect_name not in plugin_type_routing:
plugin_type_routing[redirect_name] = {}
if "redirect" not in plugin_type_routing[redirect_name]:
plugin_type_routing[redirect_name]["redirect"] = redirect_dst
if plugin_type_routing[redirect_name]["redirect"] == redirect_dst:
plugin_type_routing[redirect_name]["redirect_is_symlink"] = True
def _add_core_symlink_redirects(
collection_metadata: AnsibleCollectionMetadata,
plugin_routing_out: dict[str, dict[str, dict[str, t.Any]]],
) -> None:
for plugin_type in DOCUMENTABLE_PLUGINS:
directory_name = (
"modules"
if plugin_type == "module"
else os.path.join("plugins", plugin_type)
)
directory_path = os.path.join(collection_metadata.path, directory_name)
plugin_type_routing = plugin_routing_out[plugin_type]
symlink_redirects = find_symlink_redirects(
"ansible.builtin", plugin_type, directory_path
)
for redirect_name, redirect_dst in symlink_redirects.items():
if redirect_name not in plugin_type_routing:
plugin_type_routing[redirect_name] = {}
if "redirect" not in plugin_type_routing[redirect_name]:
plugin_type_routing[redirect_name]["redirect"] = redirect_dst
async def load_collection_routing(
collection_name: str, collection_metadata: AnsibleCollectionMetadata
) -> dict[str, dict[str, dict[str, t.Any]]]:
"""
Load plugin routing for a collection, and populate the private plugins lists
in collection metadata.
"""
meta_runtime = load_meta_runtime(collection_name, collection_metadata)
plugin_routing_out: dict[str, dict[str, dict[str, t.Any]]] = {}
plugin_routing_in = meta_runtime.get("plugin_routing") or {}
private_plugins: dict[str, list[str]] = {}
collection_metadata.private_plugins = private_plugins
for plugin_type in DOCUMENTABLE_PLUGINS:
plugin_type_id = "modules" if plugin_type == "module" else plugin_type
plugin_type_routing = plugin_routing_in.get(plugin_type_id) or {}
plugin_routing_out[plugin_type] = {}
private_plugins[plugin_type] = []
for plugin_name, plugin_record in plugin_type_routing.items():
fqcn = f"{collection_name}.{plugin_name}"
plugin_routing_out[plugin_type][fqcn] = process_dates(plugin_record)
if plugin_record.get("private", False):
private_plugins[plugin_type].append(plugin_name)
if collection_name == "ansible.builtin":
# ansible-core has a special directory structure we currently do not want
# (or need) to handle
_add_core_symlink_redirects(collection_metadata, plugin_routing_out)
return plugin_routing_out
_add_symlink_redirects(collection_name, collection_metadata, plugin_routing_out)
if collection_name in COLLECTIONS_WITH_FLATMAPPING:
remove_flatmapping_artifacts(plugin_routing_out)
return plugin_routing_out
async def load_all_collection_routing(
collection_metadata: Mapping[str, AnsibleCollectionMetadata]
) -> MutableCollectionRoutingT:
# Collection
lib_ctx = app_context.lib_ctx.get()
async with asyncio_pool.AioPool(size=lib_ctx.thread_max) as pool:
requestors = []
for collection, metadata in collection_metadata.items():
requestors.append(
await pool.spawn(load_collection_routing(collection, metadata))
)
responses = await asyncio.gather(*requestors)
# Merge per-collection routing into one big routing table
global_plugin_routing: MutableCollectionRoutingT = {}
for plugin_type in DOCUMENTABLE_PLUGINS:
global_plugin_routing[plugin_type] = {}
for collection_plugin_routing in responses:
global_plugin_routing[plugin_type].update(
collection_plugin_routing[plugin_type]
)
return global_plugin_routing
def _add_meta_redirect_for_aliases(
plugin_map: MutableMapping[str, t.Any],
plugin_routing: MutableMapping[str, MutableMapping[str, t.Any]],
) -> None:
for plugin_name, plugin_record in plugin_map.items():
if isinstance(plugin_record.get("doc"), Mapping) and isinstance(
plugin_record["doc"].get("aliases"), Sequence
):
for alias in plugin_record["doc"]["aliases"]:
alias_fqcn = f"ansible.builtin.{alias}"
if alias_fqcn not in plugin_routing:
plugin_routing[alias_fqcn] = {}
if "redirect" not in plugin_routing[alias_fqcn]:
plugin_routing[alias_fqcn]["redirect"] = plugin_name
def _remove_redirect_duplicates(
plugin_map: MutableMapping[str, t.Any],
plugin_routing: MutableMapping[str, MutableMapping[str, t.Any]],
) -> None:
for plugin_name, plugin_record in list(plugin_map.items()):
if plugin_name in plugin_routing and "redirect" in plugin_routing[plugin_name]:
destination = plugin_routing[plugin_name]["redirect"]
if destination in plugin_map and destination != plugin_name:
# Heuristic: if we have a redirect, and docs for both this plugin and the
# redirected one are generated from the same plugin filename, then we can
# remove this plugin's docs and generate a redirect stub instead.
a = plugin_record.get("doc")
b = plugin_map[destination].get("doc")
if a and b and compare_all_but(a, b, ["filename"]):
del plugin_map[plugin_name]
def _remove_other_duplicates(
plugin_type: str,
plugin_map: MutableMapping[str, t.Any],
plugin_routing: MutableMapping[str, MutableMapping[str, t.Any]],
) -> None:
# In some cases, both the plugin and its aliases will be listed.
# Basically look for plugins whose name is wrong, a plugin with that
# name exists, and whose docs are identical.
for plugin_name, plugin_record in list(plugin_map.items()):
doc = plugin_record.get("doc") or {}
name = doc.get("module" if plugin_type == "module" else "name")
collection_name = ".".join(plugin_name.split(".")[:2])
full_name = f"{collection_name}.{name}"
if full_name and full_name != plugin_name and full_name in plugin_map:
a = plugin_record.get("doc")
b = plugin_map[full_name].get("doc")
if a and b and compare_all_but(a, b, ["name", "filename"]):
del plugin_map[plugin_name]
if plugin_name not in plugin_routing:
plugin_routing[plugin_name] = {}
plugin_routing[plugin_name]["redirect"] = full_name
def remove_redirect_duplicates(
plugin_info: MutableMapping[str, MutableMapping[str, t.Any]],
collection_routing: MutableCollectionRoutingT,
) -> None:
"""
Remove duplicate plugin docs that come from symlinks (or once ansible-docs supports them,
other plugin routing redirects).
"""
for plugin_type, plugin_map in plugin_info.items():
plugin_routing = collection_routing[plugin_type]
_add_meta_redirect_for_aliases(plugin_map, plugin_routing)
_remove_redirect_duplicates(plugin_map, plugin_routing)
_remove_other_duplicates(plugin_type, plugin_map, plugin_routing)
def find_stubs(
plugin_info: MutableMapping[str, MutableMapping[str, t.Any]],
collection_routing: CollectionRoutingT,
) -> defaultdict[str, defaultdict[str, dict[str, t.Any]]]:
"""
Find plugin stubs to write. Returns a nested structure:
collection:
plugin_type:
plugin_short_name:
tombstone: t.Optional[{tombstone record}]
deprecation: t.Optional[{deprecation record}]
redirect: t.Optional[str]
redirect_is_symlink: t.Optional[bool]
"""
stubs_info: defaultdict[str, defaultdict[str, dict[str, t.Any]]] = defaultdict(
lambda: defaultdict(dict)
)
for plugin_type, plugin_routing in collection_routing.items():
plugin_info_type = plugin_info.get(plugin_type) or {}
for plugin_name, plugin_data in plugin_routing.items():
if "tombstone" not in plugin_data and "redirect" not in plugin_data:
# Ignore pure deprecations
continue
if plugin_name not in plugin_info_type:
coll_ns, coll_name, plug_name = get_fqcn_parts(plugin_name)
stubs_info[f"{coll_ns}.{coll_name}"][plugin_type][
plug_name
] = plugin_data
if "ansible.builtin" in stubs_info:
# Remove all redirects from the ansible-base 2.10 -> collections move.
# We do not want stub pages for all these thousands of plugins/modules.
ansible_2_10_routing = yaml.load_yaml_bytes(
get_antsibull_data("ansible_2_10_routing.yml")
)
for plugin_type, plugins in ansible_2_10_routing[
"ansible_2_10_routing"
].items():
if plugin_type in stubs_info["ansible.builtin"]:
for plugin_short_name in plugins:
stubs_info["ansible.builtin"][plugin_type].pop(
plugin_short_name, None
)
return stubs_info