-
-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathitems.py
219 lines (185 loc) · 7.16 KB
/
items.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4 nu
""" libzim Item helpers """
import io
import pathlib
import re
import tempfile
import urllib.parse
from typing import Any, Optional
import libzim.writer # pyright: ignore
from zimscraperlib.download import stream_file
from zimscraperlib.zim.providers import (
FileLikeProvider,
FileProvider,
StringProvider,
URLProvider,
)
class Item(libzim.writer.Item):
"""libzim.writer.Item returning props for path/title/mimetype"""
def __init__(
self,
path: Optional[str] = None,
title: Optional[str] = None,
mimetype: Optional[str] = None,
hints: Optional[dict] = None,
**kwargs: Any,
):
super().__init__()
if path:
kwargs["path"] = path
if title:
kwargs["title"] = title
if mimetype:
kwargs["mimetype"] = mimetype
if hints:
kwargs["hints"] = hints
for k, v in kwargs.items():
setattr(self, k, v)
@property
def should_index(self):
return self.get_mimetype().startswith("text/html")
def get_path(self) -> str:
return getattr(self, "path", "")
def get_title(self) -> str:
return getattr(self, "title", "")
def get_mimetype(self) -> str:
return getattr(self, "mimetype", "")
def get_hints(self) -> dict:
return getattr(self, "hints", {})
class StaticItem(Item):
"""scraperlib Item with auto contentProvider from `content` or `filepath`
Sets a `ref` to itself on the File/String content providers so it outlives them
We need Item to survive its ContentProvider so that we can track lifecycle
more efficiently: now when the libzim destroys the CP, python will destroy
the Item and we can be notified that we're effectively through with our content"""
def __init__(
self,
content: Optional[str] = None,
fileobj: Optional[io.IOBase] = None,
filepath: Optional[pathlib.Path] = None,
path: Optional[str] = None,
title: Optional[str] = None,
mimetype: Optional[str] = None,
hints: Optional[dict] = None,
**kwargs: Any,
):
if content:
kwargs["content"] = content
if fileobj:
kwargs["fileobj"] = fileobj
if filepath:
kwargs["filepath"] = filepath
super().__init__(
path=path, title=title, mimetype=mimetype, hints=hints, **kwargs
)
def get_contentprovider(self) -> libzim.writer.ContentProvider:
# content was set manually
content = getattr(self, "content", None)
if content is not None:
return StringProvider(content=content, ref=self)
# using a file-like object
fileobj = getattr(self, "fileobj", None)
if fileobj:
return FileLikeProvider(
fileobj=fileobj, ref=self, size=getattr(self, "size", None)
)
# we had to download locally to get size
filepath = getattr(self, "filepath", None)
if filepath:
return FileProvider(
filepath=filepath, ref=self, size=getattr(self, "size", None)
)
raise NotImplementedError("No data to provide`")
class URLItem(StaticItem):
"""StaticItem to automatically fetch and feed an URL resource
Appropriate for retrieving/bundling static assets that you don't need to
post-process.
Uses URL's path as zim path if none provided
Keeps single in-memory copy of content for HTML resources (indexed)
Works transparently on servers returning a Content-Length header (most)
*Swaps* a copy of the content either in memory or on disk (`use_disk=True`)
in case the content size could not be retrieved from headers.
Use `tmp_dir` to point location of that temp file."""
@staticmethod
def download_for_size(url, on_disk, tmp_dir=None):
"""Download URL to a temp file and return its tempfile and size"""
fpath = stream = None
if on_disk:
suffix = pathlib.Path(re.sub(r"^/", "", url.path)).suffix
fpath = pathlib.Path(
tempfile.NamedTemporaryFile(
suffix=suffix, delete=False, dir=tmp_dir
).name
)
else:
stream = io.BytesIO()
size, _ = stream_file(url.geturl(), fpath=fpath, byte_stream=stream)
return fpath or stream, size
def __init__(
self,
url: str,
path: Optional[str] = None,
title: Optional[str] = None,
mimetype: Optional[str] = None,
hints: Optional[dict] = None,
use_disk: Optional[bool] = None,
**kwargs: Any,
):
if use_disk:
kwargs["use_disk"] = use_disk
super().__init__(
path=path, title=title, mimetype=mimetype, hints=hints, **kwargs
)
self.url = urllib.parse.urlparse(url)
use_disk = getattr(self, "use_disk", False)
# fetch headers to retrieve size and type
try:
_, self.headers = stream_file(
url, byte_stream=io.BytesIO(), only_first_block=True
)
except Exception as exc:
raise OSError(f"Unable to access URL at {url}: {exc}") from None
# HTML content will be indexed.
# we proxy the content in the Item to prevent double-download of the resource
# we use a value-variable to prevent race-conditions in the multiple
# reads of the content in the provider
if self.should_index:
self.fileobj = io.BytesIO()
self.size, _ = stream_file(self.url.geturl(), byte_stream=self.fileobj)
return
try:
# Encoded data (compressed) prevents us from using Content-Length header
# as source for the content (it represents length of compressed data)
if self.headers.get("Content-Encoding", "identity") != "identity":
raise ValueError("Can't trust Content-Length for size")
# non-html, non-compressed data.
self.size = int(self.headers["Content-Length"])
except Exception:
# we couldn't retrieve size so we have to download resource to
target, self.size = self.download_for_size(
self.url, on_disk=use_disk, tmp_dir=getattr(self, "tmp_dir", None)
)
# downloaded to disk and using a file path from now on
if use_disk:
self.filepath = target
# downloaded to RAM and using a bytes object
else:
self.fileobj = target
def get_path(self) -> str:
return getattr(self, "path", re.sub(r"^/", "", self.url.path))
def get_title(self) -> str:
return getattr(self, "title", "")
def get_mimetype(self) -> str:
return getattr(
self,
"mimetype",
self.headers.get("Content-Type", "application/octet-stream"),
)
def get_contentprovider(self):
try:
return super().get_contentprovider()
except NotImplementedError:
return URLProvider(
url=self.url.geturl(), size=getattr(self, "size", None), ref=self
)