Creating together, growing together! This is day 4 of my participation in the Juejin "Daily New Plan · August Writing Challenge".
Python offers many approaches to parallelism, such as processes, threads, coroutines, and async, and there is no shortage of material on them. Today I'd like to talk a little about this topic.
Both multiprocessing and multithreading are implemented here with the concurrent.futures library, which was added in Python 3.2. I doubt anyone is still using a Python version older than 3.2. Surely not, surely not.
In concurrent.futures, the thread pool and the process pool are used in very similar ways; essentially only the class you reference differs. The thread pool is ThreadPoolExecutor, and the process pool is ProcessPoolExecutor.
There are generally two ways to create a pool: use with, so the pool acts as a context manager, or create the object directly. If the logic is short and fits in a few lines, just use the with context manager; it is quick and convenient, and you don't have to shut the pool down yourself. If the logic is more complicated and the pool is used in many places, you can create the object and manage it yourself. Choose whichever fits your needs.
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def test(index):
    time.sleep(index)


def main():
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        pass

    # Create and shut down manually
    pool = ThreadPoolExecutor()
    pool.shutdown()


if __name__ == '__main__':
    main()
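When you manage the pool yourself, one common pattern is to wrap the work in try/finally so that shutdown() still runs if something raises. The sketch below assumes a hypothetical double function and uses a thread pool; submit and result() are covered later in this article.

```python
from concurrent.futures import ThreadPoolExecutor


def double(n):
    # Hypothetical task used only for illustration.
    return n * 2


def run_with_manual_shutdown():
    pool = ThreadPoolExecutor(max_workers=2)
    try:
        future = pool.submit(double, 21)
        return future.result()
    finally:
        # Runs even if the code above raises; wait=True blocks until
        # pending tasks have finished.
        pool.shutdown(wait=True)


if __name__ == "__main__":
    print(run_with_manual_shutdown())
```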
Now let's look at the parameters. Most of the time only one needs attention: max_workers, the maximum number of worker threads or processes. You can set this number yourself; if you don't, a default value is used.
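To see max_workers act as an upper bound, here is a minimal sketch that records which worker threads actually ran the tasks. The helper names worker_name and distinct_workers are made up for this example, and a thread pool is used because thread names are easy to inspect.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def worker_name(_):
    # Sleep briefly so tasks overlap, then report which worker thread ran us.
    time.sleep(0.05)
    return threading.current_thread().name


def distinct_workers(max_workers, task_count=8):
    # Run task_count tasks and collect the names of the threads that ran them.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return set(pool.map(worker_name, range(task_count)))


if __name__ == "__main__":
    # Never more than 2 distinct names, even though 8 tasks were submitted.
    print(distinct_workers(max_workers=2))
```

The default when max_workers is omitted depends on the Python version and the machine's CPU count, so it is worth setting explicitly when the number matters.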
The submit method submits a single task. Here is its function signature:
def submit(self, fn, /, *args, **kwargs):
    pass
The fn parameter is the function to submit, written without parentheses. *args and **kwargs are the arguments for that function; just pass them in directly. See the example below:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def test(index, index_2=None):
    print(f"Index: {index} {index_2}")
    time.sleep(2)


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        for i in numbers:
            pool.submit(test, i, index_2=i)


if __name__ == '__main__':
    main()

# Output (the order may vary between runs)
# Index: 1 1
# Index: 2 2
# Index: 3 3
# Index: 4 4
# Index: 5 5
# Index: 6 6
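Note that submit does not return the function's return value; it returns a Future object right away, and the value is fetched from that object later. A minimal sketch, assuming a hypothetical add function and a thread pool:

```python
from concurrent.futures import Future, ThreadPoolExecutor


def add(a, b=0):
    # Hypothetical task used only for illustration.
    return a + b


def submit_demo():
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Positional and keyword arguments are forwarded to add().
        future = pool.submit(add, 1, b=2)
        is_future = isinstance(future, Future)
        value = future.result()  # blocks until add() has returned
    return is_future, value


if __name__ == "__main__":
    print(submit_demo())
```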
The above submits tasks one at a time; map submits tasks in batch. Here is its function signature:
def map(self, fn, *iterables, timeout=None, chunksize=1):
    pass
Most of the time you only need to care about the first and second parameters. The first is the function to call, again without parentheses; the second is an iterable such as a list, set, or tuple. Note that when you pass a single iterable, the function receives exactly one argument per call, namely each element of that iterable, as follows:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def test(index):
    print(f"Index: {index}")
    time.sleep(2)


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        pool.map(test, numbers)


if __name__ == '__main__':
    main()
But if your function really does take several parameters and you have to use map, you can use functools.partial to build a new function with some of the arguments already fixed. The example makes this clear:
import time
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def test(index, arg_2):
    print(f"Index: {index} {arg_2}")
    time.sleep(2)


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        # partial fixes arg_2=2, so map only has to supply index
        pool.map(functools.partial(test, arg_2=2), numbers)


if __name__ == '__main__':
    main()

# Output (the order may vary between runs)
# Index: 1 2
# Index: 2 2
# Index: 3 2
# Index: 4 2
# Index: 5 2
# Index: 6 2
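As an alternative to functools.partial, the *iterables in the map signature means you can pass several iterables, and each call takes one element from each, much like the built-in map. A small sketch with a hypothetical power function, using a thread pool:

```python
from concurrent.futures import ThreadPoolExecutor


def power(base, exponent):
    # Hypothetical two-argument task used only for illustration.
    return base ** exponent


def map_two_iterables():
    bases = [2, 3, 4]
    exponents = [3, 2, 1]
    with ThreadPoolExecutor() as pool:
        # Calls power(2, 3), power(3, 2), power(4, 1).
        return list(pool.map(power, bases, exponents))


if __name__ == "__main__":
    print(map_two_iterables())
```

partial is the better fit when one argument stays constant; multiple iterables fit when every argument varies per call.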
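The above only submits tasks; a task starts running automatically once it has been submitted. Let's first look at how to get results from map. You can think of map as "returning a list of results directly". Strictly speaking that is not accurate (it returns an iterator that yields the results in input order), but it is a workable mental model. Like this: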
import time
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def test(index, arg_2):
    print(f"Index: {index} {arg_2}")
    time.sleep(2)
    return index


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        results = pool.map(functools.partial(test, arg_2=2), numbers)
        for result in results:
            print(f"Result: {result}")


if __name__ == '__main__':
    main()

# Output (the order of the "Index" lines may vary between runs)
# Index: 1 2
# Index: 2 2
# Index: 3 2
# Index: 4 2
# Index: 5 2
# Index: 6 2
# Result: 1
# Result: 2
# Result: 3
# Result: 4
# Result: 5
# Result: 6
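To make the "not really a list" point concrete, the sketch below checks the type of what map returns and shows that results come back in input order even when later tasks finish first. The slow_echo helper and the delays are made up for this example, and a thread pool is used.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(delay):
    # Hypothetical task: wait for `delay` seconds, then return it.
    time.sleep(delay)
    return delay


def map_order():
    delays = [0.3, 0.1, 0.2]
    with ThreadPoolExecutor(max_workers=3) as pool:
        results = pool.map(slow_echo, delays)
        is_list = isinstance(results, list)
        # Iterating yields results in input order, not completion order.
        return is_list, list(results)


if __name__ == "__main__":
    print(map_order())
```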
Next, let's look at how to get results from submit. It differs slightly from the above and takes a little more work: you need as_completed to wait for the tasks to finish. The example makes it clear:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed


def test(index, arg_2):
    print(f"Index: {index} {arg_2}")
    time.sleep(2)
    return index


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        results = [pool.submit(test, index, 2) for index in numbers]
        # as_completed yields each future as soon as it finishes
        for result in as_completed(results):
            print(f"Result: {result.result()}")


if __name__ == '__main__':
    main()
To get the actual return value, call the .result() method on each object that as_completed yields.
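Because as_completed yields futures in the order they finish rather than the order they were submitted, it often helps to remember which input each future belongs to. One way, sketched here with a hypothetical slow_square function and a thread pool, is a dict keyed by future:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(n, delay):
    # Hypothetical task: wait, then return n squared.
    time.sleep(delay)
    return n * n


def collect_results():
    # (input, delay) pairs: larger inputs get shorter delays here,
    # so they will typically finish first.
    jobs = [(1, 0.3), (2, 0.2), (3, 0.1)]
    with ThreadPoolExecutor(max_workers=3) as pool:
        future_to_input = {pool.submit(slow_square, n, d): n for n, d in jobs}
        finished = []
        for future in as_completed(future_to_input):
            finished.append((future_to_input[future], future.result()))
    return finished


if __name__ == "__main__":
    for n, squared in collect_results():
        print(f"{n} -> {squared}")
```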
If an exception occurs in a function running in the thread pool or process pool, the program is not interrupted and the exception is not raised directly. For example:
import time
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed


def test(index, arg_2):
    # Fail immediately; the lines below are never reached
    raise ValueError("Something went wrong")
    print(f"Index: {index} {arg_2}")
    time.sleep(2)
    return index


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        pool.map(functools.partial(test, arg_2=2), numbers)
        for index in numbers:
            pool.submit(test, index, 2)


if __name__ == '__main__':
    main()
You will find that the program prints nothing and does not abort. But if you run it again and actually read the return values, the exception is raised and the program aborts. So the point where you read the results is the point where you catch the exception.
Catching exceptions with map
import time
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def test(index, arg_2):
    # Fail immediately; the lines below are never reached
    raise ValueError("Something went wrong")
    print(f"Index: {index} {arg_2}")
    time.sleep(2)
    return index


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        results = pool.map(functools.partial(test, arg_2=2), numbers)
        try:
            # The exception surfaces while iterating over the results
            for result in results:
                print(f"Result: {result}")
        except Exception as e:
            print(e)


if __name__ == '__main__':
    main()
Catching exceptions with submit
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed


def test(index, arg_2):
    # Fail immediately; the lines below are never reached
    raise ValueError("Something went wrong")
    print(f"Index: {index} {arg_2}")
    time.sleep(2)
    return index


def main():
    numbers = [1, 2, 3, 4, 5, 6]
    # Context manager: the pool is shut down automatically
    with ProcessPoolExecutor() as pool:
        results = [pool.submit(test, index, 2) for index in numbers]
        for result in as_completed(results):
            try:
                # The exception surfaces when result() is called
                print(f"Result: {result.result()}")
            except Exception as e:
                print(e)


if __name__ == '__main__':
    main()
As you can see, the difference is that with map the exception can only be caught outside the loop, whereas with submit it can be caught either inside or outside the loop.
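The practical consequence of that difference can be sketched as follows: with map, the first exception ends the iteration, while with submit each future can be handled on its own. The risky function and both helpers are hypothetical, and the submit variant uses Future.exception() as an alternative to try/except; a thread pool is used.

```python
from concurrent.futures import ThreadPoolExecutor


def risky(n):
    # Hypothetical task: fails for even numbers.
    if n % 2 == 0:
        raise ValueError(f"bad input: {n}")
    return n


def with_submit(numbers):
    # Each future is checked on its own, so one failure does not hide the rest.
    outcomes = []
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(risky, n) for n in numbers]
        for future in futures:
            error = future.exception()  # waits; None if the call succeeded
            if error is not None:
                outcomes.append(str(error))
            else:
                outcomes.append(future.result())
    return outcomes


def with_map(numbers):
    # The first exception ends the iteration; later results are not retrieved.
    outcomes = []
    with ThreadPoolExecutor() as pool:
        try:
            for value in pool.map(risky, numbers):
                outcomes.append(value)
        except ValueError as error:
            outcomes.append(str(error))
    return outcomes


if __name__ == "__main__":
    print(with_submit([1, 2, 3]))
    print(with_map([1, 2, 3]))
```

If every task's outcome matters even when some fail, submit is usually the more convenient choice.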
As the title says, that's it for basic usage. Of course, there is a lot more to this library; I'm only getting you through the door. For more depth, consult the relevant documentation.
That's all for this section; the next one covers the rest.