
[Python engineering: a high-performance crawler]



How to use asynchronous operations in a crawler to achieve high-performance data crawling
First, let's go over the ways to make a crawler asynchronous:

  1. Multithreading and multiprocessing (not recommended):
    Disadvantage: threads or processes cannot be started without limit.
    Advantage: a separate thread or process can be started for each blocking operation, so the script runs asynchronously.
  2. Thread pool and process pool (use as appropriate):
    Disadvantage: the number of threads or processes in the pool has an upper limit.
    Advantage: the number of threads or processes is fixed, which reduces how often the system creates and destroys them and keeps system overhead low.
  3. Single thread + asynchronous coroutines (recommended):
    Some concepts and two keywords:
    ① event_loop (event loop): works like an infinite loop. Functions can be registered on the event loop, and the loop executes them when certain conditions are met.
    ② coroutine (coroutine object): a coroutine object can be registered on the event loop, which then calls it. A method defined with the async keyword is not executed immediately when called; the call returns a coroutine object instead.
    ③ task: a further wrapper around the coroutine object that also holds the task's various states.
    ④ future: represents a task that will run in the future or has not run yet; it is essentially no different from a task.
    ⑤ async: defines a coroutine.
    ⑥ await: suspends the coroutine at a blocking operation until that operation completes.
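
The concepts above can be sketched in a few lines. This is a minimal illustration, not code from the article: the `fetch` function and its URL are hypothetical, and `asyncio.sleep` stands in for real network IO.

```python
import asyncio


async def fetch(url):
    # 'fetch' and the URL are illustrative; asyncio.sleep stands in for network IO
    await asyncio.sleep(0.01)
    return 'response from ' + url


async def main():
    coro = fetch('https://example.com')   # calling an async function runs nothing yet
    is_coro = asyncio.iscoroutine(coro)   # the call only returns a coroutine object
    task = asyncio.ensure_future(coro)    # wrap the coroutine in a task and schedule it
    done_before = task.done()             # the task is still pending at this point
    result = await task                   # suspend main() until the task finishes
    return is_coro, done_before, task.done(), result


# asyncio.run (Python 3.7+) creates the event loop, runs main(), then closes the loop
summary = asyncio.run(main())
print(summary)
```

The task moves from pending to done only once the event loop has had a chance to run it, which is why `done_before` and the later `task.done()` differ.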



An await statement must be followed by an awaitable object. There are three kinds of awaitable objects: Python coroutines, Task, and Future. In general, application-level code does not need to create Future objects.
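
As a rough illustration of the three awaitable kinds, the sketch below awaits each one in turn. The `compute` coroutine is a made-up placeholder, and the bare Future is created only for demonstration, since application code rarely needs one.

```python
import asyncio


async def compute():
    return 1


async def main():
    loop = asyncio.get_running_loop()

    from_coroutine = await compute()          # 1. a coroutine object

    task = asyncio.ensure_future(compute())   # 2. a Task wrapping a coroutine
    from_task = await task

    future = loop.create_future()             # 3. a bare Future, normally left to libraries
    loop.call_soon(future.set_result, 3)      # some other code has to supply its result
    from_future = await future

    return from_coroutine, from_task, from_future


values = asyncio.run(main())
print(values)
```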


Coroutines are also called micro-threads or fibers. Threads are usually thought of as lightweight processes, so coroutines can likewise be understood as lightweight threads, that is, micro-threads.
What a coroutine does: while function A is executing, it can be suspended to run function B, and B can then be suspended to resume A (switching freely in both directions).
The suspension here is not a function call; it is somewhat like a CPU interrupt. The whole process looks like multithreading, but only one thread is running.
Advantages of coroutines
Very high execution efficiency: the switch is between subroutines (functions), not threads, and is controlled by the program itself, so there is no thread-switching overhead. Compared with multithreading, the more threads there would be, the more obvious the performance advantage of coroutines.
No locking mechanism is needed: because there is only one thread, there are no conflicts from writing a variable at the same time, and shared resources can be controlled by checking state instead of locking, so execution is much more efficient than with multithreading.
Coroutines handle IO-intensive programs efficiently, but they are not suited to CPU-intensive programs. To make full use of the CPU, combine multiple processes with coroutines.
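
The switching described above can be observed directly. In this sketch (the `worker` name and the shared `log` list are illustrative), two coroutines append to the same list without any lock, and the recorded thread id shows that both run in a single thread. In asyncio the switch happens only at an `await`, so the program decides where control is handed over.

```python
import asyncio
import threading

log = []


async def worker(name, steps):
    for i in range(steps):
        # Shared list, no lock: only one coroutine runs at any moment
        log.append((name, i, threading.get_ident()))
        await asyncio.sleep(0)   # voluntarily hand control back to the event loop


async def main():
    await asyncio.gather(worker('A', 3), worker('B', 3))


asyncio.run(main())
print([entry[:2] for entry in log])
```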


asyncio is a standard library module introduced in Python 3.4 that provides built-in support for asynchronous IO. The asyncio module provides tools for building concurrent applications with coroutines. It uses a single-threaded, single-process approach to concurrency: the parts of an application cooperate and switch tasks explicitly, usually at points where the program would block on I/O, such as waiting to read or write a file or waiting for a network request. asyncio also supports scheduling code to run at a specific time in the future, letting one coroutine wait for another to complete, handling system signals, and recognizing other events.
Using synchronous code in an asyncio program does not raise an error, but it defeats the purpose of concurrency. Take network requests: with requests, which only supports synchronous calls, no other request can be sent until the response to the current one has arrived. So when you want to fetch several pages concurrently, even under asyncio the program stays blocked after sending a request and cannot switch to another coroutine, and there is no speed-up. In that case you need a request library that supports asynchronous operation, such as aiohttp.
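
The effect can be measured without any network access. In the sketch below, `time.sleep` stands in for a synchronous request and `asyncio.sleep` for an asynchronous one; both function names are hypothetical, and the 0.1 second delay is arbitrary.

```python
import asyncio
import time


async def blocking_fetch(i):
    time.sleep(0.1)            # synchronous call: the whole event loop stalls here
    return i


async def async_fetch(i):
    await asyncio.sleep(0.1)   # awaitable call: other coroutines run while this one waits
    return i


async def timed(fetch):
    start = time.perf_counter()
    results = await asyncio.gather(*(fetch(i) for i in range(5)))
    return results, time.perf_counter() - start


blocking_results, blocking_time = asyncio.run(timed(blocking_fetch))
async_results, async_time = asyncio.run(timed(async_fetch))
print('blocking: %.2fs, async: %.2fs' % (blocking_time, async_time))
```

Both runs return the same results, but the blocking version takes roughly the sum of the five delays while the awaitable version takes roughly one delay.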

Single-threaded crawler

Here we use requests, a library for synchronous requests.

import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36'
}


def get_content(url):
    # Synchronous request: blocks until the response arrives
    response = requests.get(url=url, headers=headers)
    if response.status_code == 200:
        return response.content


def parse_content(content):
    print('Length of the response data:', len(content))


if __name__ == "__main__":
    urls = []  # The URL list is missing from the source; fill in the pages to crawl
    for url in urls:
        content = get_content(url)
        if content is not None:
            parse_content(content)


asyncio is Python's asynchronous IO library for writing concurrent programs. It suits workloads that block on IO and need a lot of concurrency, such as crawlers and file reads and writes.

asyncio was introduced in Python 3.4. Over several iterations its features and syntactic sugar have improved to varying degrees, which also means that asyncio usage differs between Python versions and can look a little messy. In the past I wrote code on a "whatever works" basis and took some detours, so here I sort out the usage of asyncio in Python 3.7+ and Python 3.6 so that it can be used better in the future.
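
One visible difference between the two versions is how a coroutine gets started. The sketch below runs the same coroutine both ways; the `await asyncio.sleep(0)` line is added only so that the coroutine actually yields once.

```python
import asyncio


async def request(url):
    await asyncio.sleep(0)
    return url


# Python 3.6 style: create and manage the event loop by hand
loop = asyncio.new_event_loop()
try:
    old_style = loop.run_until_complete(request('www.baidu.com'))
finally:
    loop.close()

# Python 3.7+ style: asyncio.run creates the loop, runs the coroutine, and closes the loop
new_style = asyncio.run(request('www.baidu.com'))

print(old_style, new_style)
```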

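import asyncio


async def request(url):
    return url


def callback_func(task):
    # The callback receives the finished task; result() is the coroutine's return value
    print(task.result())


c = request('www.baidu.com')
# new_event_loop() is used so the code also runs on versions where get_event_loop() no longer creates a loop
loop = asyncio.new_event_loop()
task = asyncio.ensure_future(c, loop=loop)
# Bind the callback function to the task object
task.add_done_callback(callback_func)
loop.run_until_complete(task)
loop.close()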

Single-threaded asynchronous coroutine implementation

On top of requests, use the asynchronous IO library asyncio

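import requests
import asyncio
import time

start = time.time()
urls = []  # The URL list is missing from the source; fill in the pages to crawl


async def get_page(url):
    print('Downloading', url)
    # requests.get is synchronous, so it blocks the event loop and no other task runs meanwhile
    response = requests.get(url)
    print('Download finished', response.text)


loop = asyncio.new_event_loop()
tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c, loop=loop)
    tasks.append(task)
if tasks:
    # asyncio.wait raises ValueError on an empty collection, hence the guard
    loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Total time:', time.time() - start)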

Crawling data with a thread pool

from multiprocessing.dummy import Pool
import time


def func(msg):
    print('msg:', msg)


# Three threads
pool = Pool(processes=3)
for i in range(1, 5):
    msg = 'hello %d' % (i)
    # Non-blocking: submit the call and continue immediately
    pool.apply_async(func, (msg,))
    # Blocking: apply() calls the function and waits for it, taking positional arguments as a tuple
    # pool.apply(func, (msg,))
    # Non-blocking; note that the arguments are passed differently from apply (an iterable of items)
    # pool.imap(func, [msg, ])
    # Blocking
    # pool.map(func, [msg, ])
# Stop accepting new work, then wait for the submitted calls to finish
pool.close()
pool.join()
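
To collect return values from the pool rather than only printing, `map` is often the simplest option. The following is a sketch under assumptions: `download` and the page names are hypothetical, and `time.sleep` stands in for a blocking request.

```python
import time
from multiprocessing.dummy import Pool   # thread pool with the multiprocessing Pool API


def download(page):
    # 'download' and the page names are illustrative; time.sleep stands in for a blocking request
    time.sleep(0.1)
    return page, len(page)


pages = ['page-%d' % i for i in range(6)]

start = time.perf_counter()
with Pool(processes=3) as pool:
    results = pool.map(download, pages)   # blocks until every page is done, keeps input order
elapsed = time.perf_counter() - start

print(results, '%.2fs' % elapsed)
```

With three worker threads, the six simulated downloads should finish in about two rounds instead of six, which is the fixed-size trade-off the pool approach makes.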

Here is a demonstration of multi-task asynchronous coroutines with aiohttp

aiohttp is a library built on top of asyncio that supports both HTTP and WebSocket, on both the client and the server side.

import asyncio
import logging
import time
import json
from threading import Thread
from aiohttp import ClientSession, ClientTimeout, TCPConnector, BasicAuth
import base64
from urllib.parse import unquote, quote

# Default request headers
HEADERS = {
    'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
    # "User-Agent": "curl/7.x/line",
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
}
# Default timeout in seconds (the value is missing from the source; 15 is an assumed placeholder)
TIMEOUT = 15


def start_loop(loop):
    # Run the event loop forever in the thread that calls this function
    asyncio.set_event_loop(loop)
    loop.run_forever()
class AioCrawl:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.proxyServer = None
        # Start the event loop in a background thread
        self.event_loop = asyncio.new_event_loop()
        self.t = Thread(target=start_loop, args=(self.event_loop,), daemon=True)
        self.t.start()
        self.concurrent = 0  # Number of requests currently in flight

    async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None, proxy=None):
        """Fetch coroutine.

        :param url: str
        :param method: 'GET' or 'POST'
        :param headers: dict()
        :param timeout: int
        :param cookies:
        :param data: dict()
        :param proxy: str
        :return: (status, content)
        """
        method = 'POST' if method.upper() == 'POST' else 'GET'
        headers = headers if headers else HEADERS
        timeout = ClientTimeout(total=timeout)
        cookies = cookies if cookies else None
        data = data if data and isinstance(data, dict) else {}
        proxy = proxy if proxy else self.proxyServer
        tcp_connector = TCPConnector(limit=64)  # At most 64 simultaneous connections
        try:
            async with ClientSession(headers=headers, timeout=timeout, cookies=cookies, connector=tcp_connector) as session:
                if method == 'GET':
                    async with session.get(url, proxy=proxy) as response:
                        content = await response.read()
                        return response.status, content
                else:
                    async with session.post(url, data=data, proxy=proxy) as response:
                        content = await response.read()
                        return response.status, content
        except Exception as e:
            raise e

    def callback(self, future):
        """Callback function.

        1. Process and convert to a Result object
        2. Write to the database
        """
        msg = str(future.exception()) if future.exception() else 'success'
        code = 1 if msg == 'success' else 0
        status = future.result()[0] if code == 1 else None
        data = future.result()[1] if code == 1 else b''  # Empty bytes on failure
        data_len = len(data) if data else 0
        if code == 0 or (status is not None and status != 200):  # Log failures and non-200 responses
            self.logger.warning('<url="{}", code={}, msg="{}", status={}, data(len):{}>'.format(
                future.url, code, msg, status, data_len))
        self.concurrent -= 1  # Concurrency count - 1
        return data

    def add_tasks(self, tasks, method='GET', data=None, headers=None):
        """Add tasks.

        :param tasks: list of URLs
        :return: list of response bodies as str
        """
        resultList = []
        futures = []
        headers = headers if headers else HEADERS
        for task in tasks:
            # asyncio.run_coroutine_threadsafe takes a coroutine object and an event loop object
            future = asyncio.run_coroutine_threadsafe(self.fetch(task, method=method, data=data, headers=headers), self.event_loop)
            future.url = task  # Read by callback() when logging
            future.add_done_callback(self.callback)  # Attach a callback to the future object
            self.concurrent += 1  # Concurrency count + 1
            futures.append(future)
        # Collect results only after every task has been submitted, so the requests overlap
        for future in futures:
            result = future.result()
            # print(result)
            resultList.append(str(result[1], encoding="utf-8"))
        return resultList

    def add_one_tasks(self, task, headers=None, method='GET', data=None, proxy=None):
        """Add a single task.

        :param task: URL
        :return: single-item list with the response body as str
        """
        future = asyncio.run_coroutine_threadsafe(self.fetch(task, method=method, data=data, headers=headers, proxy=proxy), self.event_loop)
        future.url = task  # Read by callback() when logging
        future.add_done_callback(self.callback)  # Attach a callback to the future object
        self.concurrent += 1  # Concurrency count + 1, matching the decrement in callback()
        result = future.result()
        return [str(result[1], encoding="utf-8")]

    def getProductParm(self, productguid):
        base = '{"productguid":"%s","areacode":"","referer":"https://zc.plap.mil.cn/productdetail.html?productguid=%s"}' % (
            productguid, productguid)
        # URL-encode, then Base64-encode
        base_d = quote(base)
        return str(base64.b64encode(base_d.encode("utf-8")), "utf-8")
if __name__ == '__main__':
    a = AioCrawl()
    headers = {
        "Host": "api.erp.idodb.com",
        "Accept": "application/json",
        "Content-Type": "application/json;charset=UTF-8",
        "token": "f62f837d0c9fda331fd6ce35d0017a16",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"
    }
    data = {
        "ware_name": " masks ", "ware_model": "", "ware_brand_name": " Han Dun ", "pagesize": 10, "pageindex": 2,
        "sc_id": "4A6F7946-0704-41B2-8027-2CC13B6E96F2"}
    url = None  # The request URL is missing from the source; set it before running
    if url:
        # Simulate dynamically adding a task
        result = a.add_one_tasks(url, headers=headers, data=data, method="POST")
        print(result)
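
The core mechanism in the class above is an event loop running in a background thread, with coroutines handed to it from the main thread through `asyncio.run_coroutine_threadsafe`. The sketch below isolates that mechanism using only the standard library; `fake_fetch` is a hypothetical stand-in for an aiohttp request and the URLs are placeholders.

```python
import asyncio
from threading import Thread


def start_loop(loop):
    # Runs in the worker thread: make the loop current there and serve it until stopped
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def fake_fetch(url):
    # Hypothetical stand-in for an aiohttp request
    await asyncio.sleep(0.05)
    return 200, ('body of ' + url).encode('utf-8')


loop = asyncio.new_event_loop()
Thread(target=start_loop, args=(loop,), daemon=True).start()

# Submit coroutines from the main thread; each call returns a concurrent.futures.Future
futures = [asyncio.run_coroutine_threadsafe(fake_fetch(u), loop)
           for u in ('a', 'b', 'c')]
# Wait only after everything is submitted, so the three requests overlap
results = [f.result(timeout=5) for f in futures]
print(results)

loop.call_soon_threadsafe(loop.stop)   # ask the background loop to shut down
```

Calling `result()` right after each submission would instead make the main thread wait for one request before sending the next, which removes the benefit of the event loop.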
