以下程式碼範例是來自於 Python 之父 Guido van Rossum 和 A. Jesse Jiryu Davis 所一起撰寫的 web crawler ,主要是展示如何使用 asyncio module + aiohttp 來寫網頁異步爬蟲。
Authors: A. Jesse Jiryu Davis and Guido van Rossum
Project: Web crawlerThis is a web crawler. You give it a URL and it will crawl that website by following href links in the HTML pages.
The point of the example is to show off how to write a reasonably complex HTTP client application using the asyncio module. This module, originally nicknamed Tulip, is new in the Python 3.4 standard library, based on PEP 3156. The example uses an HTTP client implementation for asyncio called “aiohttp”, by Andrew Svetlov, Nikolay Kim, and others.
https://github.com/aosabook/500lines/tree/master/crawler
Table
ㄧ. 將部分程式更新
原始版本 GitHub 位置於此:500lines/crawling.py at master · aosabook/500lines · GitHub
因為 GitHub 更新時間是四年前,clone 下來後,有些程式使用方法需要被更新:
▍Python 3.8+ 移除 urllib.parse.splitport()
urllib.parse.splitport() 將在 Python 3.8 被移除,所以改使用 urlparse 和 hostname 的方法取得 hostname
1 2 |
parts = urllib.parse.urlparse(root) host = parts.hostname |
參考文件:問題 27485: urllib.splitport — is it official or not? – Python tracker
▍Python 3.10+ 即將移除 @asyncio.coroutine
Generator-based coroutine 的方式將在 Python 3.10 中被移除,所以這邊改用 Native coroutine 的方式,使用 Python 3.5+ library 中 async / await 語法來取代 @asyncio.coroutine
參考文件:Coroutines and Tasks — Python 3.8.2 documentation
▍asyncio.get_event_loop
Python 3.7 推出更簡潔的方法,將 event_loop 封裝,使用 asyncio.run() 一行程式就結束,不用在建立 event_loop 結束時也不需要 loop.close。
參考文件:cpython/runners.py at 3.8 · python/cpython · GitHub
二. 開始解析
▍Python 環境配置:
- Python 3.7+
▍pip install 安裝套件:
1 |
pip install aiohttp |
▍開始解析
可以看到 Crawler 類內,我使用註解來區分成三個部分
# 解析爬取到的 url 是否符合需求規範
# 將爬取到的 url 放入列隊
# 主要運行的異步函式

▍完整程式如下:
我將程式閱讀步驟 Step 註解寫在程式旁邊,建議由下往上開始閱讀
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
import asyncio import aiohttp # Install with "pip install aiohttp". from asyncio import Queue import cgi from collections import namedtuple import logging import re import time import urllib.parse from urllib.parse import urljoin FetchStatistic = namedtuple('FetchStatistic', [ 'url', 'next_url', 'status', 'exception', 'size', 'content_type', 'encoding', 'num_urls', 'num_new_urls' ]) def lenient_host(host): parts = host.split('.')[-2:] return ''.join(parts) def is_redirect(response): return response.status in (300, 301, 302, 303, 307) class Crawler: def __init__( self, roots, exclude=None, strict=True, # What to crawl. max_redirect=10, max_tries=4, # Per-url limits. max_tasks=10): self.roots = roots # 使用者指定抓取的網站地址,是一個 list self.exclude = exclude self.strict = strict self.max_redirect = max_redirect self.max_tries = max_tries self.max_tasks = max_tasks self.seen_urls = set() # 會保證不重複 url 與和已經抓取過的 url self.done = [] self.root_domains = set() # 解析爬取到的 url 是否符合需求規範 def host_okay(self, host): """Check if a host should be crawled. A literal match (after lowercasing) is always good. For hosts that don't look like IP addresses, some approximate matches are okay depending on the strict flag. """ host = host.lower() if host in self.root_domains: return True if re.match(r'\A[\d\.]*\Z', host): return False if self.strict: return self._host_okay_strictish(host) else: return self._host_okay_lenient(host) def _host_okay_strictish(self, host): """Check if a host should be crawled, strict-ish version. This checks for equality modulo an initial 'www.' component. """ host = host[4:] if host.startswith('www.') else 'www.' + host return host in self.root_domains def _host_okay_lenient(self, host): """Check if a host should be crawled, lenient version. This compares the last two components of the host. """ return lenient_host(host) in self.root_domains def url_allowed(self, url): if self.exclude and re.search(self.exclude, url): print('--------------------exclude', url) return False parts = urllib.parse.urlparse(url) if parts.scheme not in ('http', 'https'): return False host = parts.hostname if not self.host_okay(host): return False return True # 將爬取到的 url 放入列隊 def add_url(self, url, max_redirect=None): # print(url) if max_redirect is None: max_redirect = self.max_redirect self.seen_urls.add(url) self.q.put_nowait((url, max_redirect)) def record_statistic(self, fetch_statistic): """Record the FetchStatistic for completed / failed URL.""" self.done.append(fetch_statistic) # 以下為主要運行的異步函式 # Step 5 async def parse_links(self, response): links = set() content_type = None encoding = None body = await response.read() if response.status == 200: content_type = response.headers.get('content-type') pdict = {} if content_type: content_type, pdict = cgi.parse_header(content_type) encoding = pdict.get('charset', 'utf-8') if content_type in ('text/html', 'application/xml'): text = await response.text() urls = set(re.findall(r'''(?i)href=["']([^\s"'<>]+)''', text)) for url in urls: url_join = urllib.parse.urljoin(str(response.url), url) defragmented, frag = urllib.parse.urldefrag(url_join) if self.url_allowed(defragmented): links.add(defragmented) print(defragmented) stat = FetchStatistic(url=response.url, next_url=None, status=response.status, exception=None, size=len(body), content_type=content_type, encoding=encoding, num_urls=len(links), num_new_urls=len(links - self.seen_urls)) return stat, links # Step 4 async def fetch(self, url, max_redirect): tries = 0 exception = None while tries < self.max_tries: # 取得 url 的 response,失敗則在 max_tries 內持續嘗試 try: response = await self.session.get(url, allow_redirects=False) break except Exception as e: exception = e tries += 1 else: self.record_statistic( FetchStatistic(url=url, next_url=None, status=None, exception=exception, size=0, content_type=None, encoding=None, num_urls=0, num_new_urls=0)) return try: # 判斷是否跳轉頁面 if is_redirect(response): location = response.headers['location'] next_url = urllib.parse.urljoin(url, location) self.record_statistic( FetchStatistic(url=url, next_url=next_url, status=response.status, exception=None, size=0, content_type=None, encoding=None, num_urls=0, num_new_urls=0)) if next_url in self.seen_urls: return if max_redirect > 0: self.add_url(next_url, max_redirect - 1) else: print('redirect limit reached for %r from %r', next_url, url) else: stat, links = await self.parse_links(response) self.record_statistic(stat) for link in links.difference(self.seen_urls): self.q.put_nowait((link, self.max_redirect)) self.seen_urls.update(links) finally: await response.release() # Step 3 async def work(self): try: while True: url, max_redirect = await self.q.get() await self.fetch(url, max_redirect) self.q.task_done() except asyncio.CancelledError: pass # Step 2 async def crawl(self): self.q = asyncio.Queue() # 存放所有等待抓取的 url self.t0 = time.time() self.session = aiohttp.ClientSession() for root in self.roots: parts = urllib.parse.urlparse(root) host = parts.hostname # 判斷解析 url 後有無 host if not host: continue # 判斷 host 是否為數字 if re.match(r'\A[\d\.]*\Z', host): self.root_domains.add(host) else: host = host.lower() if self.strict: self.root_domains.add(host) else: self.root_domains.add(lenient_host(host)) for root in self.roots: self.add_url(root) workers = [ asyncio.create_task(self.work()) for _ in range(self.max_tasks) ] await self.q.join() # 等待列隊 url 清空,將結束任務 for w in workers: w.cancel() await self.session.close() self.t1 = time.time() # Step 1 time_start = time.time() crawler = Crawler(['https://xkcd.com'], max_tasks=30, exclude='.css') asyncio.run(crawler.crawl()) print(len(crawler.done)) print(time.time() - time_start) |
三. 談談 namedtuple
過去寫爬蟲都是用字典 dict 來處理爬下來的數據,可以看到 Python 之父是使用 namedtuple,所以我們來看看 tuple vs namedtuple vs dict 之間有什麼差別?
1. tuple、namedtuple 與 dict 之間差異?
▍namedtuple vs tuple
在使用 tuple 結構的時候,要訪問 tuple 裡的的其中一個值,需使用他的 index 值來取得,而 index 通常會有可讀性與維護性上的困擾。
namedtuple 不但保有 tuple 不可變動的特性,還解決了 index 的問題,可以使用 name 來訪問其中的值,待會下面範例會再講解。
▍namedtuple vs dict
namedtuple 是一個 immutable object,因此他所需要的空間比字典 dict 還要來的少,但字典對於 key 值的搜尋速度比 namedtuple 快,理想上 python 的字典 dict 在搜尋 key 值的時間複雜度是 O(1),而 namedtuple 基本上還是 tuple 結構,所以其時間複雜度為 O(n),若是較重視空間效率的程式會偏好使用 namedtuple,但搜尋速度和 dict 實在是差太多了
如果是像這次爬蟲範例中,單純僅需要寫入儲存資料的話,使用 namedtuple 確實會比 dict 和 tuple 都還適合。
2. namedtuple 使用方法
1 |
from collections import namedtuple |
▍宣告 namedtuple
1 2 3 4 5 6 7 8 9 10 11 |
Product_detail = namedtuple('Product', ['name', 'price', 'sales', 'ship_fees']) p0 = Product_detail('Max0', 6666.6, 10, True) p1 = Product_detail('Max1', 6666.5, 12, False) p2 = Product_detail('Max2', 6666.4, 11, True) print(p0, p1, p2) # 輸出內容: >>> Product(name='Max0', price=6666.6, sales=10, ship_fees=True) >>> Product(name='Max1', price=6666.5, sales=12, ship_fees=False) >>> Product(name='Max2', price=6666.4, sales=11, ship_fees=True) |
▍將 list 轉換成 namedtuple
1 2 3 4 5 6 |
product_list = ['Max3', 6666.3, 10, True] p3 = Product_detail._make(product_list) print(p3) # 輸出內容: >>> Product(name='Max3', price=6666.3, sales=10, ship_fees=True) |
▍將 dict 轉換成 namedtuple
1 2 3 4 5 6 |
produt_dict = {'name': 'Max4', 'price': 6666.2, 'sales': 9, 'ship_fees': True} p4 = Product_detail(**produt_dict) print(p4) # 輸出內容: >>> Product(name='Max4', price=6666.2, sales=9, ship_fees=True) |
▍將 namedtuple 轉換成 dict
1 2 3 4 5 |
produt_nametuple_to_dict = p4._asdict() print(produt_nametuple_to_dict) # 輸出內容: >>> OrderedDict([('name', 'Max4'), ('price', 6666.2), ('sales', 9), ('ship_fees', True)]) |
四. 談談 urllib.parse
可以看到 Python 之父使用很多 urllib 的套件來解析爬取到的 url,所以我們來了解 urllib 解析 url 的方式:
1. urllib.parse 使用方法
1 |
import urllib.parse |
▍ urllib.parse.urlparse()
用來解析 url 的 hostname、port、scheme、query 各式參數,這次 urlparse() 主要用來取得 hostname 並判斷是否是內部網址。
1 2 3 4 5 6 7 8 9 10 |
url = 'http://www.maxlist.xyz:80/author?user=Max&pass=123#123' parsed = urllib.parse.urlparse(url) print(parsed) print(parsed.hostname) print(parsed.port) # 輸出內容: >>> ParseResult(scheme='http', netloc='www.maxlist.xyz:80', path='/author', params='', query='user=Max&pass=123', fragment='123') >>> www.maxlist.xyz >>> 80 |
▍urllib.parse.urljoin()
爬取到的網址有時後不會有 netloc 的部分,使用 urljoin 來合併 url
1 2 3 4 5 6 7 |
crawler_url = '/newsletter/' response_url = 'https://xkcd.com' url_join = urllib.parse.urljoin(response_url, crawler_url) print(url_join) # 輸出內容: >>> https://xkcd.com/newsletter/ |
▍urllib.parse.urldefrag()
不同的 fragment 其實還是同一個頁面,所以在爬取時會將 fragment 進行處理
1 2 3 4 5 |
url = 'http://www.maxlist.xyz:80/author?user=Max&pass=123#remove' print(urllib.parse.urldefrag(url)) # 輸出內容: >>> DefragResult(url='http://www.maxlist.xyz:80/author?user=Max&pass=123', fragment='remove') |
最後~
▍關於 Async IO 相關其他文章,可以參考:
- 【Python教學】淺談 Coroutine 協程使用方法
- 【Python教學】Async IO Design Patterns 範例程式
- 【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲
▍關於與 Concurrency Programming 相關其他文章,可以參考:
- 【Python教學】淺談 Concurrency Programming
- 【Python教學】淺談 GIL & Thread-safe & Atomic
- 【Python教學】淺談 Multi-processing & Multi-threading 使用方法
- 【Python教學】淺談 Multi-processing pool 使用方法
那麼有關於【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲 的介紹就到這邊告一個段落囉!有任何問題可以在以下留言~
有關 Max行銷誌的最新文章,都會發佈在 Max 的 Facebook 粉絲專頁,如果想看最新更新,還請您按讚或是追蹤唷!
在〈【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲〉中有 1 則留言
写的太棒啦!
留言功能已關閉。