Table
一. 簡單的 Multi-processing pool 範例
使用 Python 標準庫內 multiprocessing 寫一個 multi-processing pool (多處理程序池 / 多進程池),簡單的範例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
from multiprocessing import Process, Pool import os, time def main_map(i): result = i * i return result if __name__ == ‘__main__': inputs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] # 設定處理程序數量 pool = Pool(4) # 運行多處理程序 pool_outputs = pool.map(main_map, inputs) # 輸出執行結果 print(pool_outputs) |
1 2 |
# 輸出結果 >>> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] |
二. 建議使用處理程序 (process) 數量
可使用 multiprocessing.cpu_count() 或 os.cpu_count() 來獲取當前機器的 CPU 核心數量。
假設目前 CPU 是四核,那麼 process 設定如果超過 4 的話,代表有個核會同時運行 2 個以上的任務,而 CPU 之間程序處理會切換造成本進而降低處理效率,所以建議設置 process 時,最好等於當前機器的 CPU 核心數量。
1 2 3 4 5 6 7 |
# 方法一 import multiprocessing cpus = multiprocessing.cpu_count() # 方法二 Import os cpus = os.cpu_count() |
三. 了解 pool 能調用的方法(method)
剛剛使用了 pool.map(main_map, inputs) 的 pool.map 方法來啟動 multi-processing pool,接下來我們探討一下還有什麼方法啟動多處理程序池,以及不同方法之間他們有差異呢?
1 2 3 4 |
print(dir(pool)) # 結果輸出 >>> [ ... 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'] |
而我們這次主要介紹的將會有:
- map
- map_async
- starmap:New in version Python 3.3.
- starmap_async:New in version Python 3.3.
- close
- join
參考文章:python – multiprocessing.Pool: When to use apply, apply_async or map? – Stack Overflow
▍到底有用 _async 是差在哪裡?
首先我們使用 pool.map 來看一下輸出結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
from multiprocessing import Process, Pool import os, time def main_map(i): result = I * I print(result) return result if __name__ == '__main__': inputs = [0, 1, 2, 3] pool = Pool(4) pool_outputs = pool.map(main_map, inputs) print(‘將會阻塞並於 pool.map 子程序結束後觸發') |
1 2 3 4 5 6 |
# 輸出結果 >>> 0 >>> 1 >>> 4 >>> 9 >>> 將會阻塞並於 pool.map 子程序結束後觸發 |
接下來我們使用 pool.map_async 來看看輸出結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
from multiprocessing import Process, Pool import os, time def main_map(i): result = i * i print(result) return result if __name__ == ‘__main__’: inputs = [0, 1, 2, 3] pool = Pool(4) pool_outputs = pool.map_async(main_map, inputs) print('將不會阻塞並和 pool.map_async 並行觸發') # close 和 join 是確保主程序結束後,子程序仍然繼續進行 pool.close() pool.join() |
1 2 3 4 5 6 |
# 輸出結果 >>> 將不會阻塞並和 pool.map_async 並行觸發 >>> 0 >>> 1 >>> 4 >>> 9 |
pool.map_async 和 pool.map 的差異在於,主處理程序是否同步並行,而 pool.map 會阻塞主程序,待所有子程序結束後,才會繼續運行主程序。而 pool.map_async 反之,所以最後要寫 close 和 join 來避免主程序結束後,子程序被迫關閉。
關於更多 close() 和 join() 的敘述可以看官方文件的說明如下:
close():
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
join():
Wait for the worker processes to exit. One must call close() or terminate() before using join() .
▍map 和 starmap 到底差在哪裡?
首先我們來看看官方文件對 starmap 的敘述:
Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments. Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].
同官方文件所述 map 和 starmap 差別僅在於能否傳入 Multi-args 多參數到同一個 function 內
pool.map 使用方法:
1 |
pool.map(main_map, [0, 1, 2, 3]) |
pool.starmap 使用方法:
1 |
pool.starmap(main_starmap, [(1, 3), (2, 4), (3, 5)]) |
▍Callback?
可以使用 callback 的有:
- map_async
- starmap_async
使用 map_async 寫 callback 範例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
from multiprocessing import Process, Pool import os, time def main_map(i): result = I * I print(“子處理程序 ID: {}, 運送結果: {}".format(os.getpid(), result)) return result def show(get_result): print('Callback: {} PID: {}'.format(get_result, os.getpid())) if __name__ == '__main__': print('主處理程序 ID:', os.getpid()) pool = Pool(4) results = pool.map_async(main_map, [3, 5, 7, 9, 11, 13, 15], callback=show) pool.close() pool.join() |
輸出結果:
1 2 3 4 5 6 7 8 9 |
>>> 主處理程序 ID: 63544 >>> 子處理程序 ID: 63546, 運送結果: 25 >>> 子處理程序 ID: 63547, 運送結果: 49 >>> 子處理程序 ID: 63545, 運送結果: 9 >>> 子處理程序 ID: 63546, 運送結果: 121 >>> 子處理程序 ID: 63546, 運送結果: 169 >>> 子處理程序 ID: 63545, 運送結果: 225 >>> 子處理程序 ID: 63548, 運送結果: 81 >>> Callback: [9, 25, 49, 81, 121, 169, 225] PID: 63544 |
從 print 結果可以看到此次啟動了 4 個子處理程序,最後子處理程序都運送完後,由主程序處理啟動 callback 的函式 show,最後結束主處理程序。
▍取得回傳資料
pool.map_async 和 pool.starmap_async 要取的回傳資料需要使用 .get() 方法
1 2 |
results = pool.map_async(main_map, [3, 5, 7, 9, 11, 13, 15]) print(results.get()) |
1 2 |
results = pool.starmap_async(main_starmap, [(1, 3), (2, 4), (3, 5), (4, 6),(5, 7)]) print(results.get()) |
pool.map 和 pool.starmap 可直接調用即可
1 2 |
results = pool.map(main_map, [3, 5, 7, 9, 11, 13, 15]) print(results) |
1 2 |
results = pool.starmap(main_starmap, [(1, 3), (2, 4), (3, 5)]) print(results) |
▍補充 chunksize & maxtasksperchild 是什麼?
1. maxtaslsperchild:
當完成多少任務時,須重啟子處理程序
使用方法:
1 |
pool = Pool(4, maxtasksperchild=100) |
官方文件說明:maxtasksperchild
maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
New in version 3.2: maxtasksperchild
2. chunksize:
每次子處理程序可以接收到的任務數量
使用方法:
1 |
pool.map(f, range(20), chunksize=10) |
官方文件說明:chunksize
the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
最後~
▍回顧本篇我們介紹了的內容:
- 簡單的 multi-processing pool 範例
- 建議使用處理程序 (process) 數量
- 了解 pool 能調用的方法(method)
- 到底有用 _async 是差在哪裡?
- map 和 starmap 到底差在哪裡?
- Callback?
- 取得回傳資料
- 補充 chunksize & maxtasksperchild 是什麼?
▍關於與 Concurrency Programming 相關其他文章,可以參考:
- 【Python教學】淺談 Concurrency Programming
- 【Python教學】淺談 GIL & Thread-safe & Atomic
- 【Python教學】淺談 Multi-processing & Multi-threading 使用方法
- 【Python教學】淺談 Multi-processing pool 使用方法
▍關於 Async IO 相關其他文章,可以參考:
- 【Python教學】淺談 Coroutine 協程使用方法
- 【Python教學】Async IO Design Patterns 範例程式
- 【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲
那麼有關於【Python教學】淺談 Multi-processing pool 使用方法 的介紹就到這邊告一個段落囉!有任何問題可以在以下留言~
有關 Max行銷誌的最新文章,都會發佈在 Max 的 Facebook 粉絲專頁,如果想看最新更新,還請您按讚或是追蹤唷!