Through http://quotes.money.163.com/stock/ we can download the CSV data file of a stock on China's stock market. Now we want to download the CSV data of multiple stocks and convert it to XML files.
Additional requirement: implement a packaging thread. Every time the conversion thread has produced 100 XML files (the demo code below uses 5), it notifies the packaging thread, which packs them into an xxx.tgz archive and deletes the XML files. After packing, the packaging thread notifies the conversion thread in turn, and the conversion thread continues working until the next batch of files is ready to be packed. This cycle repeats until all data is processed.
Solution:
For event notification between threads you can use the standard library class threading.Event. Here two Event objects are used at the same time, one for each direction:
(1) The waiting side calls wait() on the event and blocks until it is notified.
(2) The notifying side calls set() on the event to send the notification.
(wait() is a blocking call: it puts the thread to sleep until the peer calls set(), at which point wait() returns.)
Introduction to daemon threads:
How does the TarThread exit its infinite loop? One option is to make the packaging thread a daemon thread. A daemon thread exits automatically once all other (non-daemon) threads have exited, which suits TarThread well, because it only exists to serve the other threads.
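A minimal sketch of what the daemon flag does (the worker function here is only an illustration): the endless loop never prevents the interpreter from exiting, because a daemon thread is terminated as soon as all non-daemon threads have finished.
from threading import Thread
import time

def worker():
    # Runs forever; because the thread is a daemon it is terminated
    # automatically when the main (non-daemon) thread finishes.
    while True:
        time.sleep(1)

t = Thread(target=worker, daemon=True)
t.start()
print('main thread done; the daemon thread is stopped with it')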
# Extract the tar package into the parent directory (run from inside the resource directory)
resource % tar -zxvf test.tgz -C ../
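The same extraction can also be done from Python with the standard tarfile module; a small sketch, assuming the archive stores its members with the resource/ prefix, as the packaging code below does:
import tarfile

# Roughly equivalent to the shell command above, but run from the project root:
# the members were added as 'resource/xxx.xml', so extracting into '.' restores them in place.
with tarfile.open('resource/test.tgz', 'r:gz') as tf:
    tf.extractall('.')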
(1) How to compress and package files in Python
# -*- coding: utf-8 -*-
import tarfile
import os

# Pack all the .xml files in the resource directory into a tar archive
def tar_xml(tf_name):
    # Open the tar archive: mode 'w' for writing, 'gz' selects gzip compression
    tf = tarfile.open(tf_name, 'w:gz')
    # Walk over all files in the resource directory
    for f_name in os.listdir('./resource/'):
        if f_name.endswith('.xml'):
            # Add the .xml file to the tar archive
            print(f_name)
            tf.add('resource/' + f_name)
            # Delete the .xml file itself
            os.remove('resource/' + f_name)
    # Remember whether anything was added, then close the archive
    empty = not tf.getmembers()
    tf.close()
    # If no .xml file went in, the archive is empty; delete it
    if empty:
        os.remove(tf_name)

# Specify the name of the tar archive
tar_xml('./resource/test.tgz')
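As a quick check (a sketch, assuming the archive was actually created by the call above), the contents of the package can be listed without extracting it:
import tarfile

with tarfile.open('./resource/test.tgz', 'r:gz') as tf:
    # getnames() returns the member paths stored in the archive
    print(tf.getnames())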
(2) Basic usage of Event for notification between threads
from threading import Event, Thread

# A child-thread function; e is the Event object
def f(e):
    print('f 0')
    # Block and wait for the event notification
    e.wait()
    print('f 1')

e = Event()
# Create the thread
t = Thread(target=f, args=(e,))
t.start()
# Notify the thread
e.set()
# Note: once set() has been called, later calls to wait() no longer block.
# If you want wait() to block again, call clear() first to reset the event.
# e.clear()
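The behaviour described in the comments can be checked with wait(timeout=...), which returns True if the event is set and False if the timeout expires; a small sketch:
from threading import Event

e = Event()
e.set()
print(e.wait(timeout=1))   # True: the event is set, so wait() returns immediately
e.clear()
print(e.wait(timeout=1))   # False after about one second: the flag was cleared, so wait() blocks again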
(3) Implementing the packaging thread
Implementing the packaging thread requires two events:
The first event: when the conversion thread has produced enough XML files, it notifies the packaging thread to pack them.
The second event works in the opposite direction: once packaging is finished, it notifies the conversion thread to continue converting.
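Stripped of the download and CSV-to-XML details, the handshake between the two threads looks roughly like this (a skeleton sketch only; the "..." comments stand in for the real conversion and packaging code):
from threading import Thread, Event

cEvent = Event()   # conversion thread -> packaging thread: "a batch is ready, pack it"
tEvent = Event()   # packaging thread -> conversion thread: "packing done, continue"

def converter():
    for batch in range(3):      # pretend to produce three batches of files
        # ... convert a batch of files here ...
        cEvent.set()            # notify the packaging thread
        tEvent.wait()           # block until packing is finished
        tEvent.clear()          # re-arm the event for the next batch

def packer():
    while True:
        cEvent.wait()           # block until a batch is ready
        # ... pack the files here ...
        cEvent.clear()          # re-arm the event for the next notification
        tEvent.set()            # tell the conversion thread to continue

Thread(target=packer, daemon=True).start()
t = Thread(target=converter)
t.start()
t.join()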
import csv
from xml.etree.ElementTree import Element, ElementTree, tostring
import requests
from io import StringIO, BytesIO
from xml_pretty import pretty
from threading import Thread, Event
from collections import deque
import queue
import tarfile
import os
# A double-ended queue could be created for communication between threads:
# q = deque()
'''
Accessing q from multiple threads at the same time is not safe; safe access would require a lock.
A simpler approach is to use the thread-safe data structure queue.Queue from the standard library,
which implements the locking internally and does the synchronization work for us.
So the thread-safe Queue is used instead of deque; besides, relying on a global variable
everywhere would be a poor design pattern, so the queue is passed to the threads explicitly.
'''
# The download thread
class DownloadThread(Thread):
    def __init__(self, sid, queue1):
        Thread.__init__(self)  # Call the parent-class constructor
        self.sid = sid
        self.url = 'http://quotes.money.163.com/service/chddata.html?' \
                   'code=1%s&start=20210101&end=20220505'
        # Replace %s with the zero-padded stock code, e.g. 00000X
        self.url %= str(sid).rjust(6, '0')
        self.queue1 = queue1

    # Download the CSV data
    def download(self, url):
        response = requests.get(url, timeout=3)
        if response.ok:
            # print(response.content)
            # StringIO is an in-memory object that supports the file interface
            return StringIO(response.content.decode(encoding='gb18030'))

    # The entry point of the thread
    def run(self):
        print('Download', self.sid)
        # Download the stock data
        data = self.download(self.url)
        # Hand the downloaded data to the conversion thread as (sid, data)
        # q.append((self.sid, data))
        self.queue1.put((self.sid, data))
# The conversion thread
class ConvertThread(Thread):
    def __init__(self, queue1, cEvent, tEvent):
        Thread.__init__(self)
        self.queue1 = queue1
        self.cEvent = cEvent
        self.tEvent = tEvent

    # Convert the CSV data into the XML data format
    def csv_to_xml(self, scsv, fxml):
        reader = csv.reader(scsv)
        headers = next(reader)
        # print(headers)
        # The original headers are Chinese and are not used as XML tags here;
        # replace them with the following English labels
        headers = ['Date', 'Code', 'Name', 'Close',
                   'High', 'Low', 'Open', 'Before',
                   'Price', 'Rise', 'Rate', 'Volume',
                   'Amount', 'Total', 'Value', 'Number']
        # print(headers)
        # headers = map(lambda h: h.replace(' ', ''), headers)
        # print(list(headers))
        root = Element('Data')
        # print(tostring(root))
        for row in reader:
            e_row = Element('Row')
            root.append(e_row)
            for tag, text in zip(headers, row):
                e = Element(tag)
                e.text = text
                e_row.append(e)
        # Indent the tree for readability (pretty() is the author's xml_pretty helper)
        pretty(root)
        et = ElementTree(root)
        et.write(fxml)

    def run(self):
        count = 0  # Count the number of converted files
        # Loop because this single consumer thread handles data from all download threads
        while True:
            # Take (sid, data) from the queue: the stock id and its CSV data
            sid, data = self.queue1.get()
            print('Convert', sid)
            if sid == -1:  # sid equal to -1 means exit the loop
                # At exit there may be fewer than 5 files left; they still need to be packed
                self.cEvent.set()
                self.tEvent.wait()
                break
            if data:
                # Convert the received data into XML
                f_name = '1' + str(sid).rjust(6, '0') + '.xml'
                with open('resource/' + f_name, 'wb') as wf:
                    self.csv_to_xml(data, wf)
                count += 1
                # After every 5 files (100 in the original requirement), notify the packer
                if count == 5:
                    self.cEvent.set()
                    # Wait until the peer has finished packing
                    self.tEvent.wait()
                    self.tEvent.clear()
                    count = 0
# The packaging thread
class TarThread(Thread):
    def __init__(self, cEvent, tEvent):
        Thread.__init__(self)
        self.count = 0
        self.cEvent = cEvent
        self.tEvent = tEvent
        # Make this a daemon thread (setDaemon() is deprecated)
        self.daemon = True

    # Pack all the .xml files in the resource directory into a tar archive
    def tar_xml(self):
        # Build the archive name
        self.count += 1
        tf_name = './resource/%d.tgz' % self.count
        # Open the tar archive: mode 'w' for writing, 'gz' selects gzip compression
        tf = tarfile.open(tf_name, 'w:gz')
        # Walk over all files in the resource directory
        for f_name in os.listdir('./resource/'):
            if f_name.endswith('.xml'):
                # Add the .xml file to the tar archive
                print(f_name)
                tf.add('resource/' + f_name)
                # Delete the .xml file itself
                os.remove('resource/' + f_name)
        # Remember whether anything was added, then close the archive
        empty = not tf.getmembers()
        tf.close()
        # If no .xml file went in, the archive is empty; delete it
        if empty:
            os.remove(tf_name)

    def run(self):
        # Wait for notifications from ConvertThread
        while True:
            # Wait until a batch of conversions is finished
            self.cEvent.wait()
            self.tar_xml()
            # Clear the event so it can be reused
            self.cEvent.clear()
            # Notify the peer (the conversion thread)
            self.tEvent.set()
if __name__ == '__main__':
    # The ./resource/ directory must already exist
    q = queue.Queue()
    # Create multiple download threads
    dThreads = [DownloadThread(i, q) for i in range(1, 13)]
    # Create the two events
    cEvent = Event()
    tEvent = Event()
    # Create one conversion thread and one packaging thread
    cThread = ConvertThread(q, cEvent, tEvent)
    tThread = TarThread(cEvent, tEvent)
    # Start all the threads
    tThread.start()
    for t in dThreads:
        t.start()
    cThread.start()
    for t in dThreads:
        t.join()
    # Make ConvertThread.run exit its loop
    q.put((-1, None))
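    # Optionally, wait for the conversion thread to finish its final batch
    # and list the .tgz archives that were produced, as a quick sanity check.
    cThread.join()
    print([f for f in os.listdir('./resource/') if f.endswith('.tgz')])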