本文介紹Python多進程模塊.
Python中的多進程是通過multiprocessing包來實現的,和多線程的threading.Thread差不多,它可以利用multiprocessing.Process對象來創建一個進程對象.這個進程對象的方法和線程對象的方法差不多也有start(), run(), join()等方法,其中有一個方法不同Thread線程對象中的守護線程方法是setDeamon,而Process進程對象的守護進程是通過設置deamon屬性來完成的.
from multiprocessing import Process
def fun1(name):
print('測試%s多進程' %name)
if __name__ == '__main__':
process_list = []
for i in range(5): #開啟5個子進程執行fun1函數
p = Process(target=fun1,args=('Python',)) #實例化進程對象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('結束測試')
結果
測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
結束測試
Process finished with exit code 0
上面的代碼開啟了5個子進程去執行函數,我們可以觀察結果,是同時打印的,這裡實現了真正的並行操作,就是多個CPU同時執行任務.我們知道進程是python中最小的資源分配單元,也就是進程中間的數據,內存是不共享的,每啟動一個進程,都要獨立分配資源和拷貝訪問的數據,所以進程的啟動和銷毀的代價是比較大了,所以在實際中使用多進程,要根據服務器的配置來設定.
還記得python多線程的第二種實現方法嗎?是通過類繼承的方法來實現的,python多進程的第二種實現方式也是一樣的
from multiprocessing import Process
class MyProcess(Process): #繼承Process類
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name
def run(self):
print('測試%s多進程' % self.name)
if __name__ == '__main__':
process_list = []
for i in range(5): #開啟5個子進程執行fun1函數
p = MyProcess('Python') #實例化進程對象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('結束測試')
結果
測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
測試Python多進程
結束測試
Process finished with exit code 0
效果和第一種方式一樣.
我們可以看到Python多進程的實現方式和多線程的實現方式幾乎一樣.
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組
target: 要執行的方法
name: 進程名
args/kwargs: 要傳入方法的參數
實例方法:
is_alive():返回進程是否在運行,bool類型.
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數).
start():進程准備就緒,等待CPU調度
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法.
terminate():不管任務是否完成,立即停止工作進程
屬性:
daemon:和線程的setDeamon功能一樣
name:進程名字
pid:進程號
關於join,daemon的使用和python多線程一樣,這裡就不在復述了,大家可以看看以前的python多線程系列文章.
進程是系統獨立調度核分配系統資源(CPU、內存)的基本單位,進程之間是相互獨立的,每啟動一個新的進程相當於把數據進行了一次克隆,子進程裡的數據修改無法影響到主進程中的數據,不同子進程之間的數據也不能共享,這是多進程在使用中與多線程最明顯的區別.但是難道Python多進程中間難道就是孤立的嗎?當然不是,python也提供了多種方法實現了多進程中間的通信和數據共享(可以修改一份數據)
Queue在多線程中也說到過,在生成者消費者模式中使用,是線程安全的,是生產者和消費者中間的數據管道,那在python多進程中,它其實就是進程之間的數據管道,實現進程通信.
from multiprocessing import Process,Queue
def fun1(q,i):
print('子進程%s 開始put數據' %i)
q.put('我是%s 通過Queue通信' %i)
if __name__ == '__main__':
q = Queue()
process_list = []
for i in range(3):
p = Process(target=fun1,args=(q,i,)) #注意args裡面要把q對象傳給我們要執行的方法,這樣子進程才能和主進程用Queue來通信
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('主進程獲取Queue數據')
print(q.get())
print(q.get())
print(q.get())
print('結束測試')
結果
子進程0 開始put數據
子進程1 開始put數據
子進程2 開始put數據
主進程獲取Queue數據
我是0 通過Queue通信
我是1 通過Queue通信
我是2 通過Queue通信
結束測試
Process finished with exit code 0
上面的代碼結果可以看到我們主進程中可以通過Queue獲取子進程中put的數據,實現進程間的通信.
管道Pipe和Queue的作用大致差不多,也是實現進程間的通信,下面之間看怎麼使用吧
from multiprocessing import Process, Pipe
def fun1(conn):
print('子進程發送消息:')
conn.send('你好主進程')
print('子進程接受消息:')
print(conn.recv())
conn.close()
if __name__ == '__main__':
conn1, conn2 = Pipe() #關鍵點,pipe實例化生成一個雙向管
p = Process(target=fun1, args=(conn2,)) #conn2傳給子進程
p.start()
print('主進程接受消息:')
print(conn1.recv())
print('主進程發送消息:')
conn1.send("你好子進程")
p.join()
print('結束測試')
結果
主進程接受消息:
子進程發送消息:
子進程接受消息:
你好主進程
主進程發送消息:
你好子進程
結束測試
Process finished with exit code 0
上面可以看到主進程和子進程可以相互發送消息
Queue和Pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據.那麼久要用到Managers
from multiprocessing import Process, Manager
def fun1(dic,lis,index):
dic[index] = 'a'
dic['2'] = 'b'
lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
#print(l)
if __name__ == '__main__':
with Manager() as manager:
dic = manager.dict()#注意字典的聲明方式,不能直接通過{}來定義
l = manager.list(range(5))#[0,1,2,3,4]
process_list = []
for i in range(10):
p = Process(target=fun1, args=(dic,l,i))
p.start()
process_list.append(p)
for res in process_list:
res.join()
print(dic)
print(l)
結果:
{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]
可以看到主進程定義了一個字典和一個列表,在子進程中,可以添加和修改字典的內容,在列表中插入新的數據,實現進程間的數據共享,即可以共同修改同一份數據
When a multi-process program is running,There may be some memory or hard disk that can be operated by multiple processes at the same time,It can lead to conflicts if not managed carefully.At this point, the process lock can be used to effectively solve the problem.
from multiprocessing import Process, Manager
def fun1(dic,lis,index, lock):
lock.acquire()
dic[index] = 'a'
dic['2'] = 'b'
lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
lock.release()
#print(l)
if __name__ == '__main__':
with Manager() as manager:
dic = manager.dict()#注意字典的聲明方式,不能直接通過{}來定義
l = manager.list(range(5))#[0,1,2,3,4]
lock = manager.Lock()
process_list = []
for i in range(10):
p = Process(target=fun1, args=(dic,l,i,lock))
p.start()
process_list.append(p)
for res in process_list:
res.join()
print(dic)
print(l)
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程為止.就是固定有幾個進程可以使用.
進程池中有兩個方法:
apply:同步,一般不使用
apply_async:異步
from multiprocessing import Process,Pool
import os, time, random
def fun1(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
pool = Pool(5) #創建一個5個進程的進程池
for i in range(10):
pool.apply_async(func=fun1, args=(i,))
pool.close()
pool.join()
print('結束測試')
結果
Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
結束測試
對Pool
對象調用join()
方法會等待所有子進程執行完畢,調用join()
之前必須先調用close()
,調用close()
之後就不能繼續添加新的Process
了.
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = \'thumbs\'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if \'jpeg\' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == \'__main__\':
folder = os.path.abspath(
\'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(creat_thumbnail, images) #關鍵點,images是一個可迭代對象
pool.close()
pool.join()
上邊這段代碼的主要工作就是將遍歷傳入的文件夾中的圖片文件,一一生成縮略圖,並將這些縮略圖保存到特定文件夾中. map 函數並不支持手動線程管理,反而使得相關的 debug 工作也變得異常簡單.
multiprocessing 支持三種方式 啟動進程
import multiprocessing as mp
mp = mp.get_context('spawn')
父進程啟動一個新的python解釋器進程.Child processes only inherit running Run()The resources required by the method. Unnecessary file descriptors and handles from the parent process will not be inherited,運行速度比較慢
適用系統:unix and windows
父進程使用os.fork()方法對Python解釋器進行fork.The child process starts out effectively the same as the parent process. 父進程的所有資源都由子進程繼承.請注意,安全的forkMultithreaded processes are problematic.
適用系統:unix
When the program starts and selectsforkserverthe startup method,將啟動服務器進程. 從那時起,每當需要一個新進程時,The parent process will connect to the server,並請求它fork一個新進程. fork server進程是單線程的,所以使用os.fork()是安全的.No unnecessary resources are inherited
適用系統:unix