# Copyright 2017 Palantir Technologies, Inc.
from distutils.version import LooseVersion
import functools
import inspect
import logging
import os
import sys
import threading

import jedi

PY2 = sys.version_info.major == 2
JEDI_VERSION = jedi.__version__

if PY2:
    import pathlib2 as pathlib
else:
    import pathlib

log = logging.getLogger(__name__)

def debounce(interval_s, keyed_by=None):
"""Debounce calls to this function until interval_s seconds have passed."""
def wrapper(func):
timers = {}
lock = threading.Lock()
@functools.wraps(func)
def debounced(*args, **kwargs):
call_args = inspect.getcallargs(func, *args, **kwargs)
key = call_args[keyed_by] if keyed_by else None
def run():
with lock:
del timers[key]
return func(*args, **kwargs)
with lock:
old_timer = timers.get(key)
if old_timer:
old_timer.cancel()
timer = threading.Timer(interval_s, run)
timers[key] = timer
timer.start()
return debounced
return wrapper
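
# Illustrative usage sketch (not part of the original module; the function and
# argument names below are hypothetical):
#
#     @debounce(0.5, keyed_by='doc_uri')
#     def lint(doc_uri, is_saved):
#         ...  # only the most recent call per doc_uri within 0.5s actually runs
#
# Each distinct value of the keyed_by argument gets its own timer, so rapid
# calls for one key do not cancel the pending call for another.
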
def find_parents(root, path, names):
"""Find files matching the given names relative to the given path.
Args:
path (str): The file path to start searching up from.
names (List[str]): The file/directory names to look for.
root (str): The directory at which to stop recursing upwards.
Note:
The path MUST be within the root.
"""
if not root:
return []
if not os.path.commonprefix((root, path)):
log.warning("Path %s not in %s", path, root)
return []
    # Split the relative path by directory, generate all the parent directories, then check each of them.
    # This avoids running a loop that has different base-cases for unix/windows
    # e.g. /a/b and /a/b/c/d/e.py -> ['/a/b', 'c', 'd']
    dirs = [root] + os.path.relpath(os.path.dirname(path), root).split(os.path.sep)

    # Search each of /a/b/c/d, /a/b/c, /a/b
while dirs:
search_dir = os.path.join(*dirs)
existing = list(filter(os.path.exists, [os.path.join(search_dir, n) for n in names]))
if existing:
return existing
dirs.pop()
# Otherwise nothing
return []
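
# Illustrative sketch (the paths below are hypothetical and assume the files
# exist on disk):
#
#     >>> find_parents('/workspace', '/workspace/pkg/module.py', ['setup.py'])
#     ['/workspace/setup.py']
#
# The search starts in the file's own directory and walks upwards, stopping at
# the first level where any of the requested names exist, or at the root.
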
def is_inside_of(path, root, strictly=True):
"""Return whether path is inside of root or not
It is assumed that both path and root are absolute.
If strictly=False, os.path.normcase and os.path.normpath are not
applied on path and root for efficiency. This is useful if
ablsolute "path" is made from relative one and "root" (and already
normpath-ed), for example.
"""
if strictly:
path = os.path.normcase(os.path.normpath(path))
root = os.path.normcase(os.path.normpath(root))
return path == root or path.startswith(root + os.path.sep)
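
# Illustrative sketch (hypothetical paths; this is a pure string comparison,
# with no filesystem access):
#
#     >>> is_inside_of('/workspace/src/module.py', '/workspace')
#     True
#     >>> is_inside_of('/workspace-other/module.py', '/workspace')
#     False
#
# The second case shows why os.path.sep is appended to root before the
# startswith check: a sibling directory sharing the prefix must not match.
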
def normalize_paths(paths, basedir, inside_only):
"""Normalize each elements in paths
Relative elements in paths are treated as relative to basedir.
This function yields "(path, validity)" tuple as normalization
result for each elements in paths. If inside_only is specified and path is
not so, validity is False. Otherwise, path is already normalized
as absolute path, and validity is True.
"""
for path in paths:
full_path = os.path.normpath(os.path.join(basedir, path))
if (not inside_only or
# If "inside_only" is specified, path must (1) not be
# absolute (= be relative to basedir), (2) not have
# drive letter (on Windows), and (3) be descendant of
# the root (= "inside_only").
(not os.path.isabs(path) and
not os.path.splitdrive(path)[0] and
is_inside_of(full_path, inside_only, strictly=False))):
yield full_path, True
else:
yield path, False
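
# Illustrative sketch (hypothetical POSIX paths): with basedir and inside_only
# both set to '/workspace', a relative path stays valid, while absolute or
# escaping paths are passed through unchanged and flagged as invalid.
#
#     >>> list(normalize_paths(['src/a.py', '/etc/passwd', '../b.py'],
#     ...                      '/workspace', '/workspace'))
#     [('/workspace/src/a.py', True), ('/etc/passwd', False), ('../b.py', False)]
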
def match_uri_to_workspace(uri, workspaces):
if uri is None:
return None
max_len, chosen_workspace = -1, None
path = pathlib.Path(uri).parts
for workspace in workspaces:
try:
workspace_parts = pathlib.Path(workspace).parts
except TypeError:
            # This can happen in Python2 if 'workspace' is a subclass of string
workspace_parts = pathlib.Path(unicode(workspace)).parts
if len(workspace_parts) > len(path):
continue
match_len = 0
for workspace_part, path_part in zip(workspace_parts, path):
if workspace_part == path_part:
match_len += 1
if match_len > 0:
if match_len > max_len:
max_len = match_len
chosen_workspace = workspace
return chosen_workspace
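
# Illustrative sketch (hypothetical URIs): the workspace whose path shares the
# most leading components with the document URI is chosen.
#
#     >>> match_uri_to_workspace('file:///projects/app/src/main.py',
#     ...                        ['file:///projects/app', 'file:///projects/lib'])
#     'file:///projects/app'
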
def list_to_string(value):
return ",".join(value) if isinstance(value, list) else value
def merge_dicts(dict_a, dict_b):
"""Recursively merge dictionary b into dictionary a.
If override_nones is True, then
"""
def _merge_dicts_(a, b):
for key in set(a.keys()).union(b.keys()):
if key in a and key in b:
if isinstance(a[key], dict) and isinstance(b[key], dict):
yield (key, dict(_merge_dicts_(a[key], b[key])))
elif b[key] is not None:
yield (key, b[key])
else:
yield (key, a[key])
elif key in a:
yield (key, a[key])
elif b[key] is not None:
yield (key, b[key])
return dict(_merge_dicts_(dict_a, dict_b))
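
# Illustrative sketch (hypothetical settings; key order in the result may
# vary): nested dicts are merged recursively, and a None value in the second
# dict never overrides an existing value.
#
#     >>> merge_dicts({'plugins': {'pycodestyle': {'enabled': True}}},
#     ...             {'plugins': {'pycodestyle': {'maxLineLength': 100}, 'pyflakes': None}})
#     {'plugins': {'pycodestyle': {'enabled': True, 'maxLineLength': 100}}}
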
def get_config_by_path(settings, path, default_value=None):
"""Get the value in settings dict at the given path.

    If the path cannot be resolved in the given settings, this returns
    default_value.
    """
paths = path.split('.')
while len(paths) > 1:
settings = settings.get(paths.pop(0))
if not isinstance(settings, dict):
            # At least one more lookup is still required here, but the
            # value just retrieved is not a dict, so the path cannot be
            # resolved in the given settings.
return default_value
# Here, paths should have only one value
return settings.get(paths[0], default_value)
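
# Illustrative sketch (hypothetical settings dict):
#
#     >>> settings = {'pyls': {'plugins': {'pycodestyle': {'enabled': True}}}}
#     >>> get_config_by_path(settings, 'pyls.plugins.pycodestyle.enabled')
#     True
#     >>> get_config_by_path(settings, 'pyls.plugins.flake8.enabled', default_value=False)
#     False
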
def format_docstring(contents):
"""Python doc strings come in a number of formats, but LSP wants markdown.
Until we can find a fast enough way of discovering and parsing each format,
we can do a little better by at least preserving indentation.
"""
contents = contents.replace('\t', u'\u00A0' * 4)
contents = contents.replace(' ', u'\u00A0' * 2)
if LooseVersion(JEDI_VERSION) < LooseVersion('0.15.0'):
contents = contents.replace('*', '\\*')
return contents
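
# Illustrative sketch: tabs and pairs of spaces are replaced with no-break
# spaces so that indentation survives the client's markdown rendering.
#
#     >>> format_docstring('\tfoo  bar')
#     '\xa0\xa0\xa0\xa0foo\xa0\xa0bar'
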
def clip_column(column, lines, line_number):
    # Normalise the position as per the LSP spec, which accepts character positions > line length
    # https://github.com/Microsoft/language-server-protocol/blob/master/protocol.md#position
max_column = len(lines[line_number].rstrip('\r\n')) if len(lines) > line_number else 0
return min(column, max_column)
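
# Illustrative sketch (hypothetical document lines): a column beyond the end of
# the line is clamped to the line length, since the LSP allows clients to send
# positions past the end of a line.
#
#     >>> clip_column(10, ['short\n'], 0)
#     5
#     >>> clip_column(3, ['short\n'], 0)
#     3
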
if os.name == 'nt':
import ctypes
kernel32 = ctypes.windll.kernel32
    PROCESS_QUERY_INFORMATION = 0x1000
def is_process_alive(pid):
"""Check whether the process with the given pid is still alive.
Running `os.kill()` on Windows always exits the process, so it can't be used to check for an alive process.
see: https://docs.python.org/3/library/os.html?highlight=os%20kill#os.kill
Hence ctypes is used to check for the process directly via windows API avoiding any other 3rd-party dependency.
Args:
pid (int): process ID
Returns:
bool: False if the process is not alive or don't have permission to check, True otherwise.
"""
        process = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid)
if process != 0:
kernel32.CloseHandle(process)
return True
return False
else:
import errno
def is_process_alive(pid):
"""Check whether the process with the given pid is still alive.
Args:
pid (int): process ID
Returns:
bool: False if the process is not alive or don't have permission to check, True otherwise.
"""
if pid < 0:
return False
try:
os.kill(pid, 0)
except OSError as e:
return e.errno == errno.EPERM
else:
return True
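
# Illustrative sketch: this check is typically used to poll whether the parent
# process (e.g. the editor that spawned the server) is still running.
#
#     >>> is_process_alive(os.getpid())
#     True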