程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

通過 multiprocessing 實現 python 多進程

編輯:Python

1. 引言

此前一系列文章中,我們介紹了 Python 的threading 包中的一系列工具。 python 的線程Python 線程同步(一) — 競爭條件與線程鎖python 線程同步(二) — 條件對象python 線程同步(三) — 信號量python 線程同步(四) — 事件對象與柵欄

threading 包為 Python 提供了線程模型,而 multiprocessing 包則為另一種並發模型 — 多進程模型提供了強大的解決方案。 multiprocessing 與 threading 十分相似,他提供了基本的進程對象類以及功能強大的進程同步工具,同時,multiprocessing 還提供了進程池的封裝類 Pool。

2. 多進程 vs 多線程

此前我們介紹了 Python 中的 GIL 鎖,受此影響,Python 每一個時刻只能調度一個線程,這意味著並發並沒有真的在進行。 而多進程則不同,多進程並發的模式中,由於進程間嚴格的隔離,他們得以真正的並行執行。 同時,Python 多進程讓多核 CPU 得以被利用。

但相比多線程機制,多進程的模式也存在一些缺點和不足:

  1. 進程切換更為耗時
  2. 進程間通信相比線程間共享的數據更為復雜

3. multiprocessing 提供的方法

multiprocessing 提供的方法

方法

描述

active_children

返回當前進程存活的子進程的列表

cpu_count

返回系統的 CPU 數量,但並不是當前進程可用的數量,len(os.sched_getaffinity(0)) 方法獲取的是當前進程可用的數量

current_process

獲取當前進程的 Process 對象

get_all_start_methods

返回支持的啟動方法的列表,該列表的首項即為默認選項,包括我們後面即將要介紹的 ’fork’, ’spawn’ 和 ’forkserver’

get_context

返回進程上下文 Context 對象

get_start_method

獲取當前啟動進程的啟動方法,’fork’ , ’spawn’ , ’forkserver’ 或者 None(如果沒有設置)

set_executable

設置在啟動子進程時使用的 Python 解釋器路徑,例如:set_executable(os.path.join(sys.exec_prefix, ’pythonw.exe’))

set_start_method

設置啟動子進程的方法 ’fork’ , ’spawn’ 或者 ’forkserver’

4. Process 類與子進程創建

你會發現 Process 類與 Thread 類十分相似,他們都通過 start 方法啟動並開始執行 run 方法的內容,同時,join 用來阻塞等待某個進程完成執行。 但是不同的是,這些方法只能由被調用進程的父進程來調用。

4.1. Process 類成員

4.1.1. 類成員屬性

  • name — 進程名
  • daemon — 布爾值,是否是守護進程
  • pid — 進程 id
  • exitcode — 進程退出時的退出碼,如果被信號終止,則返回信號值的相反數,進程未退出前該值為 None
  • authkey — 進程身份秘鑰,字節字符串,當 multiprocessing 初始化時,主進程使用 os.urandom() 分配一個隨機字符串,創建 Process 對象時,子進程繼承父進程的身份秘鑰
  • sentinel — 系統對象的數字句柄,在 UNIX 上是一個文件描述符

4.1.2. 類成員方法

  • run — 進程的具體活動
  • start — 啟動進程活動
  • join — 等待進程執行結束或超時
  • is_alive — 判斷進程是否存活
  • terminate — 終止進程,在 UNIX 環境中,通過給進程發送 SIGTERM 信號實現,在 Windows 環境中,通過 TerminateProcess 方法實現,被終止進程的子進程將不會被一起終止

需要注意的是,正如我們上面所說,start() 、 join() 、 is_alive() 、 terminate() 和 exitcode 方法只能由創建進程對象的進程調用。

4.2. 示例 — 通過 Process 類創建進程

4.2.1. 通過繼承 Process 類實現子進程創建

import logging
from multiprocessing import Process
from time import sleep, ctime
class myProcess(Process):
def __init__(self, nsec):
super().__init__()
self.nsec = nsec
def run(self):
logging.info('start_sleep[%s]' % self.nsec)
sleep(self.nsec + 1)
logging.info('end_sleep[%s]' % self.nsec)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
logging.info('start at %s'% ctime())
processes = list()
for i in range(5):
t = myProcess(i)
processes.append(t)
for process in processes:
process.start()
for process in processes:
process.join()
logging.info('end at %s' % ctime())

打印出了:

2019-05-22 10:02:24,705 - INFO: start at Wed May 22 10:02:24 2019 2019-05-22 10:02:24,739 - INFO: start_sleep[2] 2019-05-22 10:02:24,740 - INFO: start_sleep[1] 2019-05-22 10:02:24,742 - INFO: start_sleep[4] 2019-05-22 10:02:24,743 - INFO: start_sleep[0] 2019-05-22 10:02:24,743 - INFO: start_sleep[3] 2019-05-22 10:02:25,745 - INFO: end_sleep[0] 2019-05-22 10:02:26,746 - INFO: end_sleep[1] 2019-05-22 10:02:27,747 - INFO: end_sleep[2] 2019-05-22 10:02:28,748 - INFO: end_sleep[3] 2019-05-22 10:02:29,749 - INFO: end_sleep[4] 2019-05-22 10:02:29,750 - INFO: end at Wed May 22 10:02:29 2019

4.3. 通過創建 Process 對象實現子進程創建

import logging
from multiprocessing import Process
from time import sleep, ctime
def sleep_func(pindex):
logging.info('start_sleep[%s]' % pindex)
sleep(pindex + 1)
logging.info('end_sleep[%s]' % pindex)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
logging.info('start at %s' % ctime())
processes = list()
for i in range(5):
process = Process(target=sleep_func, args=[i])
processes.append(process)
for process in processes:
process.start()
for process in processes:
process.join()
logging.info('end at %s' % ctime())

打印出了:

2019-05-22 10:07:30,340 - INFO: start at Wed May 22 10:07:30 2019 2019-05-22 10:07:30,346 - INFO: start_sleep[2] 2019-05-22 10:07:30,348 - INFO: start_sleep[3] 2019-05-22 10:07:30,350 - INFO: start_sleep[0] 2019-05-22 10:07:30,351 - INFO: start_sleep[4] 2019-05-22 10:07:30,349 - INFO: start_sleep[1] 2019-05-22 10:07:31,353 - INFO: end_sleep[0] 2019-05-22 10:07:32,353 - INFO: end_sleep[1] 2019-05-22 10:07:33,354 - INFO: end_sleep[2] 2019-05-22 10:07:34,353 - INFO: end_sleep[3] 2019-05-22 10:07:35,353 - INFO: end_sleep[4] 2019-05-22 10:07:35,354 - INFO: end at Wed May 22 10:07:35 2019

可以看到,multiprocessing 的用法與 threading 中的用法簡直是一模一樣。

5. 進程的啟動方法

根據不同的平台,multiprocessing 有三種啟動進程的方法:

  1. spawn — 父進程啟動一個新的Python解釋器進程。子進程只會繼承那些運行進程對象的 run() 方法所需的資源,父進程中非必須的文件描述符和句柄則不會被繼承,與另兩種方法相比,這個方法啟動進程非常慢,是 windows 上的默認設置,也可用在 Unix 中
  2. fork — 通過 os.fork() 方法創建子進程,子進程在開始時與父進程完全相同,會繼承父進程中的所有資源,只能用於 Unix,是 Unix 系統中的默認方式
  3. forkserver — 啟動服務器進程,並從此刻開始,每當需要一個新進程時,父進程就會連接到服務器並請求它分叉一個新進程,從而避免父進程中的資源被繼承,只能用於 Unix 環境中

通過 multiprocessing.set_start_method 方法,可以設置不同的啟動方法:

import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()

5.1. 注意

需要注意的是,在程序中 set_start_method() 不應該被多次調用,不同上下文啟動的進程可能是不兼容的,比如使用 fork 上下文創建的鎖不能傳遞給使用 spawn 或 forkserver 啟動方法啟動的進程。

6. 通過多進程處理 CPU 密集型運算

下面我們來對比一下多進程、多線程運行 CPU 密集型任務的耗時情況:

import time
from multiprocessing import Process
from threading import Thread
def count(x, y):
# 使程序完成50萬計算
c = 0
while c < 500000:
c += 1
x += x
y += y
if __name__ == '__main__':
t = time.time()
for x in range(10):
count(1, 1)
print("Line", time.time() - t)
counts = []
t = time.time()
for x in range(10):
thread = Thread(target=count, args=(1, 1))
counts.append(thread)
thread.start()
for thread in counts:
thread.join()
print("Threading", time.time() - t)
counts = []
t = time.time()
for x in range(10):
process = Process(target=count, args=(1, 1))
counts.append(process)
process.start()
for process in counts:
process.join()
print("Multiprocess", time.time() - t)

程序非常簡單,我們分別進行 50 萬次計算,得到結果如下:

Line 69.52206325531006 Threading 55.799378633499146 Multiprocess 44.240989685058594

多進程運行確實有著性能優勢,但也沒有我們想象中那麼大。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved