03 Python 爬蟲教學10 所有文章

【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲

Async異步全站爬蟲_Max行銷誌

以下程式碼範例是來自於 Python 之父 Guido van RossumA. Jesse Jiryu Davis 所一起撰寫的 web crawler ,主要是展示如何使用 asyncio module + aiohttp 來寫網頁異步爬蟲。

Authors: A. Jesse Jiryu Davis and Guido van Rossum
Project: Web crawler

This is a web crawler. You give it a URL and it will crawl that website by following href links in the HTML pages.

The point of the example is to show off how to write a reasonably complex HTTP client application using the asyncio module. This module, originally nicknamed Tulip, is new in the Python 3.4 standard library, based on PEP 3156. The example uses an HTTP client implementation for asyncio called “aiohttp”, by Andrew Svetlov, Nikolay Kim, and others.

https://github.com/aosabook/500lines/tree/master/crawler

ㄧ. 將部分程式更新

原始版本 GitHub 位置於此:500lines/crawling.py at master · aosabook/500lines · GitHub

因為 GitHub 更新時間是四年前,clone 下來後,有些程式使用方法需要被更新:

▍Python 3.8+ 移除 urllib.parse.splitport()

urllib.parse.splitport() 將在 Python 3.8 被移除,所以改使用 urlparse 和 hostname 的方法取得 hostname

parts = urllib.parse.urlparse(root)
host = parts.hostname

參考文件:問題 27485: urllib.splitport — is it official or not? – Python tracker

▍Python 3.10+ 即將移除 @asyncio.coroutine

Generator-based coroutine 的方式將在 Python 3.10 中被移除,所以這邊改用 Native coroutine 的方式,使用 Python 3.5+ library 中 async / await 語法來取代 @asyncio.coroutine

參考文件:Coroutines and Tasks — Python 3.8.2 documentation

▍asyncio.get_event_loop

Python 3.7 推出更簡潔的方法,將 event_loop 封裝,使用 asyncio.run() 一行程式就結束,不用在建立 event_loop 結束時也不需要 loop.close。

參考文件:cpython/runners.py at 3.8 · python/cpython · GitHub

二. 開始解析

▍Python 環境配置:

  • Python 3.7+

▍pip install 安裝套件:

pip install aiohttp

▍開始解析

可以看到 Crawler 類內,我使用註解來區分成三個部分

# 解析爬取到的 url 是否符合需求規範
# 將爬取到的 url 放入列隊
# 主要運行的異步函式

▍完整程式如下:

我將程式閱讀步驟 Step 註解寫在程式旁邊,建議由下往上開始閱讀

import asyncio
import aiohttp  # Install with "pip install aiohttp".
from asyncio import Queue

import cgi
from collections import namedtuple
import logging
import re
import time
import urllib.parse
from urllib.parse import urljoin

FetchStatistic = namedtuple('FetchStatistic', [
    'url', 'next_url', 'status', 'exception', 'size', 'content_type',
    'encoding', 'num_urls', 'num_new_urls'
])


def lenient_host(host):
    parts = host.split('.')[-2:]
    return ''.join(parts)


def is_redirect(response):
    return response.status in (300, 301, 302, 303, 307)


class Crawler:
    def __init__(
            self,
            roots,
            exclude=None,
            strict=True,  # What to crawl.
            max_redirect=10,
            max_tries=4,  # Per-url limits.
            max_tasks=10):
        self.roots = roots  # 使用者指定抓取的網站地址,是一個 list
        self.exclude = exclude
        self.strict = strict
        self.max_redirect = max_redirect
        self.max_tries = max_tries
        self.max_tasks = max_tasks
        self.seen_urls = set()  # 會保證不重複 url 與和已經抓取過的 url
        self.done = []
        self.root_domains = set()

    # 解析爬取到的 url 是否符合需求規範
    def host_okay(self, host):
        """Check if a host should be crawled.

        A literal match (after lowercasing) is always good.  For hosts
        that don't look like IP addresses, some approximate matches
        are okay depending on the strict flag.
        """
        host = host.lower()
        if host in self.root_domains:
            return True
        if re.match(r'\A[\d\.]*\Z', host):
            return False
        if self.strict:
            return self._host_okay_strictish(host)
        else:
            return self._host_okay_lenient(host)

    def _host_okay_strictish(self, host):
        """Check if a host should be crawled, strict-ish version.

        This checks for equality modulo an initial 'www.' component.
        """
        host = host[4:] if host.startswith('www.') else 'www.' + host
        return host in self.root_domains

    def _host_okay_lenient(self, host):
        """Check if a host should be crawled, lenient version.

        This compares the last two components of the host.
        """
        return lenient_host(host) in self.root_domains

    def url_allowed(self, url):
        if self.exclude and re.search(self.exclude, url):
            print('--------------------exclude', url)
            return False
        parts = urllib.parse.urlparse(url)
        if parts.scheme not in ('http', 'https'):
            return False
        host = parts.hostname
        if not self.host_okay(host):
            return False
        return True

    # 將爬取到的 url 放入列隊
    def add_url(self, url, max_redirect=None):
        # print(url)
        if max_redirect is None:
            max_redirect = self.max_redirect
        self.seen_urls.add(url)
        self.q.put_nowait((url, max_redirect))

    def record_statistic(self, fetch_statistic):
        """Record the FetchStatistic for completed / failed URL."""
        self.done.append(fetch_statistic)

    # 以下為主要運行的異步函式
    # Step 5
    async def parse_links(self, response):
        links = set()
        content_type = None
        encoding = None
        body = await response.read()
        if response.status == 200:
            content_type = response.headers.get('content-type')
            pdict = {}

            if content_type:
                content_type, pdict = cgi.parse_header(content_type)
            encoding = pdict.get('charset', 'utf-8')
            if content_type in ('text/html', 'application/xml'):
                text = await response.text()

                urls = set(re.findall(r'''(?i)href=["']([^\s"'<>]+)''', text))
                for url in urls:
                    url_join = urllib.parse.urljoin(str(response.url), url)
                    defragmented, frag = urllib.parse.urldefrag(url_join)
                    if self.url_allowed(defragmented):
                        links.add(defragmented)
                        print(defragmented)

        stat = FetchStatistic(url=response.url,
                              next_url=None,
                              status=response.status,
                              exception=None,
                              size=len(body),
                              content_type=content_type,
                              encoding=encoding,
                              num_urls=len(links),
                              num_new_urls=len(links - self.seen_urls))

        return stat, links

    # Step 4
    async def fetch(self, url, max_redirect):
        tries = 0
        exception = None
        while tries < self.max_tries:
            # 取得 url 的 response,失敗則在 max_tries 內持續嘗試
            try:
                response = await self.session.get(url, allow_redirects=False)
                break
            except Exception as e:
                exception = e
            tries += 1
        else:
            self.record_statistic(
                FetchStatistic(url=url,
                               next_url=None,
                               status=None,
                               exception=exception,
                               size=0,
                               content_type=None,
                               encoding=None,
                               num_urls=0,
                               num_new_urls=0))
            return
        try:
            # 判斷是否跳轉頁面
            if is_redirect(response):
                location = response.headers['location']
                next_url = urllib.parse.urljoin(url, location)
                self.record_statistic(
                    FetchStatistic(url=url,
                                   next_url=next_url,
                                   status=response.status,
                                   exception=None,
                                   size=0,
                                   content_type=None,
                                   encoding=None,
                                   num_urls=0,
                                   num_new_urls=0))

                if next_url in self.seen_urls:
                    return
                if max_redirect > 0:
                    self.add_url(next_url, max_redirect - 1)
                else:
                    print('redirect limit reached for %r from %r', next_url,
                          url)
            else:
                stat, links = await self.parse_links(response)

                self.record_statistic(stat)
                for link in links.difference(self.seen_urls):
                    self.q.put_nowait((link, self.max_redirect))
                self.seen_urls.update(links)
        finally:
            await response.release()

    # Step 3
    async def work(self):
        try:
            while True:
                url, max_redirect = await self.q.get()
                await self.fetch(url, max_redirect)
                self.q.task_done()

        except asyncio.CancelledError:
            pass
    
    # Step 2
    async def crawl(self):
        self.q = asyncio.Queue()  # 存放所有等待抓取的 url
        self.t0 = time.time()
        self.session = aiohttp.ClientSession()

        for root in self.roots:
            parts = urllib.parse.urlparse(root)
            host = parts.hostname

            # 判斷解析 url 後有無 host
            if not host:
                continue
            # 判斷 host 是否為數字
            if re.match(r'\A[\d\.]*\Z', host):
                self.root_domains.add(host)
            else:
                host = host.lower()
                if self.strict:
                    self.root_domains.add(host)
                else:
                    self.root_domains.add(lenient_host(host))

        for root in self.roots:
            self.add_url(root)

        workers = [
            asyncio.create_task(self.work()) for _ in range(self.max_tasks)
        ]

        await self.q.join()  # 等待列隊 url 清空,將結束任務

        for w in workers:
            w.cancel()

        await self.session.close()

        self.t1 = time.time()

# Step 1
time_start = time.time()
crawler = Crawler(['https://xkcd.com'], max_tasks=30, exclude='.css')
asyncio.run(crawler.crawl())

print(len(crawler.done))
print(time.time() - time_start)

三. 談談 namedtuple

過去寫爬蟲都是用字典 dict 來處理爬下來的數據,可以看到 Python 之父是使用 namedtuple,所以我們來看看 tuple vs namedtuple vs dict 之間有什麼差別?

1. tuple、namedtuple 與 dict 之間差異?

▍namedtuple vs tuple

在使用 tuple 結構的時候,要訪問 tuple 裡的的其中一個值,需使用他的 index 值來取得,而 index 通常會有可讀性與維護性上的困擾。
namedtuple 不但保有 tuple 不可變動的特性,還解決了 index 的問題,可以使用 name 來訪問其中的值,待會下面範例會再講解。

▍namedtuple vs dict

namedtuple 是一個 immutable object,因此他所需要的空間比字典 dict 還要來的少,但字典對於 key 值的搜尋速度比 namedtuple 快,理想上 python 的字典 dict 在搜尋 key 值的時間複雜度是 O(1),而 namedtuple 基本上還是 tuple 結構,所以其時間複雜度為 O(n),若是較重視空間效率的程式會偏好使用 namedtuple,但搜尋速度和 dict 實在是差太多了

如果是像這次爬蟲範例中,單純僅需要寫入儲存資料的話,使用 namedtuple 確實會比 dict 和 tuple 都還適合。

2. namedtuple 使用方法

from collections import namedtuple

▍宣告 namedtuple

Product_detail = namedtuple('Product', ['name', 'price', 'sales', 'ship_fees'])

p0 = Product_detail('Max0', 6666.6, 10, True)
p1 = Product_detail('Max1', 6666.5, 12, False)
p2 = Product_detail('Max2', 6666.4, 11, True)
print(p0, p1, p2)

# 輸出內容:
>>> Product(name='Max0', price=6666.6, sales=10, ship_fees=True) 
>>> Product(name='Max1', price=6666.5, sales=12, ship_fees=False) 
>>> Product(name='Max2', price=6666.4, sales=11, ship_fees=True)

▍將 list 轉換成 namedtuple

product_list = ['Max3', 6666.3, 10, True]
p3 = Product_detail._make(product_list)
print(p3)

# 輸出內容:
>>> Product(name='Max3', price=6666.3, sales=10, ship_fees=True)

▍將 dict 轉換成 namedtuple

produt_dict = {'name': 'Max4', 'price': 6666.2, 'sales': 9, 'ship_fees': True}
p4 = Product_detail(**produt_dict)
print(p4)

# 輸出內容:
>>> Product(name='Max4', price=6666.2, sales=9, ship_fees=True)

▍將 namedtuple 轉換成 dict

produt_nametuple_to_dict = p4._asdict()
print(produt_nametuple_to_dict)

# 輸出內容:
>>> OrderedDict([('name', 'Max4'), ('price', 6666.2), ('sales', 9), ('ship_fees', True)])

四. 談談 urllib.parse

可以看到 Python 之父使用很多 urllib 的套件來解析爬取到的 url,所以我們來了解 urllib 解析 url 的方式:

1. urllib.parse 使用方法

import urllib.parse

▍ urllib.parse.urlparse()

用來解析 url 的 hostname、port、scheme、query 各式參數,這次 urlparse() 主要用來取得 hostname 並判斷是否是內部網址。

url = 'http://www.maxlist.xyz:80/author?user=Max&pass=123#123'
parsed = urllib.parse.urlparse(url)
print(parsed)
print(parsed.hostname)
print(parsed.port)

# 輸出內容:
>>> ParseResult(scheme='http', netloc='www.maxlist.xyz:80', path='/author', params='', query='user=Max&pass=123', fragment='123')
>>> www.maxlist.xyz
>>> 80

▍urllib.parse.urljoin()

爬取到的網址有時後不會有 netloc 的部分,使用 urljoin 來合併 url

crawler_url = '/newsletter/'
response_url = 'https://xkcd.com'
url_join = urllib.parse.urljoin(response_url, crawler_url)
print(url_join)

# 輸出內容:
>>> https://xkcd.com/newsletter/

▍urllib.parse.urldefrag()

不同的 fragment 其實還是同一個頁面,所以在爬取時會將 fragment 進行處理

url = 'http://www.maxlist.xyz:80/author?user=Max&pass=123#remove'
print(urllib.parse.urldefrag(url))

# 輸出內容:
>>> DefragResult(url='http://www.maxlist.xyz:80/author?user=Max&pass=123', fragment='remove')

最後~

▍關於 Async IO 相關其他文章,可以參考:

▍關於與 Concurrency Programming 相關其他文章,可以參考:

那麼有關於【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲 的介紹就到這邊告一個段落囉!有任何問題可以在以下留言~

有關 Max行銷誌的最新文章,都會發佈在 Max 的 Facebook 粉絲專頁,如果想看最新更新,還請您按讚或是追蹤唷!

在〈【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲〉中有 1 則留言

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *