The queue module provides a thread-safe first in, first out (FIFO) data structure for multithreaded programming. Because it is thread safe, multiple threads can easily share the same instance.
Source code analysis
Let's start with the initializer:
```python
from collections import deque
import threading

class Queue:
    def __init__(self, maxsize=0):
        # Maximum capacity of the queue; a non-positive value means unbounded
        self.maxsize = maxsize
        self._init(maxsize)
        # One mutex protects every operation on the underlying storage
        self.mutex = threading.Lock()
        # Three condition variables are derived from that same lock
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def _init(self, maxsize):
        # Initialize the underlying data structure
        self.queue = deque()
```
What can we learn from this initializer? First, the queue's capacity is configurable, and the underlying storage is a collections.deque, a double-ended queue that makes FIFO operations cheap. The storage setup is deliberately factored out into a separate _init() method so that subclasses can override it and store elements in a different structure (for example, PriorityQueue uses a list).

Next comes the thread lock self.mutex: every operation on the underlying self.queue must first acquire this lock. The three condition variables are all constructed with self.mutex as their lock, which means they share it. Consequently, blocks such as with self.mutex: and with self.not_empty: are mutually exclusive.
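The shared-lock arrangement is easy to verify outside the queue module. Here is a minimal sketch of the same pattern: several Condition objects built on one Lock, so entering any of them holds the single shared lock.

```python
import threading

lock = threading.Lock()
not_empty = threading.Condition(lock)
not_full = threading.Condition(lock)

# Entering either condition's context manager acquires the one shared lock,
# so `with not_empty:` and `with not_full:` are mutually exclusive.
with not_empty:
    # While the shared lock is held, a non-blocking acquire must fail
    locked = not lock.acquire(blocking=False)
```

After the `with` block exits, the lock is released again and can be acquired normally.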
Some simple operations built on top of this lock:
```python
class Queue:
    ...

    def qsize(self):
        # Return the number of elements in the queue
        with self.mutex:
            return self._qsize()

    def empty(self):
        # Return True if the queue is empty
        with self.mutex:
            return not self._qsize()

    def full(self):
        # Return True if the queue is bounded and full
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def _qsize(self):
        return len(self.queue)
```
This fragment is straightforward: each public method simply takes the mutex and delegates to an unlocked helper.
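As a quick sanity check, these predicates behave as you would expect on a small bounded queue:

```python
import queue

q = queue.Queue(maxsize=2)
assert q.empty() and not q.full()

q.put('a')
q.put('b')
# With maxsize=2 and two items stored, the queue reports full
assert q.full() and q.qsize() == 2
```

Note that in multithreaded code these answers can be stale by the time the caller acts on them: the mutex is released when the method returns, so another thread may change the queue in between.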
As a queue, the core operations are enqueue and dequeue. First, enqueue:
```python
class Queue:
    ...

    def put(self, item, block=True, timeout=None):
        with self.not_full:  # acquire the shared lock via not_full
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        # Non-blocking mode: if the queue is full, raise Full
                        raise Full
                elif timeout is None:
                    # Block until space becomes available
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    # Invalid parameter value
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Compute the deadline for the wait
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            # No space freed up before the deadline
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)  # add the element to the underlying structure
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def _put(self, item):
        self.queue.append(item)
```
Although this is only about twenty lines of code, the logic is relatively involved: it has to handle both timeouts and the queue running out of space. Once the argument handling is done, the element is stored in the underlying structure, unfinished_tasks is incremented, and not_empty is notified to wake up a thread waiting for data.
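The two failure branches can be exercised directly. This sketch fills a one-slot queue, then shows both the non-blocking put raising Full immediately and the timed put raising Full after the wait deadline passes:

```python
import queue

q = queue.Queue(maxsize=1)
q.put('first')

# Non-blocking put on a full queue raises Full immediately
overflowed = False
try:
    q.put('second', block=False)
except queue.Full:
    overflowed = True

# A timed put waits up to `timeout` seconds for space, then raises Full
timed_out = False
try:
    q.put('third', timeout=0.1)
except queue.Full:
    timed_out = True
```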
The dequeue operation:
```python
class Queue:
    ...

    def get(self, block=True, timeout=None):
        with self.not_empty:  # acquire the shared lock via not_empty
            if not block:
                if not self._qsize():
                    # Non-blocking mode: if the queue is empty, raise Empty
                    raise Empty
            elif timeout is None:
                # Block until an element is available
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        # No element arrived before the deadline
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def _get(self):
        return self.queue.popleft()
```
get() is the mirror image of put(), and the code is structured the same way: it removes the earliest inserted element from the queue and returns it. At the end, self.queue.popleft() removes the oldest element, and not_full is notified to wake up a thread waiting for space.
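The wait/notify handoff between the two operations can be seen in a minimal two-thread sketch: the consumer blocks in get(), and a producer thread's put() wakes it via not_empty.

```python
import queue
import threading
import time

q = queue.Queue()

def producer():
    time.sleep(0.05)   # give the consumer time to block first
    q.put('payload')   # put() notifies not_empty, waking the consumer

t = threading.Thread(target=producer)
t.start()
# get() blocks in not_empty.wait() until the producer adds an item
item = q.get(timeout=1)
t.join()
```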
One thing worth noting: put() increments self.unfinished_tasks, but get() does not decrement it. Why? This gives consumers time to actually process each element. get() merely retrieves an item; it does not mean the consumer thread has finished handling it. The consumer must call task_done() to tell the queue that processing is complete:
```python
class Queue:
    ...

    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # task_done() was called more times than put()
                    raise ValueError('task_done() called too many times')
                # unfinished reached 0: notify everyone waiting on all_tasks_done
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        with self.all_tasks_done:
            # While there are unfinished tasks, wait() on the condition
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
```
Because task_done() is called by the user, calling it more times than put() raises a ValueError.

The purpose of task_done() is to wake up a blocked join(). join() blocks until every element put into the queue has been taken out and processed (similar to a thread's join() method). In other words, join() must be used together with task_done().
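The classic usage is a worker thread that calls task_done() after handling each item, while the main thread blocks in join() until everything has been processed. A minimal sketch (the None sentinel used to stop the worker is a common convention, not part of the queue API):

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()  # mark this task finished so join() can return

t = threading.Thread(target=worker)
t.start()
for n in (1, 2, 3):
    q.put(n)
q.put(None)  # sentinel: tell the worker to stop
q.join()     # blocks until every put() is matched by a task_done()
t.join()
```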
LifoQueue: last in, first out queue
LifoQueue retrieves items in last in, first out order, like a stack:
```python
class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()
```
This is the entire implementation of LifoQueue, and it illustrates one reason why Queue's design is so good: the underlying data operations are abstracted into four hook methods, and the base class handles thread safety itself, so subclasses only need to supply the storage operations.

LifoQueue swaps the underlying structure for a list; self.queue.pop() removes the last element of the list, so no index bookkeeping is needed.
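Its stack-like behavior is easy to see in use:

```python
import queue

stack = queue.LifoQueue()
for ch in 'abc':
    stack.put(ch)

# Items come back in reverse (last in, first out) order
order = [stack.get() for _ in range(3)]
```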
PriorityQueue: priority queue
```python
from heapq import heappush, heappop

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)
```
The priority queue is built on the heapq module, i.e. a min-heap. Priority queues are widely used when the processing order of items must depend on the items' own attributes. A simple example:
```python
import queue

class A:
    def __init__(self, priority, value):
        self.priority = priority
        self.value = value

    def __lt__(self, other):
        # heapq compares items with <, so order by priority
        return self.priority < other.priority

q = queue.PriorityQueue()
q.put(A(1, 'a'))
q.put(A(0, 'b'))
q.put(A(1, 'c'))
print(q.get().value)  # 'b'
When using a priority queue with custom objects, you need to define the __lt__ magic method so heapq knows how to compare them. Note that a binary heap is not a stable structure: if two elements have the same priority, their relative retrieval order is not guaranteed to be first in, first out.
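A common way to get FIFO behavior among equal priorities is to add a monotonically increasing counter as a tiebreaker. This is a sketch of that pattern; put_with_order is a hypothetical helper, not part of the queue API:

```python
import itertools
import queue

counter = itertools.count()
q = queue.PriorityQueue()

def put_with_order(priority, value):
    # The counter breaks ties between equal priorities, so
    # equal-priority items come out in insertion order
    q.put((priority, next(counter), value))

put_with_order(1, 'a')
put_with_order(0, 'b')
put_with_order(1, 'c')

order = [q.get()[2] for _ in range(3)]
```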