最近在看Python基礎,剛好看到生產者與消費者這快內容,視頻使用了queue
模塊來處理,這裡記錄一下學習的內容
生產者與消費者是一個比較容易理解的概念,比如游泳池中一頭進水一頭出水,就是很典型的例子。
視頻中的代碼主要是下面這塊:
# ecoding=utf-8
# Author: 翁彥彬 | Sven_Weng
# Email : [email protected]
# Web : http://wybblog.applinzi.com
from threading import current_thread, Thread
import time
import random
import queue
q = queue.Queue(5)
class Productor(Thread):
def run(self):
name = current_thread().getName()
nums = range(100)
while 1:
nowput = random.choice(nums)
if q.full(): # 消息隊列滿則停止生產
print "隊列已經達到上限{0}".format(q.qsize())
time.sleep(10)
q.put(nowput)
print "生產者{0}生產了{1}".format(name, nowput)
sl = random.choice([1, 2, 3])
time.sleep(sl)
print "生產者休息了{0}秒".format(sl)
class Consumer(Thread):
def run(self):
name = current_thread().getName()
while 1:
if q.empty(): # 消息隊列空的時候則暫停消費
print "隊列空了,暫停消費"
time.sleep(5)
num = q.get()
q.task_done()
print "消費者{0}消費了{1}".format(name, num)
sl = random.choice([1, 2, 3])
time.sleep(sl)
print "消費者休息了{0}秒".format(sl)
if __name__ == '__main__':
p1 = Productor()
p1.start()
p2 = Productor()
p2.start()
c1 = Consumer()
c1.start()
c2 = Consumer()
c2.start()
c3 = Consumer()
c3.start()
用的是threading
模塊來一邊生產內容一邊消費內容,整個代碼是比較簡單,但是不是特別容易理解,尤其是新手理解的時候不夠直觀。
消息隊列這個東西可以理解為一個復雜的list
,比如要實現先進先出,那麼每次返回list[0]
就行了,同理,如果要實現後進先出,那麼每次返回list[len(list)]
就行了,這麼理解起來比較容易。
當然,消息隊列相比起list
來是復雜了一些,但是原理基本上就是這樣,比如queue
使用的是threading
模塊來實現的,再復雜的消息隊列,比如Python
中用的比較多的celery
,可選的消息隊列就有RabbitMQ
或者Redis
鑒於視頻中的例子過於復雜,我自己寫了一個簡單直觀的例子,用Flask
寫了一個簡單的Web
化的消息隊列服務。
from flask import Flask, jsonify, request
from queue import Queue
app = Flask(__name__)
@app.before_first_request
def init_queue():
app.q = Queue(5)
@app.route('/')
def hello_world():
data = {
"code": "0000",
"queue_count": app.q.qsize()
}
return jsonify(data)
@app.route('/put')
def put_to_queue():
num = request.args['num']
print num
if app.q.full():
data = {
"code": "0001",
"msg": "The Queue is full"
}
else:
app.q.put(num)
data = {
"code": "0000",
"msg": "Success put {1} to the Queue, current count is {0}".format(app.q.qsize(), num)
}
return jsonify(data)
@app.route('/get')
def get_from_queue():
if app.q.empty():
data = {
"code": "0002",
"msg": "The Queue is empty"
}
else:
data = {
"code": "0000",
"msg": "Success get from the Queue, current count is {0}".format(app.q.qsize()),
"num": app.q.get()
}
app.q.task_done()
return jsonify(data)
if __name__ == '__main__':
app.run(debug=True)
首先在啟動的時候調用before_first_request
來初始化隊列,放到app.q
這個全局變量中,用不同的請求來執行不同的隊列操作,在頁面上就可以看到不同的結果了。演示例子中隊列數量5個就滿了,再執行put
就會返回錯誤信息,同理,如果空了,也會返回錯誤信息。