The previous two articles covered two of Python's thread synchronization mechanisms in detail, locks and condition objects: "Python Thread Synchronization (1): Race Conditions and Thread Locks" and "Python Thread Synchronization (2): Condition Objects".
This article introduces one of the oldest and most classic thread synchronization primitives in computer science: the semaphore.
Semaphores in Linux and Java were already covered in earlier articles.
The semaphore is a classic thread synchronization primitive from operating systems. In effect it is a mutex with a counter, used to protect a resource that allows only a fixed number of concurrent operations. A semaphore works much like a lock, but it maintains an internal count. Each acquire atomically subtracts 1 from the count; if the result is not negative, the acquire succeeds, otherwise 1 is added back and the caller blocks until it is woken up. Releasing adds 1 to the count. In low-level implementations the count is usually changed with a CAS (compare-and-swap) operation, a topic covered in the earlier article on CAS and Java atomic operations.
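The counting behaviour described above can be sketched in a few lines. This is a minimal illustration only: SimpleSemaphore is a hypothetical class written for this article, and it guards the counter with a condition variable rather than the CAS operation mentioned above, because plain Python code has no direct CAS primitive.

```python
import threading


class SimpleSemaphore:
    """Illustrative counting semaphore (hypothetical, not the stdlib class)."""

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("initial value must be >= 0")
        self._value = value
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            # Block while no units are left, then take one.
            while self._value == 0:
                self._cond.wait()
            self._value -= 1
            return True

    def release(self):
        with self._cond:
            self._value += 1
            # Wake a single waiter; it re-checks the counter before going on.
            self._cond.notify()


sem = SimpleSemaphore(2)
sem.acquire()
sem.acquire()                      # counter is now 0

results = []
waiter = threading.Thread(target=lambda: results.append(sem.acquire()))
waiter.start()                     # blocks until a unit is released
sem.release()                      # lets the waiter through
waiter.join()
print(results)                     # [True]
```

The waiting thread only proceeds after release() has been called, which is the whole contract of a semaphore: the counter says how many more acquires may succeed before callers start to block.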
Python's standard library implements semaphore objects in the threading module.
The constructor takes a parameter, value, that initializes the counter described above. The default is 1, and a negative value raises ValueError.
threading.Semaphore(value=1)
acquire(blocking=True, timeout=None)
The logic of the acquire method was described in detail above. Python's acquire accepts two parameters: blocking, which says whether to block while waiting, and timeout, the maximum time to wait in seconds. It returns True when the semaphore is acquired and False when a non-blocking call fails or the timeout expires.
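A short sketch of the three ways to call acquire, assuming nothing beyond the standard threading module. The variable names and the 0.2 second timeout are illustrative choices.

```python
import time
from threading import Semaphore

sem = Semaphore(1)

first = sem.acquire()                  # counter 1 -> 0, returns immediately
second = sem.acquire(blocking=False)   # counter is 0: fails without waiting

start = time.monotonic()
third = sem.acquire(timeout=0.2)       # waits up to 0.2 s, then gives up
waited = time.monotonic() - start

print(first, second, third)            # True False False
```

Note that combining blocking=False with a timeout is rejected with ValueError, since a non-blocking call never waits.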
release()
The release method adds 1 to the semaphore's counter. If threads are blocked on the semaphore, one of them (not all of them) is woken up, so each release lets at most one waiting thread proceed. Unlike an ordinary lock object, a Python semaphore allows release to be called without a prior acquire; the call simply increments the counter by 1.
import logging
from threading import Thread, Semaphore


class SemaphoreTestThread(Thread):
    def __init__(self, id, semaphore):
        super().__init__()
        self.id = id
        self.semaphore = semaphore

    def run(self) -> None:
        logging.info('%r start running' % self)
        try:
            # A blocking acquire() always returns True, so this loop keeps
            # taking units for as long as the main thread releases them.
            while self.semaphore.acquire():
                logging.info('%r hold the semaphore' % self)
        finally:
            self.semaphore.release()

    def __repr__(self):
        return 'SemaphoreTestThread(%s)' % self.id


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    semaphore = Semaphore(0)  # initial count 0: every acquire() blocks
    for i in range(10):
        thread = SemaphoreTestThread(i, semaphore)
        thread.start()
    logging.info('all the threads are running')
    for i in range(5):
        logging.info('add 1 on the semaphore')
        semaphore.release()  # release without a prior acquire
    # The worker threads stay blocked in acquire(), so stop the program manually.
Output:
2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(0) start running
2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(1) start running
2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(2) start running
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) start running
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(4) start running
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(5) start running
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(6) start running
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(7) start running
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(8) start running
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(9) start running
2019-05-12 22:12:24,016 - INFO: all the threads are running
2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(0) hold the semaphore
2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(1) hold the semaphore
2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore
2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore
2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore
2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) hold the semaphore
As you can see, we created and started 10 threads, but because the semaphore's initial count is 0, all 10 threads block on the semaphore right after starting. The main thread then calls release directly without ever having acquired the semaphore. No error is raised, and each call lets one thread proceed. Because every thread calls acquire in a loop, a thread that is already running can take the next unit itself, which is why SemaphoreTestThread(2) appears twice.
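The same experiment can be written so that it terminates on its own. The sketch below is a variant, not the article's original program: each worker tries to acquire only once and gives up after an assumed half-second timeout, and the names worker and outcomes are illustrative.

```python
from threading import Semaphore, Thread

sem = Semaphore(0)
outcomes = []                 # list.append is thread-safe in CPython


def worker():
    # Try once; give up if no unit arrives within half a second.
    outcomes.append(sem.acquire(timeout=0.5))


threads = [Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()

for _ in range(5):
    sem.release()             # no prior acquire is needed

for t in threads:
    t.join()

print(outcomes.count(True), outcomes.count(False))
```

Five units are released, so five of the ten workers should get through and the other five should time out.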
The example above shows that a Python semaphore lets us call release without acquiring first, raising the counter above its initial value. This seems to make the value passed to the constructor meaningless. Python therefore provides a second semaphore class, BoundedSemaphore, which differs from Semaphore in only one respect: when release would push the counter above the value passed to the constructor, it raises ValueError. In ordinary use, where every release is paired with an earlier acquire, Semaphore and BoundedSemaphore behave identically.
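The difference can be seen side by side in a single-threaded sketch. The variable names are illustrative.

```python
from threading import BoundedSemaphore, Semaphore

plain = Semaphore(1)
plain.release()               # allowed: the counter silently grows from 1 to 2
extra = plain.acquire(blocking=False) and plain.acquire(blocking=False)

bounded = BoundedSemaphore(1)
bounded.acquire()
bounded.release()             # fine: back at the initial value of 1
try:
    bounded.release()         # would exceed the initial value
    error = None
except ValueError as exc:
    error = str(exc)

print(extra)                  # True: both units could be taken
print(error)
```

With the plain Semaphore the extra release goes unnoticed and hands out a unit that was never meant to exist. The BoundedSemaphore turns the same mistake into an exception.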
Let's replace the semaphore in the release-without-acquire example with a BoundedSemaphore and try again:
import logging
from threading import Thread, BoundedSemaphore


class SemaphoreTestThread(Thread):
    def __init__(self, id, semaphore):
        super().__init__()
        self.id = id
        self.semaphore = semaphore

    def run(self) -> None:
        logging.info('%r start running' % self)
        try:
            while self.semaphore.acquire():
                logging.info('%r hold the semaphore' % self)
        finally:
            self.semaphore.release()

    def __repr__(self):
        return 'SemaphoreTestThread(%s)' % self.id


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    semaphore = BoundedSemaphore(0)  # upper bound is the initial value, 0
    for i in range(10):
        thread = SemaphoreTestThread(i, semaphore)
        thread.start()
    logging.info('all the threads are running')
    for i in range(5):
        logging.info('add 1 on the semaphore')
        semaphore.release()  # the first call already exceeds the bound
Output:
2019-05-13 00:08:35,020 - INFO: SemaphoreTestThread(0) start running
2019-05-13 00:08:35,024 - INFO: SemaphoreTestThread(1) start running
2019-05-13 00:08:35,025 - INFO: SemaphoreTestThread(2) start running
2019-05-13 00:08:35,027 - INFO: SemaphoreTestThread(3) start running
2019-05-13 00:08:35,028 - INFO: SemaphoreTestThread(4) start running
2019-05-13 00:08:35,034 - INFO: SemaphoreTestThread(5) start running
2019-05-13 00:08:35,039 - INFO: SemaphoreTestThread(6) start running
2019-05-13 00:08:35,043 - INFO: SemaphoreTestThread(7) start running
2019-05-13 00:08:35,053 - INFO: SemaphoreTestThread(8) start running
2019-05-13 00:08:35,060 - INFO: all the threads are running
2019-05-13 00:08:35,060 - INFO: add 1 on the semaphore
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1741, in <module>
2019-05-13 00:08:35,054 - INFO: SemaphoreTestThread(9) start running
    main()
  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1735, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1135, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "D:/Workspace/code/python/fluentpython/thread/semaphore.py", line 34, in <module>
    semaphore.release()
  File "C:\Users\zeyu\Anaconda3\lib\threading.py", line 483, in release
    raise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times
from threading import BoundedSemaphore, Lock, Semaphore


class PooledDB:
    def __init__(self, creator, minconnections, maxconnections, *args, **kwargs):
        self._args, self._kwargs = args, kwargs
        self._creator = creator
        self._minconnections = minconnections
        self._maxconnections = maxconnections
        # Caps the number of connections handed out at the same time
        self._max_semaphore = Semaphore(maxconnections)
        # Counts idle connections; bounded so the cache never exceeds minconnections
        self._min_semaphore = BoundedSemaphore(minconnections)
        # Keeps the idle counter and the idle cache in step
        self._lock = Lock()
        # Pre-fill the idle cache so it matches the semaphore's initial count
        self._idle_cache = [self._new_connection() for _ in range(minconnections)]

    def _new_connection(self):
        return PooledConnection(self._creator, self, *self._args, **self._kwargs)

    def get_connection(self, timeout=None):
        if not self._max_semaphore.acquire(timeout=timeout):
            return None  # timed out waiting for a free slot
        with self._lock:
            if self._min_semaphore.acquire(blocking=False):
                return self._idle_cache.pop(0)  # reuse an idle connection
        return self._new_connection()

    def returnConnect(self, connection):
        try:
            with self._lock:
                # Raises ValueError when the idle cache is already full
                self._min_semaphore.release()
                self._idle_cache.append(connection)
        except ValueError:
            connection.close(True)
        finally:
            self._max_semaphore.release()


class PooledConnection:
    def __init__(self, creator, pool, *args, **kwargs):
        self._pool = pool
        self._creator = creator
        self._con = self._creator.connect(*args, **kwargs)

    def close(self, force_close=False):
        if force_close:
            self._con.close()  # really close the underlying connection
        else:
            self._pool.returnConnect(self)  # hand it back to the pool
This is only a simplified DB connection pool. For the connection class PooledConnection we omitted methods such as begin, commit, rollback, cursor, and ping because they are unrelated to this section, keeping only connection creation and the close method. Parameter validation and boundary checks are also left out to keep the example concise. A follow-up article will analyze the source code of a DB connection pool in detail. In the example above, the pool's constructor takes two parameters, the maximum and the minimum number of connections. We create two semaphore objects, a Semaphore and a BoundedSemaphore, which limit the maximum and minimum number of connections, respectively, in a concurrent environment.
In the initial state, we have added the minimum number of connections to the idle queue , We see , When creating a connection , We first try to lock the semaphore with the maximum number of connections , This ensures that the number of connection pool connections in the concurrent environment will not exceed maxconnections value . then , The minimum connection number semaphore is locked , If the lock is successful, the connection is obtained from the idle queue , Otherwise, create a new connection .
When a connection is closed, we first try to release the minimum-connections semaphore. This is where BoundedSemaphore shows its value: if a release would push the count above the minconnections passed to the constructor, the releases outnumber the acquires. In other words, the connection being returned was not taken from the idle queue _idle_cache, and the idle queue is already full. The ValueError that BoundedSemaphore raises at that point lets us force-close the connection directly instead of putting it back into the pool. For the maximum-connections semaphore, by contrast, a plain Semaphore is sufficient.