
Commit cbbdb85

Add async crawler video
1 parent 59e2828 commit cbbdb85

File tree

3 files changed: +234 -0 lines changed


README.md

+1
@@ -12,6 +12,7 @@ James and his team are available for consulting, contracting, code reviews, and
 | N | Code | Video |
 | -- | --- |--- |
+| 117 | [src](videos/117_hello_async) | [Intro to async Python | Writing a Web Crawler](https://youtu.be/ftmdDlwMwwQ) |
 | 116 | [src](videos/116_complex_fraction) | [Complex (Gaussian) Rationals - Extending Python's Number hierarchy](https://youtu.be/lcm4tYGmAig) |
 | 115 | [src](videos/115_fast_pow) | [Fast pow](https://youtu.be/GrNJE6ogyQU) |
 | 114 | [src](videos/114_copy_or_no_copy) | [Python Iterators! COPY or NO COPY?](https://youtu.be/hVFKy9Gw95c) |
+35
@@ -0,0 +1,35 @@
import asyncio
import time


async def do_work(s: str, delay_s: float = 1.0):
    print(f"{s} started")
    await asyncio.sleep(delay_s)
    print(f"{s} done")


async def main():
    start = time.perf_counter()

    todo = ['get package', 'laundry', 'bake cake']

    tasks = [asyncio.create_task(do_work(item)) for item in todo]
    done, pending = await asyncio.wait(tasks)  # waits for all tasks to finish
    for task in done:
        result = task.result()  # re-raises if the task raised

    tasks = [asyncio.create_task(do_work(item)) for item in todo]
    results = await asyncio.gather(*tasks, return_exceptions=True)  # results in input order

    coros = [do_work(item) for item in todo]
    results = await asyncio.gather(*coros, return_exceptions=True)  # gather wraps bare coroutines in tasks

    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        tasks = [tg.create_task(do_work(item)) for item in todo]  # all awaited on exiting the block

    end = time.perf_counter()
    print(f"it took: {end - start:.2f}s")


if __name__ == '__main__':
    asyncio.run(main())
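
For comparison (an editorial sketch, not part of this commit): the file above runs the same three coroutines concurrently in four ways (asyncio.wait over tasks, gather over tasks, gather over bare coroutines, and TaskGroup on Python 3.11+), so each batch finishes in roughly one second. Awaiting the coroutines one at a time runs them sequentially and takes roughly three seconds; main_sequential below is a hypothetical name used only for this illustration.

# Editorial sketch, not part of the commit: sequential version for comparison.
import asyncio
import time


async def do_work(s: str, delay_s: float = 1.0):
    print(f"{s} started")
    await asyncio.sleep(delay_s)
    print(f"{s} done")


async def main_sequential():
    start = time.perf_counter()
    for item in ['get package', 'laundry', 'bake cake']:
        await do_work(item)  # each await completes before the next starts
    print(f"it took: {time.perf_counter() - start:.2f}s")  # roughly 3s, vs roughly 1s concurrently


if __name__ == '__main__':
    asyncio.run(main_sequential())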

videos/117_hello_async/crawl.py

+198
@@ -0,0 +1,198 @@
from __future__ import annotations

import asyncio
import html.parser
import pathlib
import time
import urllib.parse
from typing import Callable, Iterable

import httpx  # https://github.com/encode/httpx


class UrlFilterer:
    def __init__(
        self,
        allowed_domains: set[str] | None = None,
        allowed_schemes: set[str] | None = None,
        allowed_filetypes: set[str] | None = None,
    ):
        self.allowed_domains = allowed_domains
        self.allowed_schemes = allowed_schemes
        self.allowed_filetypes = allowed_filetypes

    def filter_url(self, base: str, url: str) -> str | None:
        url = urllib.parse.urljoin(base, url)
        url, _frag = urllib.parse.urldefrag(url)
        parsed = urllib.parse.urlparse(url)
        if (self.allowed_schemes is not None
                and parsed.scheme not in self.allowed_schemes):
            return None
        if (self.allowed_domains is not None
                and parsed.netloc not in self.allowed_domains):
            return None
        ext = pathlib.Path(parsed.path).suffix
        if (self.allowed_filetypes is not None
                and ext not in self.allowed_filetypes):
            return None
        return url


class UrlParser(html.parser.HTMLParser):
    def __init__(
        self,
        base: str,
        filter_url: Callable[[str, str], str | None]
    ):
        super().__init__()
        self.base = base
        self.filter_url = filter_url
        self.found_links = set()

    def handle_starttag(self, tag: str, attrs):
        # look for <a href="...">
        if tag != "a":
            return

        for attr, url in attrs:
            if attr != "href":
                continue

            if (url := self.filter_url(self.base, url)) is not None:
                self.found_links.add(url)


class Crawler:
    def __init__(
        self,
        client: httpx.AsyncClient,
        urls: Iterable[str],
        filter_url: Callable[[str, str], str | None],
        workers: int = 10,
        limit: int = 25,
    ):
        self.client = client

        self.start_urls = set(urls)
        self.todo = asyncio.Queue()
        self.seen = set()
        self.done = set()

        self.filter_url = filter_url
        self.num_workers = workers
        self.limit = limit
        self.total = 0

    async def run(self):
        await self.on_found_links(self.start_urls)  # prime the queue
        workers = [
            asyncio.create_task(self.worker())
            for _ in range(self.num_workers)
        ]
        await self.todo.join()

        for worker in workers:
            worker.cancel()

    async def worker(self):
        while True:
            try:
                await self.process_one()
            except asyncio.CancelledError:
                return

    async def process_one(self):
        url = await self.todo.get()
        try:
            await self.crawl(url)
        except Exception as exc:
            # retry handling here...
            pass
        finally:
            self.todo.task_done()

    async def crawl(self, url: str):

        # rate limit here...
        await asyncio.sleep(.1)

        response = await self.client.get(url, follow_redirects=True)

        found_links = await self.parse_links(
            base=str(response.url),
            text=response.text,
        )

        await self.on_found_links(found_links)

        self.done.add(url)

    async def parse_links(self, base: str, text: str) -> set[str]:
        parser = UrlParser(base, self.filter_url)
        parser.feed(text)
        return parser.found_links

    async def on_found_links(self, urls: set[str]):
        new = urls - self.seen
        self.seen.update(new)

        # await save to database or file here...

        for url in new:
            await self.put_todo(url)

    async def put_todo(self, url: str):
        if self.total >= self.limit:
            return
        self.total += 1
        await self.todo.put(url)


async def main():
    filterer = UrlFilterer(
        allowed_domains={"mcoding.io"},
        allowed_schemes={"http", "https"},
        allowed_filetypes={".html", ".php", ""},
    )

    start = time.perf_counter()
    async with httpx.AsyncClient() as client:
        crawler = Crawler(
            client=client,
            urls=["https://mcoding.io/"],
            filter_url=filterer.filter_url,
            workers=5,
            limit=25,
        )
        await crawler.run()
    end = time.perf_counter()

    seen = sorted(crawler.seen)
    print("Results:")
    for url in seen:
        print(url)
    print(f"Crawled: {len(crawler.done)} URLs")
    print(f"Found: {len(seen)} URLs")
    print(f"Done in {end - start:.2f}s")


if __name__ == '__main__':
    asyncio.run(main(), debug=True)


async def homework():
    """
    Ideas for you to implement to test your understanding:
    - Respect robots.txt *IMPORTANT*
    - Find all links in sitemap.xml
    - Provide a user agent
    - Normalize urls (make sure not to count mcoding.io and mcoding.io/ as separate)
    - Skip filetypes (jpg, pdf, etc.) or include only filetypes (html, php, etc.)
    - Max depth
    - Max concurrent connections per domain
    - Rate limiting
    - Rate limiting per domain
    - Store connections as graph
    - Store results to database
    - Scale
    """
