目錄
任務簡介
解決步驟
代碼實現
總結
大家好 我是政胤 今天教大家爬取金融期貨數據
首先,客戶原需求是獲取https://hq.smm.cn/copper網站上的價格數據(注:獲取的是網站上的公開數據),如下圖所示:
如果以該網站為目標,則需要解決的問題是“登錄”用戶,再將價格解析為表格進行輸出即可。但是,實際上客戶核心目標是獲取“滬銅CU2206”的歷史價格,雖然該網站也有提供數據,但是需要“會員”才可以訪問,而會員需要氪金......
數據的價值!!!
鑒於,客戶需求僅僅是“滬銅CU2206”一項期貨的歷史價格,氪金會員性價比不高,因此,實際的任務目標變為如何獲取的歷史價格,目標變為全網有公開提供數據的網址。而最終解決該問題,是求助於萬能的百度^_^。找到了合適的網站,且獲取數據的難度也幾乎降到了最低難度。
百度搜索資源:這個步驟是整個任務完整的最難點(實際不難),但這裡賣個關子,全文不公布最終找到的網站,大家試試看能否搜索到,以及花費多少時間^_^。
解析網站的請求,最終找到的網站經解析後,發現獲取數據是通過get的方式提交參數。而請求的參數如下:/price?starttime=1638545822&endtime=1654357022&classid=48
,一看就知是開始時間、結束時間的時間戳,以及商品id。再解析headers,居然連cookie都不需要,說明沒有反爬!沒有反爬!沒有反爬!不得不說運氣爆棚!
解析響應數據:由於響應數據是規整的json格式數據,使用pandas的read_json直接能夠獲取dataframe格式的數據,該步驟也並無難度。
鑒於網站沒有反爬,且參數簡單,實際上的任務主要是規劃一下如何設計增量更新數據信息的流程,具體代碼如下:
# @author: zheng yin
# @contact: [email protected]
"""
1. 這是爬取滬銅的程序
2. 該網站滬銅當月的數據實際請求地址是:'(實際網址)/price?starttime={starttime}&endtime={endtime}&classid={classid}'
2.1. starttime為起始日期的時間戳
2.2. endtime為結束日期的時間戳
2.3. classid為查詢商品的id
3. 該網址可以直接發起請求獲取數據
我是政胤 期待你的關注
"""
import time
from datetime import datetime
import pathlib as pl
import requests
import pandas as pd
class Spider:
"""
爬取網站數據的爬蟲對象
"""
def __init__(self, starttime: str = None, endtime: str = None, classid: int = 48):
"""
初始化對象屬性
:param starttime: 數據的起始日期,文本日期格式,示例 2022-1-1
:param endtime: 數據的結束日期,文本日期格式,示例 2022-1-1
:param classid: 商品id,默認48
"""
self.classid = classid # 商品id
self.data = pd.DataFrame() # 初始化空dataframe
self.data_file = pl.Path('./data/hutong.xlsx') # 爬取的數據存儲文件
# 列名字典
self.cols_dict = {
'createtime': '日期',
'classid': '商品',
'start': '開盤',
'end': '收盤',
'min': '最低',
'max': '最高',
'move': '漲跌',
'move_percent': '漲跌百分比'
}
# 商品id字典
self.classid_dict = {
48: 'CU2206'
}
# 獲取爬取的開始時間與結束時間
self.starttime, self.endtime = self.make_starttime_endtime(starttime=starttime, endtime=endtime)
# 初始化需要爬取的url
self.url = '(實際地址)/price?starttime={starttime}&endtime={endtime}&classid={classid}'
# 初始化headers
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
def make_starttime_endtime(self, starttime: str, endtime: str):
"""
制作起始日期,邏輯如下;
1.如果有傳入日期,則根據傳入的日期,定義起始日期與結束日期
2.如果未傳入參數,則根據讀取到的歷史數據文件中的最大日期作為起始日期、以當前日期為結束日期
3.如果未讀取到歷史數據文件,或文件中的最大日期為空,則以2021-1-1作為起始日期,以當前日期作為結束日期
:param starttime: 數據的起始日期,文本日期格式,示例 2022-1-1
:param endtime: 數據的結束日期,文本日期格式,示例 2022-1-1
:return:
"""
self.read_data() # 讀取歷史爬取數據
now = datetime.now() # 獲取當前時間的時間戳整數部分
if endtime: # 如果非空
year, month, day = endtime.split('-')
endtime = int(now.replace(year=int(year), month=int(month), day=int(day)).timestamp())
else:
endtime = int(now.timestamp())
if starttime:
year, month, day = starttime.split('-')
starttime = int(now.replace(year=int(year), month=int(month), day=int(day)).timestamp())
else:
starttime = self.data['日期'].max()
if pd.isnull(starttime): # 如果開始日期是空值
starttime = int(now.replace(year=2021, month=1, day=1).timestamp())
else:
starttime = int(
now.replace(year=starttime.year, month=starttime.month, day=starttime.day).timestamp())
return starttime, endtime
def read_data(self):
"""
讀取歷史數據
:return:
"""
if self.data_file.is_file(): # 如果歷史數據文件存在
self.data = pd.read_excel(self.data_file)
self.data['日期'] = self.data['日期'].map(lambda x: x.date())
else: # 如果歷史數據文件不存在,那麼初始化一個只有列名的dataframe,
self.data = pd.DataFrame(self.cols_dict.values()).set_index(0).T
def crawl_data(self):
"""
爬取數據
:return:
"""
retry_times = 0
while retry_times < 10: # 重試10次
try:
res = requests.get(
self.url.format(starttime=self.starttime, endtime=self.endtime, classid=self.classid),
headers=self.headers, timeout=30)
if res.status_code == 200: # 如果返回狀態至為200,進行後續數據加工
data = pd.read_json(res.text) # json格式轉換為dataframe
data['createtime'] = data['createtime'].map(lambda x: datetime.fromtimestamp(x).date()) # 時間戳日期轉換為日期
data.rename(columns=self.cols_dict, inplace=True) # 重命名列
data = data[self.cols_dict.values()] # 截取需要的列
data['商品'] = self.classid_dict.get(self.classid, '未知商品,請維護classid_dict字典') # 轉換商品名
data.sort_values(by=['商品', '日期'], ascending=True, inplace=True) # 按日期升序排序
return data
else:
retry_times += 1
print(f'返回狀態碼是 {res.status_code},等待5秒後重新發起請求')
time.sleep(5)
except Exception as e:
retry_times += 1
print(f'請求發生錯誤,等待5秒後重新發起請求, 錯誤信息: {e}')
time.sleep(5)
print('發起10次請求均未能獲得數據')
return pd.DataFrame()
def concat_and_write_data(self, data: pd.DataFrame):
"""
合並數據,並將數據寫入文件
:param data: 傳入需要合並的數據
:return:
"""
self.data = pd.concat([self.data, data]) # 合並數據
self.data = self.data.drop_duplicates(['日期', '商品'], keep='last') # 數據根據商品名稱與日期進行去重,每次保留最新的記錄
if not self.data_file.parent.is_dir(): # 檢查數據文件的目錄是否存在,如不存在則創建新目錄
self.data_file.parent.mkdir()
self.data.to_excel(self.data_file, index=False, encoding='utf-8') # 輸出數據為excel格式
def run(self):
"""
運行程序
:return:
"""
data = spider.crawl_data() # 運行爬取
if len(data) > 0: # 如果爬取到的數據不為空
self.concat_and_write_data(data)
start = str(datetime.fromtimestamp(self.starttime))[:10]
end = str(datetime.fromtimestamp(self.endtime))[:10]
print(f'{start}至{end}數據爬取任務完成')
def pivot_data(self):
"""
將數據轉換為透視表式的格式
:return:
"""
data = self.data.copy()
data['年月'] = data['日期'].map(lambda x: f'{str(x)[:7]}')
data['日'] = data['日期'].map(lambda x: x.day)
data = data.pivot_table(values='收盤', index='日', columns='年月', aggfunc='sum')
data_mean = data.mean().to_frame().T
data_mean.index = ['平均值']
data = pd.concat([data, data_mean])
data.to_excel(self.data_file.parent.parent / 'data.xlsx', encoding='utf-8')
if __name__ == '__main__':
spider = Spider()
spider.run()
spider.pivot_data()
print(spider.data)
從技術角度來看,經過一步步解析,任務是簡單的,入門requests爬蟲以及入門pandas數據分析就可以完成(唯一的難度在找到合適的目標)。但是換個角度,從經濟價值來看,又是很有價值的,即節約了某網站高昂的年費(注:並不是說年費不值得,只是局限在需求僅僅是CU2206一項數據上時,性價比太低),同時又避免了人工操作的繁瑣,以及可能產生的錯誤。用很小的學習成本就能解決大大的問題
所以,還等什麼呢?開啟Python之路吧!
我是政胤 期待你的關注