02 Python 基礎教學03 Python 爬蟲教學10 所有文章

【Python教學】淺談 Multi-processing pool 使用方法

mutli_processing_pool

一. 簡單的 Multi-processing pool 範例

使用 Python 標準庫內 multiprocessing 寫一個 multi-processing pool (多處理程序池 / 多進程池),簡單的範例如下:

from multiprocessing import Process, Pool
import os, time


def main_map(i):
    result = i * i
    return result


if __name__ == ‘__main__':
    inputs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

      # 設定處理程序數量
    pool = Pool(4)

      # 運行多處理程序
    pool_outputs = pool.map(main_map, inputs)

      # 輸出執行結果
    print(pool_outputs)
# 輸出結果
>>> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

二. 建議使用處理程序 (process) 數量

可使用 multiprocessing.cpu_count() 或 os.cpu_count() 來獲取當前機器的 CPU 核心數量。
假設目前 CPU 是四核,那麼 process 設定如果超過 4 的話,代表有個核會同時運行 2 個以上的任務,而 CPU 之間程序處理會切換造成本進而降低處理效率,所以建議設置 process 時,最好等於當前機器的 CPU 核心數量。

# 方法一
import multiprocessing
cpus = multiprocessing.cpu_count()

# 方法二
Import os
cpus = os.cpu_count() 

三. 了解 pool 能調用的方法(method)

剛剛使用了 pool.map(main_map, inputs) 的 pool.map 方法來啟動 multi-processing pool,接下來我們探討一下還有什麼方法啟動多處理程序池,以及不同方法之間他們有差異呢?

print(dir(pool))

# 結果輸出
>>> [ ... 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']

而我們這次主要介紹的將會有:

  • map
  • map_async
  • starmap:New in version Python 3.3.
  • starmap_async:New in version Python 3.3.
  • close
  • join

參考文章:python – multiprocessing.Pool: When to use apply, apply_async or map? – Stack Overflow

▍到底有用 _async 是差在哪裡?

首先我們使用 pool.map 來看一下輸出結果:

from multiprocessing import Process, Pool
import os, time


def main_map(i):
    result = I * I
    print(result)
    return result


if __name__ == '__main__':

    inputs = [0, 1, 2, 3]
    pool = Pool(4)

    pool_outputs = pool.map(main_map, inputs)
    print(‘將會阻塞並於 pool.map 子程序結束後觸發')
# 輸出結果
>>> 0
>>> 1
>>> 4
>>> 9
>>> 將會阻塞並於 pool.map 子程序結束後觸發

接下來我們使用 pool.map_async 來看看輸出結果:

from multiprocessing import Process, Pool
import os, time


def main_map(i):
    result = i * i
    print(result)
    return result


if __name__ == ‘__main__’:

    inputs = [0, 1, 2, 3]
    pool = Pool(4)

    pool_outputs = pool.map_async(main_map, inputs)
    print('將不會阻塞並和 pool.map_async 並行觸發')

    # close 和 join 是確保主程序結束後,子程序仍然繼續進行
    pool.close()
    pool.join()
# 輸出結果
>>> 將不會阻塞並和 pool.map_async 並行觸發
>>> 0
>>> 1
>>> 4
>>> 9

pool.map_async 和 pool.map 的差異在於,主處理程序是否同步並行,而 pool.map 會阻塞主程序,待所有子程序結束後,才會繼續運行主程序。而 pool.map_async 反之,所以最後要寫 close 和 join 來避免主程序結束後,子程序被迫關閉。

關於更多 close() 和 join() 的敘述可以看官方文件的說明如下:

close():

Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

join():

Wait for the worker processes to exit. One must call close() or terminate() before using join() .

▍map 和 starmap 到底差在哪裡?

首先我們來看看官方文件對 starmap 的敘述

Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments. Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].

同官方文件所述 map 和 starmap 差別僅在於能否傳入 Multi-args 多參數到同一個 function 內

pool.map 使用方法:

pool.map(main_map, [0, 1, 2, 3])

pool.starmap 使用方法:

pool.starmap(main_starmap, [(1, 3), (2, 4), (3, 5)])

▍Callback?

可以使用 callback 的有:

  • map_async
  • starmap_async

使用 map_async 寫 callback 範例:

from multiprocessing import Process, Pool
import os, time


def main_map(i):
    result = I * I
    print(“子處理程序 ID: {}, 運送結果: {}".format(os.getpid(), result))
    return result


def show(get_result):
    print('Callback: {} PID: {}'.format(get_result, os.getpid()))


if __name__ == '__main__':

    print('主處理程序 ID:', os.getpid())
    pool = Pool(4)

    results = pool.map_async(main_map, [3, 5, 7, 9, 11, 13, 15], callback=show)
    pool.close()
    pool.join()

輸出結果:

>>> 主處理程序 ID: 63544
>>> 子處理程序 ID: 63546, 運送結果: 25
>>> 子處理程序 ID: 63547, 運送結果: 49
>>> 子處理程序 ID: 63545, 運送結果: 9
>>> 子處理程序 ID: 63546, 運送結果: 121
>>> 子處理程序 ID: 63546, 運送結果: 169
>>> 子處理程序 ID: 63545, 運送結果: 225
>>> 子處理程序 ID: 63548, 運送結果: 81
>>> Callback: [9, 25, 49, 81, 121, 169, 225] PID: 63544

從 print 結果可以看到此次啟動了 4 個子處理程序,最後子處理程序都運送完後,由主程序處理啟動 callback 的函式 show,最後結束主處理程序。

▍取得回傳資料

pool.map_async 和 pool.starmap_async 要取的回傳資料需要使用 .get() 方法

results = pool.map_async(main_map, [3, 5, 7, 9, 11, 13, 15])
print(results.get())
results = pool.starmap_async(main_starmap, [(1, 3), (2, 4), (3, 5), (4, 6),(5, 7)])
print(results.get())

pool.map 和 pool.starmap 可直接調用即可

results = pool.map(main_map, [3, 5, 7, 9, 11, 13, 15])
print(results)
results = pool.starmap(main_starmap, [(1, 3), (2, 4), (3, 5)])
print(results)

▍補充 chunksize & maxtasksperchild 是什麼?

1. maxtaslsperchild:

當完成多少任務時,須重啟子處理程序
使用方法:

pool = Pool(4, maxtasksperchild=100)

官方文件說明:maxtasksperchild

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
New in version 3.2: maxtasksperchild

2. chunksize:

每次子處理程序可以接收到的任務數量
使用方法:

pool.map(f, range(20), chunksize=10)

官方文件說明:chunksize

the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

最後~

▍回顧本篇我們介紹了的內容:

  • 簡單的 multi-processing pool 範例
  • 建議使用處理程序 (process) 數量
  • 了解 pool 能調用的方法(method)
    • 到底有用 _async 是差在哪裡?
    • map 和 starmap 到底差在哪裡?
    • Callback?
    • 取得回傳資料
    • 補充 chunksize & maxtasksperchild 是什麼?

▍關於與 Concurrency Programming 相關其他文章,可以參考:

▍關於 Async IO 相關其他文章,可以參考:

那麼有關於【Python教學】淺談 Multi-processing pool 使用方法 的介紹就到這邊告一個段落囉!有任何問題可以在以下留言~

有關 Max行銷誌的最新文章,都會發佈在 Max 的 Facebook 粉絲專頁,如果想看最新更新,還請您按讚或是追蹤唷!

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *