forked from IdentityPython/pyFF
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetch.py
142 lines (119 loc) · 4.78 KB
/
fetch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import queue
import threading
from datetime import datetime
from pyff.constants import config
from pyff.logs import get_log
from pyff.utils import Watchable, load_callable, url_get
# Module-level logger (obtained via pyff.logs.get_log) used by the Fetch/Fetcher threads and make_fetcher below.
log = get_log(__name__)
def make_resourcestore_instance(*args, **kwargs):
    """
    Build a resource store of the class configured by ``config.resource_store_class``.

    The class is resolved through ``load_callable`` on every call. All positional and
    keyword arguments are forwarded untouched to its constructor.

    :return: the newly constructed store instance
    """
    return load_callable(config.resource_store_class)(*args, **kwargs)
class ResourceStore:
    """
    Empty placeholder class: it declares no attributes or methods of its own.

    NOTE(review): presumably a base/marker type for the class selected via
    config.resource_store_class — confirm against its users.
    """
class Fetch(threading.Thread):
    """
    Fetch is a worker thread that calls url_get to retrieve URLs. All URL schemes supported by the python
    requests library as well as file:/// URLs are supported (as provided by url_get). Fetch threads form a
    pool that works off a request queue fed by the Fetcher; results are passed back via a response queue
    owned by the Fetcher. If a content handler callable is given, it is called with the url_get result and
    its return value is what gets passed back.

    Each item put on the response queue is a dict with the keys 'response', 'url', 'exception' and
    'last_fetched' (a naive local datetime.now()). On success 'exception' is None; on failure 'response'
    is None and 'exception' holds the raised exception.

    A None taken from the request queue is a wake-up sentinel: it is acknowledged with task_done() but not
    fetched. Setting ``halt`` makes the thread exit the next time it checks the flag, i.e. after the item
    it is currently handling (or, if it is blocked waiting, after the next item arrives).
    """

    def __init__(self, request, response, pool, name, content_handler, verify_tls=False):
        """
        :param request: queue of URLs (or None sentinels) to fetch
        :param response: queue that receives the result dicts
        :param pool: context manager held around each request item (a semaphore when created by Fetcher)
        :param name: base name used in the thread name and log messages
        :param content_handler: optional callable applied to each url_get result
        :param verify_tls: forwarded to url_get; the default False keeps this module's original behaviour
        """
        threading.Thread.__init__(self)
        self._id = name
        self.request = request
        self.response = response
        self.pool = pool
        self.halt = False
        self.content_handler = content_handler
        # NOTE(review): verify_tls=False presumably disables certificate verification in url_get.
        # It stays the default for backward compatibility, but confirm that this is intended.
        self.verify_tls = verify_tls
        self.state('idle')

    def state(self, state):
        """Publish the worker's current activity ('idle' or the URL being fetched) in the thread name."""
        self.name = "{} ({})".format(self._id, state)

    def run(self):
        """Take items from the request queue until halted, acknowledging every item taken."""
        while not self.halt:
            log.debug("waiting for pool {}....".format(self._id))
            with self.pool:
                url = self.request.get()
                try:
                    if url is not None:
                        self._fetch(url)
                finally:
                    # Acknowledge every item, including the None sentinel, so that the
                    # queue's unfinished-task count stays balanced and Queue.join() can return.
                    self.request.task_done()

    def _fetch(self, url):
        """Fetch one URL and put a success or failure result dict on the response queue."""
        try:
            self.state(url)
            r = url_get(url, verify_tls=self.verify_tls)
            if self.content_handler is not None:
                r = self.content_handler(r)
            self.response.put(
                {'response': r, 'url': url, 'exception': None, 'last_fetched': datetime.now()}
            )
            log.debug("successfully fetched {}".format(url))
        except Exception as ex:
            # Failures are reported to the Fetcher rather than killing the worker thread.
            self.response.put(
                {'response': None, 'url': url, 'exception': ex, 'last_fetched': datetime.now()}
            )
            log.warning("error fetching {}".format(url))
            log.warning(ex)
            import traceback

            log.debug(traceback.format_exc())
        finally:
            self.state('idle')
class Fetcher(threading.Thread, Watchable):
    """
    The main thread managing a pool of Fetch threads. All Fetch instances are initialized with the same
    content handler callable.

    schedule() puts URLs on the shared request queue consumed by the workers. run() takes the result dicts
    the workers put on the shared response queue and passes each one to the inherited notify(**info).
    stop() halts the workers and then this thread.
    """

    def __init__(self, num_threads=None, name="Fetcher", content_handler=None):
        """
        Create the queues and start ``num_threads`` Fetch workers. The Fetcher thread itself is not
        started here (make_fetcher does that).

        :param num_threads: number of Fetch workers (also the semaphore size); None means
            config.worker_pool_size, read at construction time rather than at import time
        :param name: base name for this thread and its workers
        :param content_handler: optional callable passed to every Fetch worker
        """
        if num_threads is None:
            # Reading the config here, not in the signature, honours changes made after module import.
            num_threads = config.worker_pool_size
        threading.Thread.__init__(self)
        Watchable.__init__(self)
        self._id = name
        self.name = '{} (master)'.format(self._id)
        self.request = queue.Queue()
        self.response = queue.Queue()
        self.pool = threading.BoundedSemaphore(num_threads)
        self.threads = []
        for _ in range(num_threads):
            t = Fetch(self.request, self.response, self.pool, self._id, content_handler)
            t.start()
            self.threads.append(t)
        self.halt = False

    def schedule(self, url):
        """
        Schedule a URL for retrieval.
        :param url: the url to fetch
        :return: nothing is returned.
        """
        log.info("scheduling fetch of {}".format(url))
        self.request.put(url)

    def stop(self):
        """
        Halt the Fetcher and all Fetch threads.

        Waits for every worker to exit. Results from fetches that finish before the workers exit are still
        delivered by run() before it returns.
        :return: nothing is returned
        """
        log.debug("stopping fetcher")
        for t in self.threads:
            t.halt = True
        # One wake-up sentinel per worker: a worker blocked in request.get() needs an item to re-check halt.
        for _ in self.threads:
            self.request.put(None)
        for t in self.threads:
            t.join()
        self.halt = True
        # Sentinel that lets run() finish once everything queued before it has been delivered.
        self.response.put(None)

    def run(self):
        """
        Launch the Fetcher. Notify all watchers.

        Every result dict queued before stop()'s None sentinel is delivered (queue.Queue is FIFO).
        A None received while not halted is ignored.
        :return: nothing is returned
        """
        log.debug("Fetcher ({}) ready & waiting for responses...".format(self._id))
        while True:
            info = self.response.get()
            if info is None:
                if self.halt:
                    break
                continue
            self.notify(**info)
        log.debug("Fetcher ({}) exiting...".format(self._id))
def make_fetcher(name="Fetcher", content_handler=None):
    """
    Construct a Fetcher, start its thread and return it.

    :param name: label for the Fetcher, used in its thread name and log output
    :param content_handler: optional callable handed to the Fetcher, which passes it on to its Fetch workers
    :return: the Fetcher, already started
    """
    started = Fetcher(content_handler=content_handler, name=name)
    started.start()
    log.debug("fetcher created: {}".format(started))
    return started