forked from pytorch/pytorch
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_typing.py
253 lines (199 loc) · 7.5 KB
/
test_typing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# Owner(s): ["module: typing"]
# based on NumPy numpy/typing/tests/test_typing.py
import itertools
import os
import re
import shutil
import unittest
from collections import defaultdict
from threading import Lock
from typing import IO, Optional
from torch.testing._internal.common_utils import (
instantiate_parametrized_tests,
parametrize,
run_tests,
TestCase,
)
# mypy is an optional dependency: remember whether it could be imported so
# that the test class below can be skipped (via ``NO_MYPY``) when it is absent.
try:
    from mypy import api
except ImportError:
    NO_MYPY = True
else:
    NO_MYPY = False
# Root of the sample files: the ``typing`` directory next to this test file.
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "typing"))
# One sub-directory per test category (reveal / pass / fail).
REVEAL_DIR = os.path.join(DATA_DIR, "reveal")
PASS_DIR = os.path.join(DATA_DIR, "pass")
FAIL_DIR = os.path.join(DATA_DIR, "fail")
# The mypy configuration file lives two directory levels above DATA_DIR.
MYPY_INI = os.path.join(os.path.dirname(os.path.dirname(DATA_DIR)), "mypy.ini")
# Cache directory handed to mypy; `_run_mypy` deletes it before each run.
CACHE_DIR = os.path.join(DATA_DIR, ".mypy_cache")
def _key_func(key: str) -> str:
"""Split at the first occurance of the ``:`` character.
Windows drive-letters (*e.g.* ``C:``) are ignored herein.
"""
drive, tail = os.path.splitdrive(key)
return os.path.join(drive, tail.split(":", 1)[0])
def _strip_filename(msg: str) -> str:
"""Strip the filename from a mypy message."""
_, tail = os.path.splitdrive(msg)
return tail.split(":", 1)[-1]
def _run_mypy() -> dict[str, list[str]]:
    """Wipe the mypy cache, run mypy on every sample directory, and group the output.

    Returns:
        A mapping from `_key_func(line)` (the text before the first ``:`` of
        an output line, i.e. the file path for ordinary diagnostics) to the
        consecutive output lines sharing that key. Empty keys are dropped.
    """
    if os.path.isdir(CACHE_DIR):
        shutil.rmtree(CACHE_DIR)

    # Options shared by every invocation; only the target directory varies.
    base_args = [
        "--show-absolute-path",
        "--config-file",
        MYPY_INI,
        "--cache-dir",
        CACHE_DIR,
    ]

    grouped: dict[str, list[str]] = {}
    for directory in (REVEAL_DIR, PASS_DIR, FAIL_DIR):
        stdout, stderr, _ = api.run([*base_args, directory])
        assert not stderr, stderr

        # Asterisks are stripped from the output before it is parsed.
        cleaned = stdout.replace("*", "")
        for key, members in itertools.groupby(cleaned.split("\n"), key=_key_func):
            if key:
                grouped[key] = list(members)

    return grouped
def get_test_cases(directory):
    """Yield the full path of every enabled ``.py`` file below *directory*.

    The tree is walked recursively with `os.walk`. Files whose name starts
    with ``disabled_`` are skipped, as is anything whose extension is not
    exactly ``.py``.
    """
    for root, _, files in os.walk(directory):
        for fname in files:
            _, extension = os.path.splitext(fname)
            if extension == ".py" and not fname.startswith("disabled_"):
                yield os.path.join(root, fname)
# Message templates for `_test_fail`, filled positionally with `str.format`.
# _FAIL_MSG1 takes (lineno, error).
_FAIL_MSG1 = """Extra error at line {}
Extra error: {!r}
"""
# _FAIL_MSG2 takes (lineno, expected_error, error).
_FAIL_MSG2 = """Error mismatch at line {}
Expected error: {!r}
Observed error: {!r}
"""
def _test_fail(
path: str, error: str, expected_error: Optional[str], lineno: int
) -> None:
if expected_error is None:
raise AssertionError(_FAIL_MSG1.format(lineno, error))
elif error not in expected_error:
raise AssertionError(_FAIL_MSG2.format(lineno, expected_error, error))
def _construct_format_dict():
dct = {
"ModuleList": "torch.nn.modules.container.ModuleList",
"AdaptiveAvgPool2d": "torch.nn.modules.pooling.AdaptiveAvgPool2d",
"AdaptiveMaxPool2d": "torch.nn.modules.pooling.AdaptiveMaxPool2d",
"Tensor": "torch._tensor.Tensor",
"Adagrad": "torch.optim.adagrad.Adagrad",
"Adam": "torch.optim.adam.Adam",
}
return dct
#: Maps every supported format key (e.g. ``"Tensor"``) to the fully qualified
#: name substituted for ``{key}`` placeholders by `_parse_reveals`.
FORMAT_DICT: dict[str, str] = _construct_format_dict()
def _parse_reveals(file: IO[str]) -> list[str]:
"""Extract and parse all ``" # E: "`` comments from the passed file-like object.
All format keys will be substituted for their respective value from `FORMAT_DICT`,
*e.g.* ``"{Tensor}"`` becomes ``"torch.tensor.Tensor"``.
"""
string = file.read().replace("*", "")
# Grab all `# E:`-based comments
comments_array = [str.partition(" # E: ")[2] for str in string.split("\n")]
comments = "/n".join(comments_array)
# Only search for the `{*}` pattern within comments,
# otherwise there is the risk of accidently grabbing dictionaries and sets
key_set = set(re.findall(r"\{(.*?)\}", comments))
kwargs = {
k: FORMAT_DICT.get(k, f"<UNRECOGNIZED FORMAT KEY {k!r}>") for k in key_set
}
fmt_str = comments.format(**kwargs)
return fmt_str.split("/n")
# Message template for `_test_reveal`, filled positionally with `str.format`
# as (lineno, expected_reveal, reveal).
_REVEAL_MSG = """Reveal mismatch at line {}
Expected reveal: {!r}
Observed reveal: {!r}
"""
def _test_reveal(path: str, reveal: str, expected_reveal: str, lineno: int) -> None:
if reveal not in expected_reveal:
raise AssertionError(_REVEAL_MSG.format(lineno, expected_reveal, reveal))
@unittest.skipIf(NO_MYPY, reason="Mypy is not installed")
class TestTyping(TestCase):
    """Compare mypy's output for the sample files with the expectations in them.

    mypy is executed once (lazily, on the first request) for all sample
    directories; every parametrized test then looks up its own file in the
    cached, per-file output.
    """

    # Serializes the one-time mypy run triggered from `get_mypy_output`.
    _lock = Lock()
    # Output of `_run_mypy`, keyed by file path; ``None`` until first needed.
    _cached_output: Optional[dict[str, list[str]]] = None

    @classmethod
    def get_mypy_output(cls) -> dict[str, list[str]]:
        """Return the grouped mypy output, running mypy only on first use."""
        with cls._lock:
            if cls._cached_output is None:
                cls._cached_output = _run_mypy()
            return cls._cached_output

    @parametrize(
        "path",
        get_test_cases(PASS_DIR),
        name_fn=lambda b: os.path.relpath(b, start=PASS_DIR),
    )
    def test_success(self, path) -> None:
        """Fail if mypy printed anything at all for a file in ``PASS_DIR``."""
        output_mypy = self.get_mypy_output()
        if path in output_mypy:
            msg = "Unexpected mypy output\n\n"
            msg += "\n".join(_strip_filename(v) for v in output_mypy[path])
            raise AssertionError(msg)

    @parametrize(
        "path",
        get_test_cases(FAIL_DIR),
        name_fn=lambda b: os.path.relpath(b, start=FAIL_DIR),
    )
    def test_fail(self, path):
        """Check each ``# E:`` marker in a ``FAIL_DIR`` file against mypy's output.

        Raises:
            ValueError: If a mypy output line does not have the expected
                ``<line>:<col>: (error|note): ...`` shape.
            AssertionError: If mypy reported something on a line without a
                ``# E:`` marker, or a marker is not part of what mypy reported.
        """
        __tracebackhide__ = True  # noqa: F841

        with open(path) as fin:
            lines = fin.readlines()

        # 1-based line number -> concatenation of all mypy messages for it.
        errors = defaultdict(str)

        output_mypy = self.get_mypy_output()
        self.assertIn(path, output_mypy)
        for error_line in output_mypy[path]:
            error_line = _strip_filename(error_line)
            match = re.match(
                r"(?P<lineno>\d+):(?P<colno>\d+): (error|note): .+$",
                error_line,
            )
            if match is None:
                raise ValueError(f"Unexpected error line format: {error_line}")
            lineno = int(match.group("lineno"))
            errors[lineno] += f"{error_line}\n"

        for lineno, line in enumerate(lines, start=1):
            if line.startswith("#") or (" E:" not in line and lineno not in errors):
                continue

            # Read with `.get()`: indexing the defaultdict would silently
            # insert "" for this line, so a marker without any mypy output
            # could never be reported through the `None` branch of `_test_fail`.
            expected_error = errors.get(lineno)
            self.assertIn(
                "# E:", line, f"Unexpected mypy output\n\n{expected_error or ''}"
            )
            marker = line.split("# E:")[-1].strip()
            _test_fail(path, marker, expected_error, lineno)

    @parametrize(
        "path",
        get_test_cases(REVEAL_DIR),
        name_fn=lambda b: os.path.relpath(b, start=REVEAL_DIR),
    )
    def test_reveal(self, path):
        """Check each mypy note for a ``REVEAL_DIR`` file against its ``# E:`` comment.

        Raises:
            ValueError: If a mypy output line is not a
                ``<file>.py:<line>:<col>: note: ...`` line.
            AssertionError: If the file is missing from the mypy output, a
                note is not a "Revealed type is" note, or the comment on the
                reported line is not contained in the note.
        """
        __tracebackhide__ = True  # noqa: F841

        with open(path) as fin:
            lines = _parse_reveals(fin)

        output_mypy = self.get_mypy_output()
        self.assertIn(path, output_mypy)
        for error_line in output_mypy[path]:
            match = re.match(
                r"^.+\.py:(?P<lineno>\d+):(?P<colno>\d+): note: .+$",
                error_line,
            )
            if match is None:
                raise ValueError(f"Unexpected reveal line format: {error_line}")
            # mypy counts lines from 1; `lines` is a 0-based list.
            lineno = int(match.group("lineno")) - 1
            self.assertIn("Revealed type is", error_line)

            marker = lines[lineno]
            _test_reveal(path, marker, error_line, 1 + lineno)
# Must run after the class definition; presumably expands the
# ``@parametrize``-decorated methods into concrete per-file tests
# (NOTE(review): confirm against torch.testing._internal.common_utils).
instantiate_parametrized_tests(TestTyping)
if __name__ == "__main__":
    run_tests()