|
4 | 4 | import typing as t
|
5 | 5 |
|
6 | 6 | from ddtrace.internal.injection import HookType
|
| 7 | +from ddtrace.internal.logger import get_logger |
7 | 8 | from ddtrace.internal.test_visibility.coverage_lines import CoverageLines
|
8 | 9 |
|
9 | 10 |
|
| 11 | +log = get_logger(__name__) |
| 12 | + |
| 13 | + |
10 | 14 | # This is primarily to make mypy happy without having to nest the rest of this module behind a version check
|
11 | 15 | assert sys.version_info >= (3, 12) # nosec
|
12 | 16 |
|
|
17 | 21 | RETURN_CONST = dis.opmap["RETURN_CONST"]
|
18 | 22 | EMPTY_MODULE_BYTES = bytes([RESUME, 0, RETURN_CONST, 0])
|
19 | 23 |
|
20 |
| -# Register the coverage tool with the low-impact monitoring system |
21 |
| -try: |
22 |
| - sys.monitoring.use_tool_id(sys.monitoring.COVERAGE_ID, "datadog") # noqa |
23 |
| -except ValueError: |
24 |
| - # TODO: Another coverage tool is already in use. Either warn the user |
25 |
| - # or free the tool and register ours. |
26 |
| - def instrument_all_lines( |
27 |
| - code: CodeType, hook: HookType, path: str, package: str |
28 |
| - ) -> t.Tuple[CodeType, CoverageLines]: |
29 |
| - # No-op |
| 24 | +_CODE_HOOKS: t.Dict[CodeType, t.Tuple[HookType, str, t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str]]]]]] = {} |
| 25 | + |
| 26 | + |
| 27 | +def instrument_all_lines(code: CodeType, hook: HookType, path: str, package: str) -> t.Tuple[CodeType, CoverageLines]: |
| 28 | + coverage_tool = sys.monitoring.get_tool(sys.monitoring.COVERAGE_ID) |
| 29 | + if coverage_tool is not None and coverage_tool != "datadog": |
| 30 | + log.debug("Coverage tool '%s' already registered, not gathering coverage", coverage_tool) |
30 | 31 | return code, CoverageLines()
|
31 | 32 |
|
32 |
| -else: |
33 |
| - _CODE_HOOKS: t.Dict[CodeType, t.Tuple[HookType, str, t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str]]]]]] = {} |
| 33 | + if coverage_tool is None: |
| 34 | + log.debug("Registering code coverage tool") |
| 35 | + _register_monitoring() |
| 36 | + |
| 37 | + return _instrument_all_lines_with_monitoring(code, hook, path, package) |
| 38 | + |
| 39 | + |
| 40 | +def _line_event_handler(code: CodeType, line: int) -> t.Any: |
| 41 | + hook, path, import_names = _CODE_HOOKS[code] |
| 42 | + import_name = import_names.get(line, None) |
| 43 | + return hook((line, path, import_name)) |
| 44 | + |
34 | 45 |
|
35 |
| - def _line_event_handler(code: CodeType, line: int) -> t.Any: |
36 |
| - hook, path, import_names = _CODE_HOOKS[code] |
37 |
| - import_name = import_names.get(line, None) |
38 |
| - return hook((line, path, import_name)) |
| 46 | +def _register_monitoring(): |
| 47 | + """ |
| 48 | + Register the coverage tool with the low-impact monitoring system. |
| 49 | + """ |
| 50 | + sys.monitoring.use_tool_id(sys.monitoring.COVERAGE_ID, "datadog") |
39 | 51 |
|
40 | 52 | # Register the line callback
|
41 | 53 | sys.monitoring.register_callback(
|
42 | 54 | sys.monitoring.COVERAGE_ID, sys.monitoring.events.LINE, _line_event_handler
|
43 | 55 | ) # noqa
|
44 | 56 |
|
45 |
| - def instrument_all_lines( |
46 |
| - code: CodeType, hook: HookType, path: str, package: str |
47 |
| - ) -> t.Tuple[CodeType, CoverageLines]: |
48 |
| - # Enable local line events for the code object |
49 |
| - sys.monitoring.set_local_events(sys.monitoring.COVERAGE_ID, code, sys.monitoring.events.LINE) # noqa |
50 |
| - |
51 |
| - # Collect all the line numbers in the code object |
52 |
| - linestarts = dict(dis.findlinestarts(code)) |
53 |
| - |
54 |
| - lines = CoverageLines() |
55 |
| - import_names: t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str, ...]]]] = {} |
56 |
| - |
57 |
| - # The previous two arguments are kept in order to track the depth of the IMPORT_NAME |
58 |
| - # For example, from ...package import module |
59 |
| - current_arg: int = 0 |
60 |
| - previous_arg: int = 0 |
61 |
| - _previous_previous_arg: int = 0 |
62 |
| - current_import_name: t.Optional[str] = None |
63 |
| - current_import_package: t.Optional[str] = None |
64 |
| - |
65 |
| - line = 0 |
66 |
| - |
67 |
| - ext: list[bytes] = [] |
68 |
| - code_iter = iter(enumerate(code.co_code)) |
69 |
| - try: |
70 |
| - while True: |
71 |
| - offset, opcode = next(code_iter) |
72 |
| - _, arg = next(code_iter) |
73 |
| - |
74 |
| - if opcode == RESUME: |
75 |
| - continue |
76 |
| - |
77 |
| - if offset in linestarts: |
78 |
| - line = linestarts[offset] |
79 |
| - lines.add(line) |
80 |
| - |
81 |
| - # Make sure that the current module is marked as depending on its own package by instrumenting the |
82 |
| - # first executable line |
83 |
| - if code.co_name == "<module>" and len(lines) == 1 and package is not None: |
84 |
| - import_names[line] = (package, ("",)) |
85 |
| - |
86 |
| - if opcode is EXTENDED_ARG: |
87 |
| - ext.append(arg) |
88 |
| - continue |
| 57 | + |
| 58 | +def _instrument_all_lines_with_monitoring( |
| 59 | + code: CodeType, hook: HookType, path: str, package: str |
| 60 | +) -> t.Tuple[CodeType, CoverageLines]: |
| 61 | + # Enable local line events for the code object |
| 62 | + sys.monitoring.set_local_events(sys.monitoring.COVERAGE_ID, code, sys.monitoring.events.LINE) # noqa |
| 63 | + |
| 64 | + # Collect all the line numbers in the code object |
| 65 | + linestarts = dict(dis.findlinestarts(code)) |
| 66 | + |
| 67 | + lines = CoverageLines() |
| 68 | + import_names: t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str, ...]]]] = {} |
| 69 | + |
| 70 | + # The previous two arguments are kept in order to track the depth of the IMPORT_NAME |
| 71 | + # For example, from ...package import module |
| 72 | + current_arg: int = 0 |
| 73 | + previous_arg: int = 0 |
| 74 | + _previous_previous_arg: int = 0 |
| 75 | + current_import_name: t.Optional[str] = None |
| 76 | + current_import_package: t.Optional[str] = None |
| 77 | + |
| 78 | + line = 0 |
| 79 | + |
| 80 | + ext: list[bytes] = [] |
| 81 | + code_iter = iter(enumerate(code.co_code)) |
| 82 | + try: |
| 83 | + while True: |
| 84 | + offset, opcode = next(code_iter) |
| 85 | + _, arg = next(code_iter) |
| 86 | + |
| 87 | + if opcode == RESUME: |
| 88 | + continue |
| 89 | + |
| 90 | + if offset in linestarts: |
| 91 | + line = linestarts[offset] |
| 92 | + lines.add(line) |
| 93 | + |
| 94 | + # Make sure that the current module is marked as depending on its own package by instrumenting the |
| 95 | + # first executable line |
| 96 | + if code.co_name == "<module>" and len(lines) == 1 and package is not None: |
| 97 | + import_names[line] = (package, ("",)) |
| 98 | + |
| 99 | + if opcode is EXTENDED_ARG: |
| 100 | + ext.append(arg) |
| 101 | + continue |
| 102 | + else: |
| 103 | + _previous_previous_arg = previous_arg |
| 104 | + previous_arg = current_arg |
| 105 | + current_arg = int.from_bytes([*ext, arg], "big", signed=False) |
| 106 | + ext.clear() |
| 107 | + |
| 108 | + if opcode == IMPORT_NAME: |
| 109 | + import_depth: int = code.co_consts[_previous_previous_arg] |
| 110 | + current_import_name: str = code.co_names[current_arg] |
| 111 | + # Adjust package name if the import is relative and a parent (ie: if depth is more than 1) |
| 112 | + current_import_package: str = ( |
| 113 | + ".".join(package.split(".")[: -import_depth + 1]) if import_depth > 1 else package |
| 114 | + ) |
| 115 | + |
| 116 | + if line in import_names: |
| 117 | + import_names[line] = ( |
| 118 | + current_import_package, |
| 119 | + tuple(list(import_names[line][1]) + [current_import_name]), |
| 120 | + ) |
89 | 121 | else:
|
90 |
| - _previous_previous_arg = previous_arg |
91 |
| - previous_arg = current_arg |
92 |
| - current_arg = int.from_bytes([*ext, arg], "big", signed=False) |
93 |
| - ext.clear() |
94 |
| - |
95 |
| - if opcode == IMPORT_NAME: |
96 |
| - import_depth: int = code.co_consts[_previous_previous_arg] |
97 |
| - current_import_name: str = code.co_names[current_arg] |
98 |
| - # Adjust package name if the import is relative and a parent (ie: if depth is more than 1) |
99 |
| - current_import_package: str = ( |
100 |
| - ".".join(package.split(".")[: -import_depth + 1]) if import_depth > 1 else package |
| 122 | + import_names[line] = (current_import_package, (current_import_name,)) |
| 123 | + |
| 124 | + # Also track import from statements since it's possible that the "from" target is a module, eg: |
| 125 | + # from my_package import my_module |
| 126 | + # Since the package has not changed, we simply extend the previous import names with the new value |
| 127 | + if opcode == IMPORT_FROM: |
| 128 | + import_from_name = f"{current_import_name}.{code.co_names[current_arg]}" |
| 129 | + if line in import_names: |
| 130 | + import_names[line] = ( |
| 131 | + current_import_package, |
| 132 | + tuple(list(import_names[line][1]) + [import_from_name]), |
101 | 133 | )
|
| 134 | + else: |
| 135 | + import_names[line] = (current_import_package, (import_from_name,)) |
| 136 | + |
| 137 | + except StopIteration: |
| 138 | + pass |
| 139 | + |
| 140 | + # Recursively instrument nested code objects |
| 141 | + for nested_code in (_ for _ in code.co_consts if isinstance(_, CodeType)): |
| 142 | + _, nested_lines = instrument_all_lines(nested_code, hook, path, package) |
| 143 | + lines.update(nested_lines) |
| 144 | + |
| 145 | + # Register the hook and argument for the code object |
| 146 | + _CODE_HOOKS[code] = (hook, path, import_names) |
| 147 | + |
| 148 | + # Special case for empty modules (eg: __init__.py ): |
| 149 | + # Make sure line 0 is marked as executable, and add package dependency |
| 150 | + if not lines and code.co_name == "<module>" and code.co_code == EMPTY_MODULE_BYTES: |
| 151 | + lines.add(0) |
| 152 | + if package is not None: |
| 153 | + import_names[0] = (package, ("",)) |
102 | 154 |
|
103 |
| - if line in import_names: |
104 |
| - import_names[line] = ( |
105 |
| - current_import_package, |
106 |
| - tuple(list(import_names[line][1]) + [current_import_name]), |
107 |
| - ) |
108 |
| - else: |
109 |
| - import_names[line] = (current_import_package, (current_import_name,)) |
110 |
| - |
111 |
| - # Also track import from statements since it's possible that the "from" target is a module, eg: |
112 |
| - # from my_package import my_module |
113 |
| - # Since the package has not changed, we simply extend the previous import names with the new value |
114 |
| - if opcode == IMPORT_FROM: |
115 |
| - import_from_name = f"{current_import_name}.{code.co_names[current_arg]}" |
116 |
| - if line in import_names: |
117 |
| - import_names[line] = ( |
118 |
| - current_import_package, |
119 |
| - tuple(list(import_names[line][1]) + [import_from_name]), |
120 |
| - ) |
121 |
| - else: |
122 |
| - import_names[line] = (current_import_package, (import_from_name,)) |
123 |
| - |
124 |
| - except StopIteration: |
125 |
| - pass |
126 |
| - |
127 |
| - # Recursively instrument nested code objects |
128 |
| - for nested_code in (_ for _ in code.co_consts if isinstance(_, CodeType)): |
129 |
| - _, nested_lines = instrument_all_lines(nested_code, hook, path, package) |
130 |
| - lines.update(nested_lines) |
131 |
| - |
132 |
| - # Register the hook and argument for the code object |
133 |
| - _CODE_HOOKS[code] = (hook, path, import_names) |
134 |
| - |
135 |
| - # Special case for empty modules (eg: __init__.py ): |
136 |
| - # Make sure line 0 is marked as executable, and add package dependency |
137 |
| - if not lines and code.co_name == "<module>" and code.co_code == EMPTY_MODULE_BYTES: |
138 |
| - lines.add(0) |
139 |
| - if package is not None: |
140 |
| - import_names[0] = (package, ("",)) |
141 |
| - |
142 |
| - return code, lines |
| 155 | + return code, lines |
0 commit comments