
【Python】Scraping Website Data Asynchronously with Python【Detailed Walkthrough】


Project overview

  • askWeb/index.py: the site-scraping class
  • database/index.py: the database class (database wrapper)
  • utils/index.py: utility functions
  • main.py: the project entry point

1. main.py: the entry point

from askWeb.index import AskUrl
import datetime
# The sites to scrape are listed in urlArr below
from database import database
from utils import utils
import asyncio, aiohttp

# Scrape one site
# urlName: display name of the site
# url: URL to fetch
# requestType: HTTP method
# webType: site type (selects the matching parser)
async def getUrlContent(urlName, url, requestType="get", webType=1):
    # Start time
    startTime = datetime.datetime.now()
    print("Scraping " + urlName + ", timer started...........")
    await AskUrl(url).handleGetYourContent(requestType, webType)
    # End time
    endTime = datetime.datetime.now()
    lastTime = str((endTime - startTime).seconds)
    print("Scraping " + urlName + " took " + lastTime + "s in total")

# Base URL of the "Today's Hot Lists" aggregator
# "https://tophub.today"
if __name__ == "__main__":
    startTime = datetime.datetime.now()
    urlArr = [
        {
            "urlName": "Bilibili hot list",
            "url": "https://www.bilibili.com/v/popular/rank/all",
            "requestType": "get",
            "type": 1,
        },
        {
            "urlName": "Weibo hot list",
            "url": "https://tophub.today/n/KqndgxeLl9",
            "requestType": "get",
            "type": 6,
        },
        {
            "urlName": "WeChat hot list",
            "url": "https://tophub.today/n/WnBe01o371",
            "requestType": "get",
            "type": 5,
        },
        {
            "urlName": "Douyin video list",
            # note: this URL is identical to the WeChat entry above in the original source
            "url": "https://tophub.today/n/WnBe01o371",
            "requestType": "get",
            "type": 4,
        },
        {
            "urlName": "CSDN overall hot list",
            "url": "https://blog.csdn.net/phoenix/web/blog/hot-rank",
            "requestType": "get",
            "type": 3,
        },
        {
            "urlName": "IT news hot list",
            "url": "https://it.ithome.com/",
            "requestType": "get",
            "type": 2,
        },
    ]
    # Task list
    task_list = []
    # enumerate() turns an iterable into an indexed sequence
    for key, value in enumerate(urlArr):
        future = asyncio.ensure_future(
            getUrlContent(value["urlName"], value["url"], value["requestType"], value["type"]))
        task_list.append(future)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(task_list))
    # loop.close() is commented out: closing the loop while tasks were still
    # running raised an error, so I simply removed the call. If you have a
    # better approach, leave a comment.
    # loop.close()
    # End time
    endTime = datetime.datetime.now()
    # Elapsed time
    lastTime = str((endTime - startTime).seconds)
    print("Total time: " + lastTime)

2. askWeb/index.py: the site-scraping class

To write the results to your database, uncomment the handleInsert calls and change the fields to match your own schema.

import ssl
import time
from urllib.error import HTTPError
import aiohttp
from bs4 import BeautifulSoup  # HTML parsing / data extraction
import requests
import urllib3
from urllib import request
from http import cookiejar
from utils import utils
import database.database
import random
import json

urllib3.disable_warnings()
# Skip TLS certificate verification for urllib
ssl._create_default_https_context = ssl._create_unverified_context
# Website access class
class AskUrl():
    # Initialiser
    def __init__(self, url):
        self.dealSql = database.database.ConnectSql()
        self.url = url

    # Return a random User-Agent string
    def handleRandomUserAgent(self):
        allUserAgent = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.67 Safari/537.36 OPR/87.0.4390.45 (Edition Campaign 34)",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.42 Safari/537.36 Edg/103.0.1264.21",
        ]
        # random.choice(allUserAgent) would be equivalent
        return allUserAgent[random.randint(0, 3)]
    # Harvest the site's cookies
    def getWebCookie(self):
        # A CookieJar instance stores the cookies
        cookie = cookiejar.CookieJar()
        # Build a cookie handler with urllib's HTTPCookieProcessor
        handler = request.HTTPCookieProcessor(cookie)
        # Build an opener from the handler
        opener = request.build_opener(handler)
        print(self.url)
        # Open the page
        try:
            opener.open(self.url)
        except HTTPError as e:
            print("Error while harvesting cookies: %s" % e)
            return ""
        cookieStr = ""
        for item in cookie:
            cookieStr = cookieStr + item.name + "=" + item.value + ";"
        return cookieStr[:-1]
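    # Usage sketch (hypothetical URL, illustration only): the return value is a
    # header-ready cookie string, e.g.
    #     AskUrl("https://example.com").getWebCookie()  ->  "a=1;b=2"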
    # Asynchronous coroutine
    # Fetch the page
    async def visitWeb(self, method, param="", header="", session=""):
        # Silence request warnings
        requests.packages.urllib3.disable_warnings()
        # Defined but unused: aiohttp does not take requests-style proxies
        proxies = {
            "http": None,
            "https": None,
        }
        if header == "":
            cookie = self.getWebCookie()
            # print(cookie, "cookiecookiecookie")
            header = {
                "Cache-Control": "no-cache",
                "Cookie": cookie,
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
                          "application/signed-exchange;v=b3;q=0.9",
                'User-Agent': self.handleRandomUserAgent(),
            }
        if method == 'get':
            # query-string arguments belong in params= for GET (the original passed data=)
            async with session.get(self.url, params=param and param or {}, headers=header) as resp:
                page_text = await resp.read()  # read the full response body
        else:
            async with session.post(self.url, data=param and param or "", headers=header) as resp:
                page_text = await resp.read()
        # Decode to avoid mojibake with Chinese text; the result must be
        # assigned (the original discarded it)
        page_text = page_text.decode("utf-8", "ignore")
        # Instantiate a BeautifulSoup object and load the page source into it
        soup = BeautifulSoup(page_text, 'html.parser')
        # print(soup)
        return soup
    # Fetch the page title, logo, description and keywords
    # (not used in the main flow; visitWeb is a coroutine, so this must be
    # async and be given an aiohttp session)
    async def handleGetWebTitleAndLogo(self, session):
        # 1. Instantiate a BeautifulSoup object with the page source
        soup = await self.visitWeb("get", session=session)
        try:
            webTitle = soup.select("title")  # site title
            webTitle = webTitle and webTitle[0].text or ""
        except HTTPError as e:
            webTitle = ""
            print("Error reading site title: %s" % e)
        try:
            webLogo = soup.select("link[type='image/x-icon']")  # site logo
            webLogo = webLogo and webLogo[0].get("href") or ""
        except HTTPError as e:
            webLogo = ""
            print("Error reading site logo: %s" % e)
        try:
            webDescription = soup.select("meta[name='description']")  # site description
            webDescription = webDescription and webDescription[0].get("content") or ""
        except HTTPError as e:
            webDescription = ""
            print("Error reading site description: %s" % e)
        try:
            webKeywords = soup.select("meta[name='keywords']")  # site keywords
            webKeywords = webKeywords and (
                webKeywords[0].get("content") is None and "" or webKeywords[0].get("content")) or ""
        except HTTPError as e:
            webKeywords = ""
            print("Error reading site keywords: %s" % e)
        return {"webTitle": webTitle, "webLogo": webLogo,
                "webDescription": webDescription, "webKeywords": webKeywords}
    # Extract the data you want; filters the site content
    # type: which site parser to use
    async def handleGetYourContent(self, requestType="get", type=1, params=""):
        """Sending HTTP requests with aiohttp:
        1. create a ClientSession object
        2. send get/post/put requests through the session
        3. await the result asynchronously (the coroutine is suspended)
        """
        async with aiohttp.ClientSession() as session:
            # 1. Instantiate a BeautifulSoup object with the page source
            soup = await self.visitWeb(requestType, params, session=session)
            if type == 1:
                await self.handleGrabBliWeb(soup)
            elif type == 2:
                await self.handleItHomeWeb(soup)
            elif type == 3:
                await self.handleGetCsdnWeb(soup)
            elif type == 4:
                await self.handleGetDyVideoWeb(soup)
            elif type == 5:
                await self.handleGetWeChatWeb(soup)
            elif type == 6:
                await self.handleGetWeiBoWeb(soup)
            print('Done')
    # Weibo hot-search list
    async def handleGetWeiBoWeb(self, soup, num=1, page=0):
        # 2. Pull the rows out of the table by tag
        li_list = soup.select(".table")[0].select("tbody>tr")
        # Loop over the rows
        for item in li_list:
            href = item.select(".al a")[0].get("href")  # link
            title = item.select(".al a")[0].text  # title
            hotData = item.select("td")[2].text  # popularity
            # res = self.dealSql.handleInsert(table="g_hot_list", title=title, url=href, hot_num=hotData, type=5,
            #                                 add_time=utils.FormatDate(), update_time=utils.FormatDate())
            res = True
            if res:
                data = "Item %s inserted: title: %s popularity: %s link: %s" % (num, title, hotData, href)
                print(data)
            else:
                # the original format string was missing its % num argument
                data = "Item %s failed to insert" % num
                print(data)
            # time.sleep(1)
            num += 1
    # WeChat hot-article list
    async def handleGetWeChatWeb(self, soup, num=1):
        # 2. Pull the rows out of the table by tag
        li_list = soup.select(".table")[0].select("tbody>tr")
        # Loop over the rows
        for item in li_list:
            href = item.select(".al a")[0].get("href")  # link
            title = item.select(".al a")[0].text  # title
            hotData = item.select("td")[2].text
            hotData = hotData.split(" ")[0]  # popularity
            # res = self.dealSql.handleInsert(table="g_hot_list", title=title, url=href, hot_num=hotData, type=5,
            #                                 add_time=utils.FormatDate(), update_time=utils.FormatDate())
            res = True
            if res:
                data = "Item %s inserted: title: %s popularity: %s link: %s" % (num, title, hotData, href)
                print(data)
            else:
                data = "Item %s failed to insert" % num
                print(data)
            # time.sleep(1)
            num += 1
    # Douyin short-video list
    async def handleGetDyVideoWeb(self, soup, num=1):
        # 2. Pull the rows out of the table by tag
        li_list = soup.select(".table")[0].select("tbody>tr")
        # Loop over the rows
        for item in li_list:
            href = item.select(".al a")[0].get("href")  # link
            title = item.select(".al a")[0].text  # title
            hotData = item.select("td")[2].text  # popularity
            # res = self.dealSql.handleInsert(table="g_hot_list", title=title, url=href, hot_num=hotData, type=4,
            #                                 add_time=utils.FormatDate(), update_time=utils.FormatDate())
            res = True
            if res:
                data = "Item %s inserted: title: %s popularity: %s link: %s" % (num, title, hotData, href)
                print(data)
            else:
                data = "Item %s failed to insert" % num
                print(data)
            # time.sleep(1)
            num += 1
    # CSDN article hot list
    async def handleGetCsdnWeb(self, soup, num=1, page=0):
        # 2. This endpoint returns JSON rather than HTML
        # Parse the JSON string into a list
        li_list = json.loads(str(soup), strict=False)["data"]
        # Loop over the entries
        for item in li_list:
            href = item["articleDetailUrl"]  # link
            title = item["articleTitle"]  # title
            hotData = item["hotRankScore"]  # popularity score
            # res = self.dealSql.handleInsert(table="g_hot_list", title=title, url=href, hot_num=hotData, type=3,
            #                                 add_time=utils.FormatDate(), update_time=utils.FormatDate())
            res = True
            if res:
                data = "Item %s inserted: title: %s popularity: %s link: %s" % (num, title, hotData, href)
                print(data)
            else:
                data = "Item %s failed to insert" % num
                print(data)
            # time.sleep(1)
            num += 1
        # The API is paginated; recurse to fetch the next page, up to page 4
        if page < 4:
            curPage = page + 1
            async with aiohttp.ClientSession() as session:
                soup = await self.visitWeb("get", {"page": curPage, "pageSize": 25, "type": ""}, session=session)
                return await self.handleGetCsdnWeb(soup, num, curPage)
    # Bilibili hot list
    async def handleGrabBliWeb(self, soup, num=1):
        # 2. Pull the entries out of the list by tag
        li_list = soup.select(".rank-list-wrap>ul>li")
        # Loop over the entries
        for item in li_list:
            href = item.select(".info a")[0].get("href")  # link
            title = item.select(".info a")[0].text  # title
            # "".join(...split()) strips all whitespace
            hotData = "".join(item.select(".info .detail-state .data-box")[0].text.split())  # play count
            # Drop the protocol prefix if present
            if href.find("//", 0) >= 0:
                href = href.split("//")[1]
            # res = self.dealSql.handleInsert(table="g_hot_list", title=title, url=href, hot_num=hotData, type=1,
            #                                 add_time=utils.FormatDate(), update_time=utils.FormatDate())
            res = True
            if res:
                data = "Item %s inserted: title: %s views: %s link: %s" % (num, title, hotData, href)
                print(data)
            else:
                data = "Item %s failed to insert" % num
                print(data)
            # time.sleep(1)
            num += 1
    # ITHome news hot list
    async def handleItHomeWeb(self, soup, num=1, nexPage=1):
        # First page
        if nexPage == 1:
            # 2. Pull the entries out of the list by tag
            li_list = soup.select(".fl>ul>li")
        # Second page onwards (the AJAX response contains bare <li> elements)
        else:
            li_list = soup.select("li")
        # Loop over the entries
        for item in li_list:
            href = item.select("a[class='img']")[0].get("href")  # link
            title = item.select("a[class='img']")[0].select("img")[0].get("alt")  # title
            # res = self.dealSql.handleInsert(table="g_hot_list", title=title, url=href, type=2,
            #                                 add_time=utils.FormatDate(), update_time=utils.FormatDate())
            res = True
            if res:
                data = "Item %s inserted: title: %s link: %s" % (num, title, href)
                print(data)
            else:
                data = "Item %s failed to insert" % num
                print(data)
            # time.sleep(1)
            num += 1
        # Fetch the second page through the AJAX endpoint
        if nexPage == 1:
            nexPageUrl = "https://it.ithome.com/category/domainpage"
            header = {
                "Cache-Control": "no-cache",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            }
            param = {"domain": "it", "subdomain": "", "ot": int(time.time()) * 1000}
            async with aiohttp.ClientSession() as session:
                resData = await AskUrl(nexPageUrl).visitWeb("post", param=param, header=header, session=session)
                # strict=False is required when parsing the JSON, otherwise it raises
                soup = BeautifulSoup(json.loads(str(resData), strict=False)["content"]["html"], 'html.parser')
                return await self.handleItHomeWeb(soup, num, nexPage + 1)
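The if/elif chain in handleGetYourContent has to grow by one branch for every new site. A possible refactor (a sketch, not part of the original project) maps each numeric type code to its handler in a dict, so registering a new site is a single entry:

    # Sketch: table-driven dispatch for handleGetYourContent (hypothetical refactor)
    async def handleGetYourContent(self, requestType="get", type=1, params=""):
        handlers = {
            1: self.handleGrabBliWeb,
            2: self.handleItHomeWeb,
            3: self.handleGetCsdnWeb,
            4: self.handleGetDyVideoWeb,
            5: self.handleGetWeChatWeb,
            6: self.handleGetWeiBoWeb,
        }
        async with aiohttp.ClientSession() as session:
            soup = await self.visitWeb(requestType, params, session=session)
            handler = handlers.get(type)
            if handler:
                await handler(soup)
            print('Done')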

3. utils/index.py: utility functions

import time

# Timestamp to date-string conversion
def FormatDate(timeNow="", fmt="%Y-%m-%d %H:%M:%S"):
    if timeNow == "":
        # Default to the current time
        timeNow = int(time.time())
    # Convert to local time
    time_local = time.localtime(timeNow)
    # Render in the requested format (e.g. 2016-05-09 18:59:20)
    dt = time.strftime(fmt, time_local)
    return dt

# Convert a date string like 2022-1-1 to a timestamp
def time_to_str(val):
    return int(time.mktime(time.strptime(val, "%Y-%m-%d")))

# Current timestamp
def cur_time_to_str():
    return int(time.mktime(time.localtime(time.time())))
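A quick sanity check of these helpers (a hypothetical session; the exact outputs depend on the current time and your timezone):

print(FormatDate())               # current time, e.g. "2022-06-20 18:59:20"
print(FormatDate(1651228800))     # formats the given timestamp instead
print(time_to_str("2022-01-01"))  # e.g. 1640966400 in UTC+8
print(cur_time_to_str())          # current Unix timestamp as an int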

4. database/index.py: the database class (database wrapper)

Fill in your own host IP, MySQL account, and password.


import pymysql

# Database connection class
class ConnectSql():
    # Member attributes
    # MySQL host IP
    __host = "xxxx"
    # MySQL account
    __user = "xxxx"
    # MySQL password
    __passwd = "xxxxx"
    # MySQL port
    __port = 3306
    # Database name
    __db = "xxxx"
    # Character encoding
    __charset = "utf8"
    cur = ""

    # Constructor / class initialisation
    def __init__(self):
        try:
            # Connect to the database
            self.conn = pymysql.connect(host=self.__host, user=self.__user, password=self.__passwd,
                                        port=self.__port, database=self.__db, charset=self.__charset)
            self.cur = self.conn.cursor()  # create a cursor object
        except pymysql.Error as e:
            print("Connection error: %s" % e)

    # Destructor: the object is being freed
    def __del__(self):
        print("destroyed")

    # Close the database
    def closedatabase(self):
        # Close the cursor and connection if they are open; otherwise do nothing
        if self.conn and self.cur:
            self.cur.close()
            self.conn.close()
        return True

    # Run execute(); returns True if any rows were affected
    def handleExcute(self, sql):
        try:
            self.cur.execute(sql)  # run the SQL statement
            self.conn.commit()  # commit to the database
            count = self.cur.rowcount
            if count > 0:
                return True
            else:
                return False
        except pymysql.Error as e:
            # the original caught TypeError and printed the exception class;
            # catching pymysql.Error and printing the instance is what was intended
            print("Error: %s" % e)
            # the statement failed, so roll the transaction back
            self.conn.rollback()
            self.closedatabase()
            return False

    # Run a raw SQL statement
    def dealMysql(self, dataSql):
        self.handleExcute(dataSql)

    # Insert a row
    def handleInsert(self, **params):
        table = "table" in params and params["table"] or ""
        sql = "INSERT INTO %s(" % table
        del params["table"]
        fields = ""
        values = ""
        for k, v in params.items():
            fields += "%s," % k
            # Quote values according to their type
            if type(v) == type("test"):
                values += "'%s'," % v
            elif type(v) == type(1):
                values += "%s," % v
        fields = fields.rstrip(',')
        values = values.rstrip(',')
        sql = sql + fields + ") VALUES (" + values + ")"
        print(sql, "handleInsert")
        return self.handleExcute(sql)

    # Delete rows
    def handleDel(self, **params):
        table = "table" in params and params["table"] or ""
        where = "where" in params and params["where"] or ""
        sql = "DELETE FROM %s WHERE %s " % (table, where)
        print(sql, "handleDel")
        return self.handleExcute(sql)

    # Update rows
    def handleUpdate(self, **params):
        table = "table" in params and params["table"] or ""
        where = "where" in params and params["where"] or ""
        params.pop("table")
        params.pop("where")
        sql = "UPDATE %s SET " % table
        fields = ""
        for k, v in params.items():
            # Quote values according to their type
            if type(v) == type("test"):
                fields += "%s='%s'," % (k, v)
            elif type(v) == type(1):
                fields += "%s=%s," % (k, v)
        # join the assignments with commas and keep a space before WHERE
        # (the original produced unseparated assignments and "...WHERE")
        sql += fields.rstrip(',') + " WHERE %s" % where
        print(sql, "handleUpdate")
        return self.handleExcute(sql)

    # Query several rows
    def handleFindAllData(self, **params):
        # table, field, where, order
        table = "table" in params and params["table"] or ""
        where = "where" in params and "WHERE " + params["where"] or ""
        field = "field" in params and params["field"] or "*"
        order = "order" in params and "ORDER BY " + params["order"] or ""
        # the original had five %s placeholders for four values
        sql = "SELECT %s FROM %s %s %s" % (field, table, where, order)
        print(sql, "handleFindAllData")
        if self.handleExcute(sql):
            return self.cur.fetchall()
        return ()

    # Query a single row
    def handleFindOneData(self, **params):
        # table, field, where, order
        table = "table" in params and params["table"] or ""
        where = "where" in params and "WHERE " + params["where"] or ""
        field = "field" in params and params["field"] or "*"
        order = "order" in params and "ORDER BY " + params["order"] or ""
        sql = "SELECT %s FROM %s %s %s LIMIT 1" % (field, table, where, order)
        print(sql, "handleFindOneData")
        if self.handleExcute(sql):
            return self.cur.fetchone()
        return None
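One caveat on this wrapper: handleInsert and friends build SQL by string interpolation, so a title containing a quote will break the statement, and hostile input could inject SQL. pymysql supports parameterized queries natively; below is a sketch of a safer insert (handleInsertSafe is a hypothetical method name, shown for illustration), which could be called with the same table and columns as the commented-out handleInsert calls above:

    # Sketch: parameterized insert; the driver handles quoting and escaping
    def handleInsertSafe(self, table, **fields):
        # table and column names cannot be parameterized, so they must still
        # come from trusted code, never from user input
        columns = ", ".join(fields.keys())
        placeholders = ", ".join(["%s"] * len(fields))
        sql = "INSERT INTO %s (%s) VALUES (%s)" % (table, columns, placeholders)
        try:
            self.cur.execute(sql, tuple(fields.values()))
            self.conn.commit()
            return self.cur.rowcount > 0
        except pymysql.Error as e:
            self.conn.rollback()
            print("Insert error: %s" % e)
            return False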

Project source download: https://download.csdn.net/download/qq_36977923/85762109?spm=1001.2014.3001.5501

Working through all these pitfalls wasn't easy, so a little support is appreciated.

Personal homepage: 沉默小管

Personal website: 沉默小管

Tech exchange QQ group: 837051545

Like: your recognition is what drives me to write!

Favorite: your appreciation points the way for my efforts!

Comment: your feedback is how I improve!

If anything is unclear, leave a comment and I should see it and reply.
If there are mistakes, please point them out.

