程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python 多進程

編輯:Python

本文介紹Python多進程模塊。

概述

Python中的多進程是通過multiprocessing包來實現的,和多線程的threading.Thread差不多,它可以利用multiprocessing.Process對象來創建一個進程對象。這個進程對象的方法和線程對象的方法差不多也有start(), run(), join()等方法,其中有一個方法不同Thread線程對象中的守護線程方法是setDeamon,而Process進程對象的守護進程是通過設置deamon屬性來完成的。

Python多進程實現

方法一

from multiprocessing import Process
def fun1(name):
print('測試%s多進程' %name)
if __name__ == '__main__':
process_list = []
for i in range(5): #開啟5個子進程執行fun1函數
p = Process(target=fun1,args=('Python',)) #實例化進程對象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('結束測試')

結果

測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
結束測試
Process finished with exit code 0

上面的代碼開啟了5個子進程去執行函數,我們可以觀察結果,是同時打印的,這裡實現了真正的並行操作,就是多個CPU同時執行任務。我們知道進程是python中最小的資源分配單元,也就是進程中間的數據,內存是不共享的,每啟動一個進程,都要獨立分配資源和拷貝訪問的數據,所以進程的啟動和銷毀的代價是比較大了,所以在實際中使用多進程,要根據服務器的配置來設定。

方法二

還記得python多線程的第二種實現方法嗎?是通過類繼承的方法來實現的,python多進程的第二種實現方式也是一樣的

from multiprocessing import Process
class MyProcess(Process): #繼承Process類
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name
def run(self):
print('測試%s多進程' % self.name)
if __name__ == '__main__':
process_list = []
for i in range(5): #開啟5個子進程執行fun1函數
p = MyProcess('Python') #實例化進程對象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('結束測試')

結果

測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
結束測試
Process finished with exit code 0

效果和第一種方式一樣。

我們可以看到Python多進程的實現方式和多線程的實現方式幾乎一樣。

Process類的其他方法

構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組
target: 要執行的方法
name: 進程名
args/kwargs: 要傳入方法的參數
實例方法:
is_alive():返回進程是否在運行,bool類型。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程准備就緒,等待CPU調度
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():不管任務是否完成,立即停止工作進程
屬性:
daemon:和線程的setDeamon功能一樣
name:進程名字
pid:進程號

關於join,daemon的使用和python多線程一樣,這裡就不在復述了,大家可以看看以前的python多線程系列文章。

Python多線程的通信

進程是系統獨立調度核分配系統資源(CPU、內存)的基本單位,進程之間是相互獨立的,每啟動一個新的進程相當於把數據進行了一次克隆,子進程裡的數據修改無法影響到主進程中的數據,不同子進程之間的數據也不能共享,這是多進程在使用中與多線程最明顯的區別。但是難道Python多進程中間難道就是孤立的嗎?當然不是,python也提供了多種方法實現了多進程中間的通信和數據共享(可以修改一份數據)

進程隊列Queue

Queue在多線程中也說到過,在生成者消費者模式中使用,是線程安全的,是生產者和消費者中間的數據管道,那在python多進程中,它其實就是進程之間的數據管道,實現進程通信。

from multiprocessing import Process,Queue
def fun1(q,i):
print('子進程%s 開始put數據' %i)
q.put('我是%s 通過Queue通信' %i)
if __name__ == '__main__':
q = Queue()
process_list = []
for i in range(3):
p = Process(target=fun1,args=(q,i,)) #注意args裡面要把q對象傳給我們要執行的方法,這樣子進程才能和主進程用Queue來通信
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('主進程獲取Queue數據')
print(q.get())
print(q.get())
print(q.get())
print('結束測試')

結果

子進程0 開始put數據
子進程1 開始put數據
子進程2 開始put數據
主進程獲取Queue數據
我是0 通過Queue通信
我是1 通過Queue通信
我是2 通過Queue通信
結束測試
Process finished with exit code 0

上面的代碼結果可以看到我們主進程中可以通過Queue獲取子進程中put的數據,實現進程間的通信。

管道Pipe

管道Pipe和Queue的作用大致差不多,也是實現進程間的通信,下面之間看怎麼使用吧

from multiprocessing import Process, Pipe
def fun1(conn):
print('子進程發送消息:')
conn.send('你好主進程')
print('子進程接受消息:')
print(conn.recv())
conn.close()
if __name__ == '__main__':
conn1, conn2 = Pipe() #關鍵點,pipe實例化生成一個雙向管
p = Process(target=fun1, args=(conn2,)) #conn2傳給子進程
p.start()
print('主進程接受消息:')
print(conn1.recv())
print('主進程發送消息:')
conn1.send("你好子進程")
p.join()
print('結束測試')

結果

主進程接受消息:
子進程發送消息:
子進程接受消息:
你好主進程
主進程發送消息:
你好子進程
結束測試
Process finished with exit code 0

上面可以看到主進程和子進程可以相互發送消息

Managers

Queue和Pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據。那麼久要用到Managers

from multiprocessing import Process, Manager
def fun1(dic,lis,index):
dic[index] = 'a'
dic['2'] = 'b'
lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
#print(l)
if __name__ == '__main__':
with Manager() as manager:
dic = manager.dict()#注意字典的聲明方式,不能直接通過{}來定義
l = manager.list(range(5))#[0,1,2,3,4]
process_list = []
for i in range(10):
p = Process(target=fun1, args=(dic,l,i))
p.start()
process_list.append(p)
for res in process_list:
res.join()
print(dic)
print(l)

結果:

{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]

可以看到主進程定義了一個字典和一個列表,在子進程中,可以添加和修改字典的內容,在列表中插入新的數據,實現進程間的數據共享,即可以共同修改同一份數據

進程鎖

在多進程程序運行時,可能會有部分內存或硬盤可以同時被多個進程操作,如果不小心管理很可能導致沖突。此時可以用進程鎖有效解決問題。

from multiprocessing import Process, Manager
def fun1(dic,lis,index, lock):
lock.acquire()
dic[index] = 'a'
dic['2'] = 'b'
lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
lock.release()
#print(l)
if __name__ == '__main__':
with Manager() as manager:
dic = manager.dict()#注意字典的聲明方式,不能直接通過{}來定義
l = manager.list(range(5))#[0,1,2,3,4]
lock = manager.Lock()
process_list = []
for i in range(10):
p = Process(target=fun1, args=(dic,l,i,lock))
p.start()
process_list.append(p)
for res in process_list:
res.join()
print(dic)
print(l)

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程為止。就是固定有幾個進程可以使用。

常規進程池

進程池中有兩個方法:

apply:同步,一般不使用

apply_async:異步

from multiprocessing import Process,Pool
import os, time, random
def fun1(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
pool = Pool(5) #創建一個5個進程的進程池
for i in range(10):
pool.apply_async(func=fun1, args=(i,))
pool.close()
pool.join()
print('結束測試')

結果

Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
結束測試

Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之後就不能繼續添加新的Process了。

進程池map方法

import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = \'thumbs\'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if \'jpeg\' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == \'__main__\':
folder = os.path.abspath(
\'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(creat_thumbnail, images) #關鍵點,images是一個可迭代對象
pool.close()
pool.join()

上邊這段代碼的主要工作就是將遍歷傳入的文件夾中的圖片文件,一一生成縮略圖,並將這些縮略圖保存到特定文件夾中。 map 函數並不支持手動線程管理,反而使得相關的 debug 工作也變得異常簡單。

多進程啟動方式

multiprocessing 支持三種方式 啟動進程

  • 在python中可以顯示設置啟動方式
import multiprocessing as mp
mp = mp.get_context('spawn')

spawn

父進程啟動一個新的python解釋器進程.子進程只繼承運行 Run()方法所需的資源。 來自父進程的不必要的文件描述符和句柄將不會被繼承,運行速度比較慢

適用系統:unix and windows

fork

父進程使用os.fork()方法對Python解釋器進行fork。子進程開始時實際上與父進程相同。 父進程的所有資源都由子進程繼承。請注意,安全的fork多線程的進程是有問題的。

適用系統:unix

forkserver

當程序啟動並選擇forkserver的啟動方法時,將啟動服務器進程。 從那時起,每當需要一個新進程時,父進程就會連接到服務器,並請求它fork一個新進程。 fork server進程是單線程的,所以使用os.fork()是安全的。沒有任何不必要的資源被繼承

適用系統:unix

參考資料

  • https://zhuanlan.zhihu.com/p/64702600
  • https://blog.csdn.net/zy13270867781/article/details/82499108

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved