程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python 線程同步(三) -- 信號量

編輯:Python

1. 引言

上兩篇文章中,我們詳細介紹了 Python 中的兩種線程同步方式 — 鎖與條件對象。 Python 線程同步(一) — 競爭條件與線程鎖python 線程同步(二) — 條件對象

本文我們來介紹一個計算機科學史上最為古老和經典的線程同步原語之一 — 信號量。

2. 信號量

我們此前已經介紹過 Linux 的信號量與 Java 中的信號量。

信號量是操作系統中的一個經典線程同步原語,實際上他是帶有計數功能的互斥鎖,用來保護某個只允許指定數量操作的資源。 信號量與鎖機制非常類似,但他維護了一個內部的計數值,每次加鎖原子性的將計數值減1,返回不為負則意味著加鎖成功,否則加回1並阻塞等待直到被喚醒,而解鎖時則在信號量計數上進行加1操作。 一般來說,對計數值的修改是通過 CAS 操作實現的。 CAS 思想與 java 原子操作的實現

3. Python 中的信號量 — threading.Semaphore

python 標准庫中的 threading 包中實現了信號量對象。

3.1. 構造方法

該對象的構造方法有一個參數 value 用於初始化上文所述的信號量內的計數值,默認為 1。

threading.Semaphore(value=1)

3.1.1. value 的取值

  • 當 value 傳入大於 1,這是最為常用的用法,用來限制最多 value 個線程可以同時共享資源
  • 當 value 傳入為 1 時,信號量退化為了一個普通的線程鎖,雖然這是默認行為,但與 threading 中提供的鎖對象相比,通過信號量實現基本的線程鎖雖然在使用方式上是一樣的,但其執行效率要低一些,因此不建議這樣使用
  • 當 value 傳入 0 時,所有試圖加鎖的線程都將阻塞在該信號量對象上,但 Python 允許不經加鎖直接調用解鎖方法來增加計數值,但這通常是錯誤的用法,應該避免這樣使用
  • 當 value 傳入小於 0 時,會拋出 ValueError 異常

3.2. 加鎖

acquire(blocking=True, timeout=None)

加鎖方法的執行邏輯我們已經在上面有過詳細介紹。 Python 信號量的加鎖方法允許傳入兩個參數,分別表示是否阻塞,與最長等待時間(秒數) 加鎖成功則返回 True。

3.3. 解鎖

release()

解鎖方法就是將信號量中的計數器加 1,如果計數器的原值為 0,則喚醒所有阻塞在該信號量上的線程。 與普通的鎖對象不同,Python 中的信號量允許在未加鎖的情況下調用 release 方法來讓計數器加 1。

import logging
from threading import Thread, Semaphore
class SemaphoreTestThread(Thread):
def __init__(self, id, semaphore):
super().__init__()
self.id = id
self.semaphore = semaphore
def run(self) -> None:
logging.info('%r start running' % self)
try:
while self.semaphore.acquire():
logging.info('%r hold the semaphore' % self)
finally:
self.semaphore.release()
def __repr__(self):
return 'SemaphoreTestThread(%s)' % self.id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
semaphore = Semaphore(0)
for i in range(10):
thread = SemaphoreTestThread(i, semaphore)
thread.start()
logging.info('all the threads are running')
for i in range(5):
logging.info('add 1 on the semaphore')
semaphore.release()

打印出了:

2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(0) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(1) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(2) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(4) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(5) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(6) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(7) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(8) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(9) start running 2019-05-12 22:12:24,016 - INFO: all the threads are running 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(0) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(1) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) hold the semaphore

可以看到,我們創建了 10 個線程並啟動,但由於信號量的初始計數為 0,因此所有 10 個線程在啟動後均立即阻塞等待在信號量上。 我們的主線程在未對信號量加鎖的情況下直接調用了 release 方法,這並沒有報錯,而是激活了 10 個線程中的某個線程運行。

4. 有界信號量 — BoundedSemaphore

上面的例子中,我們看到,Python 中的信號量允許我們在未加鎖的情況下直接調用解鎖方法來讓信號量內計數器值加 1,這似乎讓構造方法傳入的 value 值失去了他的價值。 Python 中存在另一種信號量,他與我們上面講解的信號量僅有一點區別,那就是當 release 方法試圖將計數器增加到大於構造方法傳入的 value 值時,會拋出 ValueError 異常。 因此,在通常使用中 Semaphore 與 BoundedSemaphore 並沒有什麼區別。

我們把上文未經加鎖即解鎖例子中的信號量改為 BoundedSemaphore 再來試一下:

import logging
from threading import Thread, BoundedSemaphore
class SemaphoreTestThread(Thread):
def __init__(self, id, semaphore):
super().__init__()
self.id = id
self.semaphore = semaphore
def run(self) -> None:
logging.info('%r start running' % self)
try:
while self.semaphore.acquire():
logging.info('%r hold the semaphore' % self)
finally:
self.semaphore.release()
def __repr__(self):
return 'SemaphoreTestThread(%s)' % self.id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
semaphore = BoundedSemaphore(0)
for i in range(10):
thread = SemaphoreTestThread(i, semaphore)
thread.start()
logging.info('all the threads are running')
for i in range(5):
logging.info('add 1 on the semaphore')
semaphore.release()

打印出了:

2019-05-13 00:08:35,020 - INFO: SemaphoreTestThread(0) start running 2019-05-13 00:08:35,024 - INFO: SemaphoreTestThread(1) start running 2019-05-13 00:08:35,025 - INFO: SemaphoreTestThread(2) start running 2019-05-13 00:08:35,027 - INFO: SemaphoreTestThread(3) start running 2019-05-13 00:08:35,028 - INFO: SemaphoreTestThread(4) start running 2019-05-13 00:08:35,034 - INFO: SemaphoreTestThread(5) start running 2019-05-13 00:08:35,039 - INFO: SemaphoreTestThread(6) start running 2019-05-13 00:08:35,043 - INFO: SemaphoreTestThread(7) start running 2019-05-13 00:08:35,053 - INFO: SemaphoreTestThread(8) start running 2019-05-13 00:08:35,060 - INFO: all the threads are running 2019-05-13 00:08:35,060 - INFO: add 1 on the semaphore Traceback (most recent call last): File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1741, in <module> 2019-05-13 00:08:35,054 - INFO: SemaphoreTestThread(9) start running main() File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1735, in main globals = debugger.run(setup[‘file’], None, None, is_module) File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1135, in run pydev_imports.execfile(file, globals, locals) # execute the script File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile exec(compile(contents+"\n", file, ’exec’), glob, loc) File "D:/Workspace/code/python/fluentpython/thread/semaphore.py", line 34, in <module> semaphore.release() File "C:\Users\zeyu\Anaconda3\lib\threading.py", line 483, in release raise ValueError("Semaphore released too many times") ValueError: Semaphore released too many times

5. 示例 — 一個簡易的 DB 連接池

from threading import BoundedSemaphore, Semaphore
class PooledDB:
def __init__(self, creator, minconnections, maxconnections, *args, **kwargs):
self._args, self._kwargs = args, kwargs
self._creator = creator
self._minconnections = minconnections
self._maxconnections = maxconnections
self._max_semaphore = Semaphore(maxconnections)
self._min_semaphore = BoundedSemaphore(minconnections)
self._idle_cache = []
idle = [self.get_connection() for _ in range(minconnections)]
while idle:
idle.pop().close()
def get_connection(self, timeout=None):
hold = self._max_semaphore.acquire(timeout=timeout)
if hold:
hold = self._min_semaphore.acquire(blocking=False)
if hold:
return self._idle_cache.pop(0)
else:
return PooledConnection(self._creator, self, args=self._args, kwargs=self._kwargs)
return None
def returnConnect(self, connection):
try:
self._min_semaphore.release()
self._idle_cache.append(connection)
except ValueError:
connection.close(True)
finally:
self._max_semaphore.release()
class PooledConnection:
def __init__(self, creator, pool, *args, **kwargs):
self._pool = pool
self._creator = creator
self._con = self._creator.connect(args, kwargs)
def close(self, force_close=False):
if force_close:
self._con.close()
else:
self._pool.returnConnect(self)

這只是一個用於示例的簡易 DB 連接池實現,同時,對於連接類 PooledConnection 我們省略了 begin、commit、rollback、cursor、ping 等方法的實現,因為這些與我們本節的內容並沒什麼關系,只實現了連接創建方法與 close 方法,同時省略了所有的參數、邊界判斷,只為了讓示例更加精簡,很快我會寫一篇詳細介紹生產環境可用的 DB 連接池的源碼解析,敬請期待。 上面的例子中,我們的連接池構造方法擁有兩個參數 — 最大連接數和最小連接數。 我們創建了兩個 BoundedSemaphore 對象,分別用來限制並發環境中的最大、最小連接數。

5.1. 創建連接

初始狀態下我們就已經向空閒隊列中添加了最小連接數個數個空閒連接,我們看到,在創建連接時,我們先試圖對最大連接數信號量進行加鎖,從而保證並發環境下連接池連接數不會超過 maxconnections 值。 然後,對最小連接數信號量進行了加鎖,加鎖成功則從空閒隊列中獲取連接,否則新建連接。

5.2. 關閉連接

關閉連接時,我們首先試圖釋放最小連接數信號量,這裡就體現出了 BoundedSemaphore 的價值,一旦釋放次數超過構造參數傳入的 minconnections 則意味著我們的釋放次數大於了加鎖次數,也就是說,這個被釋放連接並不是從空閒隊列 _idle_cache 中取出的,而 BoundedSemaphore 在此時拋出 ValueError 異常讓我們可以直接強制關閉該連接,而不是讓他回到連接池。 與最小連接數信號量相比,最大連接數信號量使用 Semaphore 就可以了。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved