http://quotes.money.163.com/stock/,我們通過網站獲取了中國股市某支股票csv數據文件,現在要下載多只股票的csv數據,並將其轉換為xml文件。
額外需求:實現一個線程,將轉換出的xml文件壓縮打包,比如轉換線程每生產出100個xml文件,就通知打包線程將它們打包成一個xxx.tgz文件,並刪除xml文件。打包完成後,打包線程反過來通知轉換線程,轉換線程繼續工作再生產100個xml文件再進行打包,就這樣循環的過程。
解決方案:
線程間的事件通知,可以使用標准庫中Threading.Event,在使用的時候兩個線程同時持用Event對象:
(1)在等待事件一端調用wait,等待事件。
(2)在通知事件一端調用set,通知事件。
(wait阻塞函數,將導致線程進入一個睡眠狀態,直到對端調用了set方法,此時的wait將返回回來。)
守護線程介紹:
TarThread線程怎麼從死循環中退出,可以把打包線程設置成一個守護線程,守護線程就是其它線程退出以後它會自動退出,這比較符合TarThead線程,因為它是為其它線程服務的。
# 解壓tar包到上一級目錄下
resource % tar -zxvf test.tgz -C ../
(1)python中如何進行文件的壓縮打包
# _*_ encoding:utf-8 _*_
import tarfile
import os
# 把當前目錄下的所有xml文件打成一個tar包
def tar_xml(tf_name):
# 打開tar包,模式w為寫,gz為壓縮算法
tf = tarfile.open(tf_name, 'w:gz')
# 遍歷當前目錄下所有文件
for f_name in os.listdir('./resource/'):
if f_name.endswith('.xml'):
# xml文件添加到tar包內
print(f_name)
tf.add('resource/' + f_name)
# 刪除自身的xml文件
os.remove('resource/' + f_name)
# 當前tar包沒有xml文件為空時,刪除tar包
if not tf.getmembers():
os.remove(tf_name)
tf.close()
# 指定tar包的名字
tar_xml('./resource/test.tgz')
(2)線程間通知Event簡單使用
from threading import Event, Thread
# 實現一個子線程函數,e為Event對象
def f(e):
print('f 0')
# 阻塞,等待一個事件通知
e.wait()
print('f 1')
e = Event()
# 創建一個線程
t = Thread(target=f, args=(e,))
t.start()
# 通知線程
e.set()
# 需要注意,set以後再調用wait就阻塞不住了,
# 需要再次使用需要調用clear將它清理掉之後又可以進行阻塞
# e.clear()
(3)實現打包線程
實現打包線程需要兩個事件:
第一個事件,當轉換線程生產出足夠多的XML文件以後,它來通知打包線程進行打包。
第二個事件,就是反過來,打包完成以後去通知轉換線程繼續轉換。
import csv
from xml.etree.ElementTree import Element, ElementTree, tostring
import requests
from io import StringIO, BytesIO
from xml_pretty import pretty
from threading import Thread, Event
from collections import deque
import queue
import tarfile
import os
# 可以創建雙端隊列,進行線程間的通信
# q = deque()
'''
多個線程同時訪問q是不安全的,想安全的訪問就要加鎖。
一種簡潔的做法就是使用線程安全的數據結構標准庫中Queue.Queue,
這種線程安全的數據結構他在內部實現了鎖,幫我們完成了同步工作。
使用線程安全的Queue替換deque,經常使用全局變量也是不良的設計模式。
'''
# 實現下載線程
class DownloadThread(Thread):
def __init__(self, sid, queue1):
Thread.__init__(self) # 調用父類的構造器
self.sid = sid
self.url = 'http://quotes.money.163.com/service/chddata.html?' \
'code=1%s&start=20210101&end=20220505'
# 將%s替換成00000X
self.url %= str(sid).rjust(6, '0')
self.queue1 = queue1
# 下載csv數據
def download(self, url):
response = requests.get(url, timeout=3)
if response.ok:
# print(response.content)
# StringIO是支持文件操作的內存對象
return StringIO(response.content.decode(encoding='gb18030'))
# 實現線程類的線程入口點
def run(self):
print('Download', self.sid)
# 股票數據下載
data = self.download(self.url)
# 將下載的數據傳給轉換線程,(sid, data)
# q.append((self.sid, data))
self.queue1.put((self.sid, data))
# 實現轉換線程
class ConvertThread(Thread):
def __init__(self, queue1, cEvent, tEvent):
Thread.__init__(self)
self.queue1 = queue1
self.cEvent = cEvent
self.tEvent = tEvent
# 將csv數據格式轉換成xml數據格式
def csv_to_xml(self, scsv, fxml):
reader = csv.reader(scsv)
headers = next(reader)
# print(headers)
# xml的tag不能為中文,將其替換成以下標簽
headers = ['Date', 'Code', 'Name', 'CLose',
'High', 'Low', 'Open', 'Before',
'Price', 'Rise', 'Rate', 'Volume',
'Amount', 'Total', 'Value', 'Number']
# print(headers)
# headers = map(lambda h: h.replace(' ', ''), headers)
# print(list(headers))
root = Element('Data')
# print(tostring(root))
for row in reader:
e_row = Element('Row')
root.append(e_row)
for tag, text in zip(headers, row):
e = Element(tag)
e.text = text
e_row.append(e)
pretty(root)
et = ElementTree(root)
et.write(fxml)
def run(self):
count = 0 # 統計文件數量
# 需要放到循環中因為只有一個消費者線程
while True:
# 從隊列中獲取數據,接受sid線程號和data數據
sid, data = self.queue1.get()
print('Convert', sid)
if sid == -1: # sid等於-1退出循環
# 最後退出時文件數不夠5個也需要打包
self.cEvent.set()
self.tEvent.wait()
break
if data:
# 將接受的數據轉換成xml
f_name = '1' + str(sid).rjust(6, '0') + '.xml'
with open('resource/' + f_name, 'wb') as wf:
self.csv_to_xml(data, wf)
count += 1
# 每次文件數量為5就通知打包
if count == 5:
self.cEvent.set()
# 等待對端打包完畢
self.tEvent.wait()
self.tEvent.clear()
count = 0
# 實現打包線程
class TarThread(Thread):
def __init__(self, cEvent, tEvent):
Thread.__init__(self)
self.count = 0
self.cEvent = cEvent
self.tEvent = tEvent
# 設置守護線程
self.setDaemon(True)
# 把當前目錄下的所有xml文件打成一個tar包
def tar_xml(self):
# 構造打包名
self.count += 1
tf_name = './resource/%d.tgz' % self.count
# 打開tar包,模式w為寫,gz為壓縮算法
tf = tarfile.open(tf_name, 'w:gz')
# 遍歷當前目錄下所有文件
for f_name in os.listdir('./resource/'):
if f_name.endswith('.xml'):
# xml文件添加到tar包內
print(f_name)
tf.add('resource/' + f_name)
# 刪除自身的xml文件
os.remove('resource/' + f_name)
# 當前tar包沒有xml文件為空時,刪除tar包
if not tf.getmembers():
os.remove(tf_name)
tf.close()
def run(self):
# 等待ConvertThread發通知
while True:
# 等待事件轉換完畢
self.cEvent.wait()
self.tar_xml()
# 清理事件重復使用
self.cEvent.clear()
# 通知對端
self.tEvent.set()
if __name__ == '__main__':
q = queue.Queue()
# 創建多個下載線程
dThreads = [DownloadThread(i, q) for i in range(1, 13)]
# 創建兩個事件
cEvent = Event()
tEvent = Event()
# 創建一個轉換線程和打包線程
cThread = ConvertThread(q, cEvent, tEvent)
tThread = TarThread(cEvent, tEvent)
# 開啟所有線程
tThread.start()
for t in dThreads:
t.start()
cThread.start()
for t in dThreads:
t.join()
# Convert的run退出循環
q.put((-1, None))