03 Python 爬蟲教學10 所有文章

【Python教學】Async IO Design Patterns 範例程式

asyncio_程式範例_Max行銷誌

關於本篇將會介紹 Async IO 的兩種設計模式:

  • 範例ㄧ. 協程鏈 Chaining Coroutines
  • 範例二. 協程與列隊 Coroutines with Queue

範例ㄧ. 協程鏈 Chaining Coroutines

在上一篇 【Python教學】淺談 Coroutine 協程使用方法 中有提到協程具有 Awaitables 可等待的特性,因此我們可以利用此特性將程序分解為較小的,可管理的,可回收的協程。

如下面 Chaining Coroutines 協程程式範例的例子,我們將任務分成三個協程函數,分別是 step_one、step_two 和 step_three,並在協程函數 asyncio_chain 中調度這三個任務。

當第一個任務做完之後,會將結果傳入第二個任務,依序執行到第三個任務,當此協程都完成後透過 asyncio.gather 會先將此協程的任務結果先存起來,等所有協程都完成後,透過 print(result) 可以看到所有執行任務返回結果。

import asyncio
import time
import random

now = lambda: time.time()

# 任務一
async def step_one(num, n):
    time_sleep = random.randint(0, 1)
    print('Task {},step one, sleep {}'.format(n, time_sleep))
    await asyncio.sleep(time_sleep)
    print('Task {},step one, wakeup'.format(n))
    num += 1
    return num

# 任務二
async def step_two(num, n):
    time_sleep = random.randint(0, n)
    print('Task {},step two, sleep {}'.format(n, time_sleep))

    await asyncio.sleep(time_sleep)
    print('Task {},step two, wakeup'.format(n))
    num += 2
    return num

# 任務三
async def step_three(num, n):
    time_sleep = random.randint(0, n)
    print('Task {},step three, sleep {}'.format(n, time_sleep))
    await asyncio.sleep(time_sleep)
    print('Task {},step three, wakeup'.format(n))
    num += 3
    return [n, num]

# 任務調度
async def asyncio_chain(n):
    s1 = await step_one(n, n)
    s2 = await step_two(s1, n)
    s3 = await step_three(s2, n)
    return s3

# 收集任務結果
async def main():
    tasks = [asyncio_chain(n) for n in range(3)]
    result = await asyncio.gather(*tasks)
    print(result)


if __name__ == "__main__":

    start = now()
    asyncio.run(main())
    print('TIME: ', now() - start)

看完 Coroutines 範例後,可以了解到 Chaining Coroutines 協程鏈的優點就是不用等待所有的第一階段任務完成後,才能進入第二階段。這些 Coroutines 明確地在鏈中相互等待並且傳遞資訊。

輸出結果:

Task 0,step one, sleep 1
Task 1,step one, sleep 1
Task 2,step one, sleep 0
Task 2,step one, wakeup
Task 2,step two, sleep 0
Task 2,step two, wakeup
Task 2,step three, sleep 1
Task 0,step one, wakeup
Task 0,step two, sleep 0
Task 1,step one, wakeup
Task 1,step two, sleep 0
Task 2,step three, wakeup
Task 0,step two, wakeup
Task 0,step three, sleep 0
Task 1,step two, wakeup
Task 1,step three, sleep 0
Task 0,step three, wakeup
Task 1,step three, wakeup
[[0, 6], [1, 7], [2, 8]]
TIME:  1.00606107711792

範例二. 協程與列隊 Coroutines with Queue

這次範例設計中我們將任務分成生產者 (producers) 和消費者 (consumers),和上次不同的是沒有任何生產者 (producers) 協程鏈接到消費者 (consumers) 協程,彼此也不知道對方的生產數量,一切都透過列隊 Queue 來進行通信。

每個生產者 (producers) 可以在交錯,隨機,未通知的時間將多個項目添加到 Queue 中。一群消費者 (consumers) 在將它們從 Queue 中拉出,而不必等待任何其他信號。

▍關於生產者與消費者問題,維基百科有更詳細的定義:

In computing, the producer–consumer problem[1][2] (also known as the bounded-buffer problem) is a classic example of a multi-processsynchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer’s job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won’t try to add data into the buffer if it’s full and that the consumer won’t try to remove data from an empty buffer.

Producer-consumer problem

▍列隊 Queue :

Python標準庫中包含了四種列隊,分別是:

  1. queue.Queue
  2. asyncio.Queue
  3. multiprocessing.Queue
  4. collections.deque

而這次要介紹的是 asyncio.Queue,用法和其他的 Queue 差異不大,只是要留意的他並非 Tread-safe。
如果想了解 Thread-safe 的朋友,可以參考之前的這篇【Python教學】淺談 GIL & Thread-safe & Atomic,會解釋什麼是 Thread-safe,如何避免以及為何會有 GIL 的出現。

▍關於 asyncio.Queue 使用方法教學:

  • q = asyncio.Queue(maxsize=num_consumers):限制列隊大小
  • q.put():放入
  • q.get():取得
  • q.task_done():在完成一項工作之後,向隊列發送一個信號
  • q.join():實際上意味著等到列隊為空,再執行後續操作
  • q.qsize():返回列隊的大小
  • q.empty(): 如果列隊為空,返回 True,反之 False
  • q.full(): 如果列隊滿了,返回 True,反之 False

了解列隊之後,進入正題:

import asyncio

# 消費者
async def consumer(n, q):
    print('consumer {}: 進入商店'.format(n))
    while True:
        item = await q.get()
        print('consumer {}: 購買產品 {}'.format(n, item))
        await asyncio.sleep(0.15)
        q.task_done()
    print('consumer {}: ending'.format(n))

# 生產者 A
async def producer_a(q, num_workers):
    print('生產者A: 開始生產')
    for i in range(num_workers * 2):
        await q.put('A ' + str(i))
        print('生產者A: 上架產品A {}'.format(i))
        await asyncio.sleep(0.01)

# 生產者 B
async def producer_b(q, num_workers):
    print('生產者B: 開始生產')
    for i in range(num_workers * 2):
        await q.put('B ' + str(i))
        print('生產者B: 上架產品B {}'.format(i))
        await asyncio.sleep(0.02)

# 任務調度
async def main(num_consumers, num_workers):
    q = asyncio.Queue(maxsize=num_consumers)

    prod_a = [asyncio.create_task(producer_a(q, num_workers))]
    prod_b = [asyncio.create_task(producer_b(q, num_workers))]

    consumers = [
        asyncio.create_task(consumer(i, q)) for i in range(num_consumers)
    ]

    await asyncio.gather(*prod_a, *prod_b)
    print('生產者 All: ending')

    await q.join()
    print('consumers All: ending')

    for c in consumers:
        c.cancel()

# main(消費者數量, 生產者數量)
asyncio.run(main(3, 2))

從 q = asyncio.Queue(maxsize=num_consumers) 可以限制列隊大小,也就是說當生產者生產數量超過最大值時,會開始休息,直到列隊訊息被消費者取走後,才會開始繼續生產產品。

而 q.join() 則是等待所有列隊已經清空後才結束任務進行 c.cancel() 的動作。

最後輸出結果:

生產者A: 開始生產
生產者A: 上架產品A 0
生產者B: 開始生產
生產者B: 上架產品B 0
consumer 0: 進入商店
consumer 0: 購買產品 A 0
consumer 1: 進入商店
consumer 1: 購買產品 B 0
consumer 2: 進入商店
生產者A: 上架產品A 1
consumer 2: 購買產品 A 1
生產者B: 上架產品B 1
生產者A: 上架產品A 2
生產者A: 上架產品A 3
consumer 0: 購買產品 B 1
consumer 1: 購買產品 A 2
生產者B: 上架產品B 2
consumer 2: 購買產品 A 3
生產者B: 上架產品B 3
生產者 All: ending
consumer 0: 購買產品 B 2
consumer 1: 購買產品 B 3
consumers All: ending

最後~

▍關於 Async IO 相關其他文章,可以參考:

▍關於與 Concurrency Programming 相關其他文章,可以參考:

那麼有關於【Python教學】Async IO Design Patterns 範例程式 的介紹就到這邊告一個段落囉!有任何問題可以在以下留言~

有關 Max行銷誌的最新文章,都會發佈在 Max 的 Facebook 粉絲專頁,如果想看最新更新,還請您按讚或是追蹤唷!

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *