新聞中心
concurrent.futures —- 啟動(dòng)并行任務(wù)
3.2 新版功能.

成都創(chuàng)新互聯(lián)公司主營(yíng)阿里地區(qū)網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,App定制開發(fā),阿里地區(qū)h5成都微信小程序搭建,阿里地區(qū)網(wǎng)站營(yíng)銷推廣歡迎阿里地區(qū)等地區(qū)企業(yè)咨詢
源碼: Lib/concurrent/futures/thread.py 和 Lib/concurrent/futures/process.py
concurrent.futures 模塊提供異步執(zhí)行可調(diào)用對(duì)象高層接口。
異步執(zhí)行可以由 ThreadPoolExecutor 使用線程或由 ProcessPoolExecutor 使用單獨(dú)的進(jìn)程來(lái)實(shí)現(xiàn)。 兩者都是實(shí)現(xiàn)抽像類 Executor 定義的接口。
Availability: not Emscripten, not WASI.
This module does not work or is not available on WebAssembly platforms wasm32-emscripten and wasm32-wasi. See WebAssembly platforms for more information.
Executor 對(duì)象
class concurrent.futures.Executor
抽象類提供異步執(zhí)行調(diào)用方法。要通過(guò)它的子類調(diào)用,而不是直接調(diào)用。
submit(fn, /, \args, **kwargs*)
調(diào)度可調(diào)用對(duì)象 fn,以
fn(*args, **kwargs)方式執(zhí)行并返回一個(gè)代表該可調(diào)用對(duì)象的執(zhí)行的 Future 對(duì)象。
with ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(pow, 323, 1235)print(future.result())map(func, \iterables, timeout=None, chunksize=1*)
類似于 map(func, *iterables) 函數(shù),除了以下兩點(diǎn):
iterables 是立即執(zhí)行而不是延遲執(zhí)行的;
func 是異步執(zhí)行的,對(duì) func 的多個(gè)調(diào)用可以并發(fā)執(zhí)行。
The returned iterator raises a [TimeoutError]($e69e6dad35dc3610.md#TimeoutError "TimeoutError") if [\_\_next\_\_()]($c2b73e6dbd6fbe32.md#iterator.__next__ "iterator.__next__") is called and the result isn't available after *timeout* seconds from the original call to [Executor.map()](#concurrent.futures.Executor.map "concurrent.futures.Executor.map"). *timeout* can be an int or a float. If *timeout* is not specified or `None`, there is no limit to the wait time.如果 *func* 調(diào)用引發(fā)一個(gè)異常,當(dāng)從迭代器中取回它的值時(shí)這個(gè)異常將被引發(fā)。使用 [ProcessPoolExecutor](#concurrent.futures.ProcessPoolExecutor "concurrent.futures.ProcessPoolExecutor") 時(shí),這個(gè)方法會(huì)將 *iterables* 分割任務(wù)塊并作為獨(dú)立的任務(wù)并提交到執(zhí)行池中。這些塊的大概數(shù)量可以由 *chunksize* 指定正整數(shù)設(shè)置。 對(duì)很長(zhǎng)的迭代器來(lái)說(shuō),使用大的 *chunksize* 值比默認(rèn)值 1 能顯著地提高性能。 *chunksize* 對(duì) [ThreadPoolExecutor](#concurrent.futures.ThreadPoolExecutor "concurrent.futures.ThreadPoolExecutor") 沒有效果。在 3.5 版更改: 加入 *chunksize* 參數(shù)。
shutdown(wait=True, **, cancel_futures=False*)
當(dāng)待執(zhí)行的 future 對(duì)象完成執(zhí)行后向執(zhí)行者發(fā)送信號(hào),它就會(huì)釋放正在使用的任何資源。 在關(guān)閉后調(diào)用 Executor.submit() 和 Executor.map() 將會(huì)引發(fā) RuntimeError。
如果 wait 為
True則此方法只有在所有待執(zhí)行的 future 對(duì)象完成執(zhí)行且釋放已分配的資源后才會(huì)返回。 如果 wait 為False,方法立即返回,所有待執(zhí)行的 future 對(duì)象完成執(zhí)行后會(huì)釋放已分配的資源。 不管 wait 的值是什么,整個(gè) python 程序?qū)⒌鹊剿写龍?zhí)行的 future 對(duì)象完成執(zhí)行后才退出。如果 cancel_futures 為
True,此方法將取消所有執(zhí)行器還未開始運(yùn)行的掛起的 Future。 任何已完成或正在運(yùn)行的 Future 將不會(huì)被取消,無(wú)論 cancel_futures 的值是什么?如果 cancel_futures 和 wait 均為
True,則執(zhí)行器已開始運(yùn)行的所有 Future 將在此方法返回之前完成。 其余的 Future 會(huì)被取消。如果使用 with 語(yǔ)句,你就可以避免顯式調(diào)用這個(gè)方法,它將會(huì)停止 Executor (就好像 Executor.shutdown() 調(diào)用時(shí) wait 設(shè)為
True一樣等待):
import shutilwith ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, 'src1.txt', 'dest1.txt')e.submit(shutil.copy, 'src2.txt', 'dest2.txt')e.submit(shutil.copy, 'src3.txt', 'dest3.txt')e.submit(shutil.copy, 'src4.txt', 'dest4.txt')在 3.9 版更改: 增加了 cancel_futures。
ThreadPoolExecutor
ThreadPoolExecutor 是 Executor 的子類,它使用線程池來(lái)異步執(zhí)行調(diào)用。
當(dāng)可調(diào)用對(duì)象已關(guān)聯(lián)了一個(gè) Future 然后在等待另一個(gè) Future 的結(jié)果時(shí)就會(huì)導(dǎo)致死鎖情況。例如:
import timedef wait_on_b():time.sleep(5)print(b.result()) # b will never complete because it is waiting on a.return 5def wait_on_a():time.sleep(5)print(a.result()) # a will never complete because it is waiting on b.return 6executor = ThreadPoolExecutor(max_workers=2)a = executor.submit(wait_on_b)b = executor.submit(wait_on_a)
與:
def wait_on_future():f = executor.submit(pow, 5, 2)# This will never complete because there is only one worker thread and# it is executing this function.print(f.result())executor = ThreadPoolExecutor(max_workers=1)executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=’’, initializer=None, initargs=())
Executor 子類使用最多 max_workers 個(gè)線程的線程池來(lái)異步執(zhí)行調(diào)用。
All threads enqueued to ThreadPoolExecutor will be joined before the interpreter can exit. Note that the exit handler which does this is executed before any exit handlers added using atexit. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended that ThreadPoolExecutor not be used for long-running tasks.
initializer 是在每個(gè)工作者線程開始處調(diào)用的一個(gè)可選可調(diào)用對(duì)象。 initargs 是傳遞給初始化器的元組參數(shù)。任何向池提交更多工作的嘗試, initializer 都將引發(fā)一個(gè)異常,當(dāng)前所有等待的工作都會(huì)引發(fā)一個(gè) BrokenThreadPool。
在 3.5 版更改: 如果 max_workers 為 None 或沒有指定,將默認(rèn)為機(jī)器處理器的個(gè)數(shù),假如 ThreadPoolExecutor 則重于I/O操作而不是CPU運(yùn)算,那么可以乘以 5 ,同時(shí)工作線程的數(shù)量可以比 ProcessPoolExecutor 的數(shù)量高。
3.6 新版功能: 添加 thread_name_prefix 參數(shù)允許用戶控制由線程池創(chuàng)建的 threading.Thread 工作線程名稱以方便調(diào)試。
在 3.7 版更改: 加入 initializer 和*initargs* 參數(shù)。
在 3.8 版更改: max_workers 的默認(rèn)值已改為 min(32, os.cpu_count() + 4)。 這個(gè)默認(rèn)值會(huì)保留至少 5 個(gè)工作線程用于 I/O 密集型任務(wù)。 對(duì)于那些釋放了 GIL 的 CPU 密集型任務(wù),它最多會(huì)使用 32 個(gè) CPU 核心。這樣能夠避免在多核機(jī)器上不知不覺地使用大量資源。
現(xiàn)在 ThreadPoolExecutor 在啟動(dòng) max_workers 個(gè)工作線程之前也會(huì)重用空閑的工作線程。
ThreadPoolExecutor 例子
import concurrent.futuresimport urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']# Retrieve a single page and report the URL and contentsdef load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()# We can use a with statement to ensure threads are cleaned up promptlywith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# Start the load operations and mark each future with its URLfuture_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor
ProcessPoolExecutor 類是 Executor 的子類,它使用進(jìn)程池來(lái)異步地執(zhí)行調(diào)用。 ProcessPoolExecutor 會(huì)使用 multiprocessing 模塊,這允許它繞過(guò) 全局解釋器鎖 但也意味著只可以處理和返回可封存的對(duì)象。
__main__ 模塊必須可以被工作者子進(jìn)程導(dǎo)入。這意味著 ProcessPoolExecutor 不可以工作在交互式解釋器中。
從可調(diào)用對(duì)象中調(diào)用 Executor 或 Future 的方法提交給 ProcessPoolExecutor 會(huì)導(dǎo)致死鎖。
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
異步地執(zhí)行調(diào)用的 Executor 子類使用最多具有 max_workers 個(gè)進(jìn)程的進(jìn)程池。 如果 max_workers 為 None 或未給出,它將默認(rèn)為機(jī)器的處理器個(gè)數(shù)。 如果 max_workers 小于等于 0,則將引發(fā) ValueError。 在 Windows 上,max_workers 必須小于等于 61,否則將引發(fā) ValueError。 如果 max_workers 為 None,則所選擇的默認(rèn)值最多為 61,即使存在更多的處理器。 mp_context 可以是一個(gè)多進(jìn)程上下文或是 None。 它將被用來(lái)啟動(dòng)工作進(jìn)程。 如果 mp_context 為 None 或未給出,則將使用默認(rèn)的多進(jìn)程上下文。
initializer 是一個(gè)可選的可調(diào)用對(duì)象,它會(huì)在每個(gè)工作進(jìn)程啟動(dòng)時(shí)被調(diào)用;initargs 是傳給 initializer 的參數(shù)元組。 如果 initializer 引發(fā)了異常,則所有當(dāng)前在等待的任務(wù)以及任何向進(jìn)程池提交更多任務(wù)的嘗試都將引發(fā) BrokenProcessPool。
max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is None which means worker processes will live as long as the pool. When a max is specified, the “spawn” multiprocessing start method will be used by default in absence of a mp_context parameter. This feature is incompatible with the “fork” start method.
在 3.3 版更改: 如果其中一個(gè)工作進(jìn)程被突然終止,BrokenProcessPool 就會(huì)馬上觸發(fā)。 可預(yù)計(jì)的行為沒有定義,但執(zhí)行器上的操作或它的 future 對(duì)象會(huì)被凍結(jié)或死鎖。
在 3.7 版更改: 添加 mp_context 參數(shù)允許用戶控制由進(jìn)程池創(chuàng)建給工作者進(jìn)程的開始方法 。
加入 initializer 和*initargs* 參數(shù)。
在 3.11 版更改: The max_tasks_per_child argument was added to allow users to control the lifetime of workers in the pool.
ProcessPoolExecutor 例子
import concurrent.futuresimport mathPRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]def is_prime(n):if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))if __name__ == '__main__':main()
Future 對(duì)象
Future 類將可調(diào)用對(duì)象封裝為異步執(zhí)行。Future 實(shí)例由 Executor.submit() 創(chuàng)建。
class concurrent.futures.Future
將可調(diào)用對(duì)象封裝為異步執(zhí)行。Future 實(shí)例由 Executor.submit() 創(chuàng)建,除非測(cè)試,不應(yīng)直接創(chuàng)建。
cancel()
嘗試取消調(diào)用。 如果調(diào)用正在執(zhí)行或已結(jié)束運(yùn)行不能被取消則該方法將返回
False,否則調(diào)用會(huì)被取消并且該方法將返回True。cancelled()
如果調(diào)用成功取消返回
True。running()
如果調(diào)用正在執(zhí)行而且不能被取消那么返回
True。done()
如果調(diào)用已被取消或正常結(jié)束那么返回
True。result(timeout=None)
Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or
None, there is no limit to the wait time.如果 futrue 在完成前被取消則 CancelledError 將被觸發(fā)。
如果調(diào)用引發(fā)了一個(gè)異常,這個(gè)方法也會(huì)引發(fā)同樣的異常。
exception(timeout=None)
Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or
None, there is no limit to the wait time.如果 futrue 在完成前被取消則 CancelledError 將被觸發(fā)。
如果調(diào)用正常完成那么返回
None。add_done_callback(fn)
附加可調(diào)用 fn 到 future 對(duì)象。當(dāng) future 對(duì)象被取消或完成運(yùn)行時(shí),將會(huì)調(diào)用 fn,而這個(gè) future 對(duì)象將作為它唯一的參數(shù)。
加入的可調(diào)用對(duì)象總被屬于添加它們的進(jìn)程中的線程按加入的順序調(diào)用。如果可調(diào)用對(duì)象引發(fā)一個(gè) Exception 子類,它會(huì)被記錄下來(lái)并被忽略掉。如果可調(diào)用對(duì)象引發(fā)一個(gè) BaseException 子類,這個(gè)行為沒有定義。
如果 future 對(duì)象已經(jīng)完成或已取消,fn 會(huì)被立即調(diào)用。
下面這些 Future 方法用于單元測(cè)試和 Executor 實(shí)現(xiàn)。
set_running_or_notify_cancel()
這個(gè)方法只可以在執(zhí)行關(guān)聯(lián) Future 工作之前由 Executor 實(shí)現(xiàn)調(diào)用或由單測(cè)試調(diào)用。
If the method returns
Falsethen the Future was cancelled, i.e. Future.cancel() was called and returnedTrue. Any threads waiting on the Future completing (i.e. through as_completed() or wait()) will be woken up.If the method returns
Truethen the Future was not cancelled and has been put in the running state, i.e. calls to Future.running() will returnTrue.這個(gè)方法只可以被調(diào)用一次并且不能在調(diào)用 Future.set_result() 或 Future.set_exception() 之后再調(diào)用。
set_result(result)
設(shè)置將 Future 關(guān)聯(lián)工作的結(jié)果給 result 。
這個(gè)方法只可以由 Executor 實(shí)現(xiàn)和單元測(cè)試使用。
在 3.8 版更改: 如果 Future 已經(jīng)完成則此方法會(huì)引發(fā) concurrent.futures.InvalidStateError。
set_exception(exception)
設(shè)置 Future 關(guān)聯(lián)工作的結(jié)果給 Exception exception 。
這個(gè)方法只可以由 Executor 實(shí)現(xiàn)和單元測(cè)試使用。
在 3.8 版更改: 如果 Future 已經(jīng)完成則此方法會(huì)引發(fā) concurrent.futures.InvalidStateError。
模塊函數(shù)
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待由 fs 指定的 Future 實(shí)例(可能由不同的 Executor 實(shí)例創(chuàng)建)完成。 重復(fù)傳給 fs 的 future 會(huì)被移除并將只返回一次。 返回一個(gè)由集合組成的具名 2 元組。 第一個(gè)集合的名稱為 done,包含在等待完成之前已完成的 future(包括正常結(jié)束或被取消的 future)。 第二個(gè)集合的名稱為 not_done,包含未完成的 future(包括掛起的或正在運(yùn)行的 future)。
timeout 可以用來(lái)控制返回前最大的等待秒數(shù)。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或?yàn)?None ,則不限制等待時(shí)間。
return_when 指定此函數(shù)應(yīng)在何時(shí)返回。它必須為以下常數(shù)之一:
|
常量 |
描述 |
|---|---|
|
|
函數(shù)將在任意可等待對(duì)象結(jié)束或取消時(shí)返回。 |
|
|
函數(shù)將在任意可等待對(duì)象因引發(fā)異常而結(jié)束時(shí)返回。當(dāng)沒有引發(fā)任何異常時(shí)它就相當(dāng)于 |
|
|
函數(shù)將在所有可等待對(duì)象結(jié)束或取消時(shí)返回。 |
concurrent.futures.as_completed(fs, timeout=None)
Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed before as_completed() is called will be yielded first. The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
參見
PEP 3148 — future 對(duì)象 - 異步執(zhí)行指令。
該提案描述了Python標(biāo)準(zhǔn)庫(kù)中包含的這個(gè)特性。
Exception 類
exception concurrent.futures.CancelledError
future 對(duì)象被取消時(shí)會(huì)觸發(fā)。
exception concurrent.futures.TimeoutError
A deprecated alias of TimeoutError, raised when a future operation exceeds the given timeout.
在 3.11 版更改: This class was made an alias of TimeoutError.
exception concurrent.futures.BrokenExecutor
當(dāng)執(zhí)行器被某些原因中斷而且不能用來(lái)提交或執(zhí)行新任務(wù)時(shí)就會(huì)被引發(fā)派生于 RuntimeError 的異常類。
3.7 新版功能.
exception concurrent.futures.InvalidStateError
當(dāng)某個(gè)操作在一個(gè)當(dāng)前狀態(tài)所不允許的 future 上執(zhí)行時(shí)將被引發(fā)。
3.8 新版功能.
exception concurrent.futures.thread.BrokenThreadPool
當(dāng) ThreadPoolExecutor 中的其中一個(gè)工作者初始化失敗時(shí)會(huì)引發(fā)派生于 BrokenExecutor 的異常類。
3.7 新版功能.
exception concurrent.futures.process.BrokenProcessPool
當(dāng) ThreadPoolExecutor 中的其中一個(gè)工作者不完整終止時(shí)(比如,被外部殺死)會(huì)引發(fā)派生于 BrokenExecutor ( 原名 RuntimeError ) 的異常類。
3.3 新版功能.
網(wǎng)站標(biāo)題:創(chuàng)新互聯(lián)Python教程:concurrent.futures—-啟動(dòng)并行任務(wù)
分享地址:http://www.dlmjj.cn/article/cdjsojc.html


咨詢
建站咨詢
