A real-world case:
From the website http://quotes.money.163.com/stock/ we obtained the CSV data file for one stock on China's stock market. Now we want to download the CSV data for multiple stocks and convert each one to an XML file.
Because of the global interpreter lock (GIL), running CPU-intensive operations in multiple threads does not improve execution efficiency, so we restructure the program:
[1] Use multiple DownloadThread threads to download (I/O operation);
[2] Use one ConvertThread thread to convert (CPU-intensive operation);
[3] The download threads safely pass the downloaded data to the conversion thread.
(A typical producer-consumer problem; here there are multiple producers and only one consumer.)
Solution:
Use queue.Queue from the standard library (Queue.Queue in Python 2), which is a thread-safe queue.
The download threads put the downloaded data into the queue, and the ConvertThread takes the data out of the queue.
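The pattern described above can be tried without any network access. The following is a minimal sketch, not the post's actual code: the names `producer`, `consumer`, `run_demo` and `SENTINEL` are hypothetical, and the "download" is replaced by a made-up string payload. It only illustrates several producer threads feeding one consumer thread through queue.Queue.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker that tells the consumer to stop


def producer(worker_id, q):
    # Stand-in for a download: put a (worker_id, payload) tuple into the queue.
    q.put((worker_id, f"payload-{worker_id}"))


def consumer(q, results):
    # q.get() blocks until an item is available; stop when the sentinel arrives.
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        worker_id, payload = item
        results[worker_id] = payload.upper()  # stand-in for the conversion step


def run_demo(n_producers=3):
    q = queue.Queue()
    results = {}
    consumer_thread = threading.Thread(target=consumer, args=(q, results))
    producers = [threading.Thread(target=producer, args=(i, q))
                 for i in range(n_producers)]
    consumer_thread.start()
    for t in producers:
        t.start()
    for t in producers:
        t.join()         # every producer has finished putting its item
    q.put(SENTINEL)      # so the sentinel is the last item in the queue
    consumer_thread.join()
    return results


if __name__ == "__main__":
    print(run_demo())
```

Putting the sentinel into the queue only after all producers have been joined guarantees that the consumer sees every real item before it stops.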
Further notes:
The difference between queue.Queue and multiprocessing.Queue in Python 3:
import queue, then queue.Queue(): for communication between threads; it cannot be used for communication between processes.
from multiprocessing import Queue: for passing data between processes.
import csv
from xml.etree.ElementTree import Element, ElementTree, tostring
import requests
from io import StringIO, BytesIO
from xml_pretty import pretty
from threading import Thread
from collections import deque
import queue
# A double-ended queue could be created to communicate between threads
# q = deque()
'''
Individual deque.append() and popleft() calls are thread-safe in CPython, but a deque
cannot block the consumer until data arrives, so coordinating threads through it
would need an extra lock or condition variable.
A simpler approach is the standard library's thread-safe queue.Queue, which implements
the locking internally and handles the synchronization for us.
So we replace the deque with a thread-safe Queue and pass it to each thread,
since frequent use of global variables is also poor design.
'''
# Download thread (producer)
class DownloadThread(Thread):
    def __init__(self, sid, queue1):
        Thread.__init__(self)  # call the constructor of the parent class
        self.sid = sid
        self.url = 'http://quotes.money.163.com/service/chddata.html?' \
                   'code=1%s&start=20210101&end=20220505'
        # Replace %s with the stock ID zero-padded to six digits, e.g. 000001
        self.url %= str(sid).rjust(6, '0')
        self.queue1 = queue1

    # Download the CSV data; returns None if the response is not OK
    def download(self, url):
        response = requests.get(url, timeout=3)
        if response.ok:
            # print(response.content)
            # StringIO is an in-memory object that supports file operations
            return StringIO(response.content.decode(encoding='gb18030'))

    # Thread entry point
    def run(self):
        print('Download', self.sid)
        # Download the stock data
        data = self.download(self.url)
        # Pass the downloaded data to the conversion thread as (sid, data)
        # q.append((self.sid, data))
        self.queue1.put((self.sid, data))
# Conversion thread (consumer)
class ConvertThread(Thread):
    def __init__(self, queue1):
        Thread.__init__(self)
        self.queue1 = queue1

    # Convert CSV data to XML
    def csv_to_xml(self, scsv, fxml):
        reader = csv.reader(scsv)
        headers = next(reader)
        # print(headers)
        # Avoid Chinese text in the XML tag names; use the following labels instead
        headers = ['Date', 'Code', 'Name', 'Close',
                   'High', 'Low', 'Open', 'Before',
                   'Price', 'Rise', 'Rate', 'Volume',
                   'Amount', 'Total', 'Value', 'Number']
        # print(headers)
        # headers = map(lambda h: h.replace(' ', ''), headers)
        # print(list(headers))
        root = Element('Data')
        # print(tostring(root))
        for row in reader:
            e_row = Element('Row')
            root.append(e_row)
            for tag, text in zip(headers, row):
                e = Element(tag)
                e.text = text
                e_row.append(e)
        pretty(root)
        et = ElementTree(root)
        et.write(fxml)

    def run(self):
        # Loop, because the single consumer thread has to handle every item
        while True:
            # Get an item from the queue: sid is the stock ID, data is the CSV data
            sid, data = self.queue1.get()
            print('Convert', sid)
            if sid == -1:  # a sid of -1 is the signal to exit the loop
                break
            if data:
                # Convert the received data to XML (the resource/ directory must exist)
                f_name = '1' + str(sid).rjust(6, '0') + '.xml'
                with open('resource/' + f_name, 'wb') as wf:
                    self.csv_to_xml(data, wf)
if __name__ == '__main__':
    q = queue.Queue()
    # Create multiple download threads
    dThreads = [DownloadThread(i, q) for i in range(1, 11)]
    # Create one conversion thread
    cThread = ConvertThread(q)
    # Start all threads
    for t in dThreads:
        t.start()
    cThread.start()
    # Wait for all download threads to finish
    for t in dThreads:
        t.join()
    # Signal ConvertThread.run() to exit its loop
    q.put((-1, None))
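The conversion step can also be tried on its own, without the network or the threads. The sketch below makes several assumptions: the function name `csv_to_xml_bytes` is hypothetical, the sample rows are made up rather than real market data, and the standard library's `xml.etree.ElementTree.indent` (available from Python 3.9) is swapped in for the `pretty` helper used in the listing above.

```python
import csv
import io
import xml.etree.ElementTree as ET


def csv_to_xml_bytes(csv_text, tags):
    """Convert CSV text (first row is a header) into indented XML bytes."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the original header row; `tags` replaces it
    root = ET.Element("Data")
    for row in reader:
        e_row = ET.SubElement(root, "Row")
        for tag, text in zip(tags, row):
            ET.SubElement(e_row, tag).text = text
    ET.indent(root)  # standard-library pretty-printing (Python 3.9+)
    buf = io.BytesIO()
    ET.ElementTree(root).write(buf, encoding="utf-8")
    return buf.getvalue()


# Made-up sample rows, not real market data.
sample = "d,c,n\n2021-01-04,000001,Demo\n2021-01-05,000001,Demo\n"
xml_bytes = csv_to_xml_bytes(sample, ["Date", "Code", "Name"])
print(xml_bytes.decode("utf-8"))
```

Writing into a BytesIO object mirrors the file opened in 'wb' mode in ConvertThread.run(), so the same function body would work with a real file object.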