實際案例:
http://quotes.money.163.com/stock/,我們通過網站獲取了中國股市某支股票csv數據文件,現在要下載多只股票的csv數據,並將其轉換為xml文件。
由於全局解釋器鎖的存在,多線程進行CPU密集型操作並不能提高執行效率,我們修改程序架構:
[1] 使用多個DownloadThread線程進行下載(I/O操作);
[2] 使用一個ConvertThread線程進行轉換(CPU密集型操作);
[3] 下載線程把下載數據安全地傳遞給轉換線程。
(典型的生產者與消費者問題,現在只有一個消費者多個生產者)
解決方案:
使用標准庫中queue.Queue (python2中為Queue.Queue),它是一個線程安全的隊列。
Download線程把下載數據放入隊列,Covert線程從隊列裡提取數據。
拓展:
Python3中Queue和queue區別:
import queue,queue.Queue()。用於多線程間的通訊,不能用於"多進程"之間的通訊。
from multiprocessing import Queue,用於進程之間的數據共享。
import csv
from xml.etree.ElementTree import Element, ElementTree, tostring
import requests
from io import StringIO, BytesIO
from xml_pretty import pretty
from threading import Thread
from collections import deque
import queue
# 可以創建雙端隊列,進行線程間的通信
# q = deque()
'''
多個線程同時訪問q是不安全的,想安全的訪問就要加鎖。
一種簡潔的做法就是使用線程安全的數據結構標准庫中queue.Queue,
這種線程安全的數據結構他在內部實現了鎖,幫我們完成了同步工作。
使用線程安全的Queue替換deque,經常使用全局變量也是不良的設計模式。
'''
# 實現下載線程
class DownloadThread(Thread):
def __init__(self, sid, queue1):
Thread.__init__(self) # 調用父類的構造器
self.sid = sid
self.url = 'http://quotes.money.163.com/service/chddata.html?' \
'code=1%s&start=20210101&end=20220505'
# 將%s替換成00000X
self.url %= str(sid).rjust(6, '0')
self.queue1 = queue1
# 下載csv數據
def download(self, url):
response = requests.get(url, timeout=3)
if response.ok:
# print(response.content)
# StringIO是支持文件操作的內存對象
return StringIO(response.content.decode(encoding='gb18030'))
# 實現線程類的線程入口點
def run(self):
print('Download', self.sid)
# 股票數據下載
data = self.download(self.url)
# 將下載的數據傳給轉換線程,(sid, data)
# q.append((self.sid, data))
self.queue1.put((self.sid, data))
# 實現轉換線程
class ConvertThread(Thread):
def __init__(self, queue1):
Thread.__init__(self)
self.queue1 = queue1
# 將csv數據格式轉換成xml數據格式
def csv_to_xml(self, scsv, fxml):
reader = csv.reader(scsv)
headers = next(reader)
# print(headers)
# xml的tag不能為中文,將其替換成以下標簽
headers = ['Date', 'Code', 'Name', 'CLose',
'High', 'Low', 'Open', 'Before',
'Price', 'Rise', 'Rate', 'Volume',
'Amount', 'Total', 'Value', 'Number']
# print(headers)
# headers = map(lambda h: h.replace(' ', ''), headers)
# print(list(headers))
root = Element('Data')
# print(tostring(root))
for row in reader:
e_row = Element('Row')
root.append(e_row)
for tag, text in zip(headers, row):
e = Element(tag)
e.text = text
e_row.append(e)
pretty(root)
et = ElementTree(root)
et.write(fxml)
def run(self):
# 需要放到循環中因為只有一個消費者線程
while True:
# 從隊列中獲取數據,接受sid線程號和data數據
sid, data = self.queue1.get()
print('Convert', sid)
if sid == -1: # sid等於-1退出循環
break
if data:
# 將接受的數據轉換成xml
f_name = '1' + str(sid).rjust(6, '0') + '.xml'
with open('resource/' + f_name, 'wb') as wf:
self.csv_to_xml(data, wf)
if __name__ == '__main__':
q = queue.Queue()
# 創建多個下載線程
dThreads = [DownloadThread(i, q) for i in range(1, 11)]
# 創建一個轉換線程
cThread = ConvertThread(q)
# 開啟所有線程
for t in dThreads:
t.start()
cThread.start()
# 等待所有的子進程都結束後主進程再退出
for t in dThreads:
t.join()
# Convert的run退出循環
q.put((-1, None))