Preface
Update notes
1. With and without context variables
2. How to use the contextvars module
3. How to use contextvars gracefully
4. How contextvars works
4.1 ContextMeta, ContextVarMeta and TokenMeta
4.2 Token
4.3 The globally unique context
4.4 The Context class wrapped by contextvars
4.5 ContextVar
5. contextvars and asyncio
5.1 Getting the context in asyncio
5.2 Operating on the context
5.3 copying_task_factory and chainmap_task_factory
6. Summary
Preface
Starting with Python 3.7, the standard library includes the contextvars module. Its main purpose is to add context support for multithreaded code and the asyncio ecosystem: even when a program runs many coroutines concurrently, each one can still read its own context variables, which lets us decouple our logic.
A context can be understood like the context of a conversation: taken out of context, a sentence changes its meaning, and the same is true of a running program. A thread has its own context too, it is just called the stack, and in Python it is kept in threading.local variables. A coroutine also has its own context, but it was never exposed. With the contextvars module, we can now save and read it.
The benefit of contextvars is not only that it prevents "one variable being passed around everywhere". It also combines well with type hints, so that mypy and the IDE can check your code, which makes the code better suited to engineering.
However, contextvars also introduces some implicit calls, and we need to deal with these hidden costs.
Update notes
Switched the web framework from sanic to starlette.
Added a description of a self-written context helper that can be used with starlette and fastapi.
Updated to the latest fast_tools.context example and lightly revised the wording.
1. With and without context variables
If you have used the Flask framework, you know that Flask has its own context mechanism. contextvars is very similar to it, and additionally supports context for asyncio. Flask's context is built on threading.local, which isolates state very well, but only between threads. To also support running under gevent, werkzeug implements its own Local variable. The commonly used Flask context variable request is one example:
from flask import Flask, request

app = Flask(__name__)


@app.route('/')
def root():
    # Query-string parameters live on request.args
    so1n_name = request.args.get('so1n_name')
    return f'Name is {so1n_name}'
Further reading: details about the Flask context
By contrast, Django, another classic Python web framework, has no context support, so the request object has to be passed explicitly, as in the following example:
from django.http import HttpResponse


def root(request):
    # Query-string parameters live on request.GET
    so1n_name = request.GET.get('so1n_name')
    return HttpResponse(f'Name is {so1n_name}')
Comparing the two, we can see that in Django we have to explicitly pass a variable called request, while in Flask we import a global variable called request and use it directly in the view, which achieves decoupling.
Some may say that the only difference is passing one variable, and that maintaining a context variable just to avoid passing it takes too much effort to be worth it. Take a look at the following example: with many layers, you end up with "one parameter being passed all day long". (If the layering is done well, or the requirements are reasonable, this will not happen. A good programmer can layer code well, but sometimes a pile of bad requirements arrives anyway.)
# Pseudocode: a single request passes through 3 functions
from django.http import HttpResponse


def is_allow(request, uid):
    if request.META.get('REMOTE_ADDR') == '127.0.0.1' and check_permissions(request, uid):
        return True
    else:
        return False


def check_permissions(request, uid):
    pass


def root(request):
    user_id = request.GET.get('uid')
    if is_allow(request, user_id):
        return HttpResponse('ok')
    else:
        return HttpResponse('error')
Besides preventing the "one parameter being passed all day long" problem, a context also enables some decoupling. One of the most classic technical requirements is printing a request_id in the logs to make tracing a request easier. With a context module, reading and writing the request_id can be decoupled from the rest of the code. For example, the following reads and writes request_id on top of the Flask framework:
import logging
from typing import Any
from uuid import uuid4

from flask import Flask, g, request  # type: ignore
from flask.logging import default_handler


# A Python logging.Filter object. Every log record passes through the filter step before it is emitted, so this is where we can bind the request_id to it
class RequestIDLogFilter(logging.Filter):
    """
    Log filter to inject the current request id of the request under `log_record.request_id`
    """

    def filter(self, record: Any) -> bool:
        # Fall back to None when request_id has not been set on g
        record.request_id = getattr(g, "request_id", None)
        return True


# Configure the log format; note the extra request_id field
format_string: str = (
    "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]"
    " %(message)s"
)
# Set the format on Flask's default handler and add the logging.Filter object
default_handler.setFormatter(logging.Formatter(format_string))
default_handler.addFilter(RequestIDLogFilter())


# This function sets the request_id
def set_request_id() -> None:
    g.request_id = request.headers.get("X-Request-Id", str(uuid4()))


# Initialize the Flask object and register the before_request hook
app: Flask = Flask("demo")
app.before_request(set_request_id)
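For comparison, the same idea can be sketched without any web framework by letting the log filter read a ContextVar instead of Flask's g. This is only a minimal illustration using the standard library: the names request_id_var and contextvars_demo and the in-memory stream are assumptions made for the example, not part of Flask or of the original code.

```python
import contextvars
import io
import logging

# Hypothetical context variable holding the current request id
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


class RequestIDLogFilter(logging.Filter):
    """Copy the request id from the current context onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


# Log into an in-memory stream so the result can be inspected
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("[%(request_id)s] %(message)s"))
handler.addFilter(RequestIDLogFilter())

logger = logging.getLogger("contextvars_demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

# Simulate one request: set the id, log, then restore the previous value
token = request_id_var.set("req-42")
logger.info("handling request")
request_id_var.reset(token)
logger.info("idle")

output = stream.getvalue()
```

The code that logs never receives the request id as a parameter; it is picked up from the context at the moment the record is created.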
2. How to use the contextvars module
Here is an example. There are other ways to solve this particular problem; the example is just a convenient way to show how to use the contextvars module.
First, look at how an asyncio web framework passes variables without contextvars. According to the starlette documentation, without contextvars a Redis client instance is passed around by storing it on request.state. Rewritten along those lines, the code looks like this:
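# demo/web_tools.py
# Save the variable through a middleware
# (REDIS_POOL is the Redis client instance, created elsewhere)
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response


class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        request.state.redis = REDIS_POOL
        response = await call_next(request)
        return response


# demo/server.py
# Use the variable
@APP.route('/')
async def homepage(request):
    # Pseudocode: execute a redis command here
    await request.state.redis.execute()
    return JSONResponse({'hello': 'world'})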
The code is simple and works fine. But at the next refactor, for example when simply renaming redis to new_redis, the IDE cannot find the usages, and they have to be changed one by one. Also, while you write code, the IDE never knows the type of the attribute being accessed, so it cannot check anything for you (for instance, typing request.state.redis. does not offer execute, and mistakes are not flagged). This hurts the engineering quality of the project, and contextvars together with type hints can solve exactly this problem.
With all that said, here is an example that uses a Redis client to show how to use contextvars in the asyncio ecosystem and bring in type hints (see the code comments for details).
# demo/context.py
# This file holds everything related to contextvars
import contextvars
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from demo.redis_dal import RDS  # an example Redis wrapper

# Initialize a global context variable for redis
redis_pool_context = contextvars.ContextVar('redis_pool')


# Calling this function returns the value held in the context of the currently running coroutine
def get_redis() -> 'RDS':
    return redis_pool_context.get()


# demo/web_tool.py
# This file holds the starlette-related pieces
from typing import Optional

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.middleware.base import RequestResponseEndpoint
from starlette.responses import Response

from demo.context import redis_pool_context
from demo.redis_dal import RDS

# Initialize the redis client variable; it is empty for now
REDIS_POOL = None  # type: Optional[RDS]


class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        # In the middleware, before the route is entered, put the redis client into the context of the current coroutine
        token = redis_pool_context.set(REDIS_POOL)
        try:
            response = await call_next(request)
            return response
        finally:
            # The call is finished: reset the redis client that was set for this request
            redis_pool_context.reset(token)


async def startup_event() -> None:
    global REDIS_POOL
    REDIS_POOL = RDS()  # Initialize the client; internally the connection is deferred via asyncio.ensure_future


async def shutdown_event() -> None:
    if REDIS_POOL:
        await REDIS_POOL.close()  # Close the redis client


# demo/server.py
# This file holds the main starlette logic
from starlette.applications import Starlette
from starlette.responses import JSONResponse

from demo.web_tool import RequestContextMiddleware
from demo.context import get_redis

APP = Starlette()
APP.add_middleware(RequestContextMiddleware)


@APP.route('/')
async def homepage(request):
    # Pseudocode: execute a redis command here
    # If id(get_redis()) equals the id of REDIS_POOL in demo.web_tool,
    # that proves contextvars maintains a set of context state for asyncio
    await get_redis().execute()
    return JSONResponse({'hello': 'world'})
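The core of this pattern (set in a wrapper, read through a typed accessor, reset in finally) can also be tried without starlette or a real Redis server. The following is a minimal sketch under stated assumptions: FakeRedis, middleware and handler are hypothetical stand-ins invented for this illustration, not part of any library.

```python
import asyncio
import contextvars


class FakeRedis:
    """Hypothetical stand-in for a real Redis client wrapper."""

    async def execute(self, command: str) -> str:
        return f"executed {command}"


# Typed context variable: the IDE and mypy now know what get_redis() returns
redis_context: contextvars.ContextVar[FakeRedis] = contextvars.ContextVar("redis")


def get_redis() -> FakeRedis:
    return redis_context.get()


async def handler() -> str:
    # No request object is passed in; the client comes from the context
    return await get_redis().execute("PING")


async def middleware(client: FakeRedis) -> str:
    # Same shape as the dispatch method above: set, call next, reset
    token = redis_context.set(client)
    try:
        return await handler()
    finally:
        redis_context.reset(token)


result = asyncio.run(middleware(FakeRedis()))
```

Because get_redis has a concrete return type, completion for execute and rename refactoring work the way they would for any ordinary function.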
3. How to use contextvars gracefully
As the example above shows, contextvars plus type hints really does let the IDE know what the variable is, but it adds a lot of code. Worse, every additional variable requires writing its own context variable, its initialization and its get function, and calling a function at every use site is awkward.
After using contextvars for a while, I found it too troublesome: the same repetitive steps every time, and the most common needs are storing an instance and extracting header parameters into contextvars. So I wrote a package, fast_tools.context (compatible with fastapi and starlette), which hides all contextvars-related logic. ContextBaseModel (subclassed as ContextModel in the example) handles the set and get operations, ContextMiddleware manages the lifecycle of the context variables, and HeaderHelper takes care of header-related parameters. The caller only has to declare the variables it needs on the ContextModel and read the ContextModel attributes where they are needed.
The following is example code for the caller. The instance here is stood in for by an HTTP client, and a new client is allocated for each request. In practice you would not allocate a client instance per request, because it hurts performance:
import asyncio
import uuid
from contextvars import Context, copy_context
from functools import partial
from typing import Optional, Set

import httpx
from fastapi import FastAPI, Request, Response

from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper

app: FastAPI = FastAPI()
check_set: Set[int] = set()


class ContextModel(ContextBaseModel):
    """
    Most contextvars-related operations go through this instance. To add a variable, add an attribute to this class.
    Attributes must be declared with type hints, otherwise they are not recognized (type hints are mandatory).
    """
    # Store your own instances (such as the redis client mentioned above) in contextvars
    http_client: httpx.AsyncClient

    # HeaderHelper stores header values in contextvars
    request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4()))
    ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host)
    user_agent: str = HeaderHelper.i("User-Agent")

    async def before_request(self, request: Request) -> None:
        # Hook that runs before the request; set your own variables here
        self.http_client = httpx.AsyncClient()
        check_set.add(id(self.http_client))

    async def before_reset_context(self, request: Request, response: Optional[Response]) -> None:
        # Hook that runs just before the middleware exits; the context is cleared after this step
        await self.http_client.aclose()


context_model: ContextModel = ContextModel()
app.add_middleware(ContextMiddleware, context_model=context_model)


async def test_ensure_future() -> None:
    assert id(context_model.http_client) in check_set


def test_run_in_executor() -> None:
    assert id(context_model.http_client) in check_set


def test_call_soon() -> None:
    assert id(context_model.http_client) in check_set


@app.get("/")
async def root() -> dict:
    # When asyncio.ensure_future starts another child coroutine as a task, the context is reused as well
    asyncio.ensure_future(test_ensure_future())
    loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()
    # call_soon also reuses the context
    loop.call_soon(test_call_soon)
    # run_in_executor can reuse the context too, but only through the context's run method; copy_context copies the current context
    ctx: Context = copy_context()
    await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor))  # type: ignore

    return {
        "message": context_model.to_dict(is_safe_return=True),  # not return CustomQuery
        "client_id": id(context_model.http_client),
    }


if __name__ == "__main__":
    import uvicorn  # type: ignore

    uvicorn.run(app)
As the example shows, calling through the wrapped context is very pleasant: you can set up your own context attributes in just one or two steps, and you don't need to worry about how to manage the lifecycle of the context. The example also shows that in the asyncio ecosystem contextvars can be used in child coroutines, in other threads and so on.
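The difference between these ways of starting work can be reproduced with the standard library alone. The sketch below is illustrative: the variable user and the helper functions are assumptions made for the example. A task started with asyncio.ensure_future receives a copy of the current context automatically, while a function handed to run_in_executor only sees the caller's values reliably when it is wrapped in ctx.run.

```python
import asyncio
import contextvars
from functools import partial

# Hypothetical context variable used only for this illustration
user: contextvars.ContextVar[str] = contextvars.ContextVar("user", default="anonymous")


def read_user() -> str:
    return user.get()


async def read_user_async() -> str:
    return user.get()


async def main():
    user.set("so1n")
    loop = asyncio.get_running_loop()

    # A new task is created with a copy of the current context
    in_task = await asyncio.ensure_future(read_user_async())

    # Without ctx.run, the worker thread is not guaranteed to see this task's context
    plain = await loop.run_in_executor(None, read_user)

    # Wrapping the call in ctx.run makes the copied context current in the worker thread
    ctx = contextvars.copy_context()
    wrapped = await loop.run_in_executor(None, partial(ctx.run, read_user))
    return in_task, plain, wrapped


in_task, plain, wrapped = asyncio.run(main())
```

Here in_task and wrapped both see "so1n". What plain sees depends on how the worker thread obtained its context, so it should not be relied on.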
4. How contextvars works
When I first used it, I was curious how contextvars maintains the program's context. Fortunately, the author of contextvars has published a backward-compatible contextvars library (a backport for older Python versions). Although it does not support asyncio, we can still understand the basic principles from its code.
4.1 ContextMeta, ContextVarMeta and TokenMeta
The code repository contains the objects ContextMeta, ContextVarMeta and TokenMeta. Their job is to prevent users from subclassing Context, ContextVar and Token. Each metaclass checks whether the class being created is the library's own class (by module and class name) and raises an error if it is not.
class ContextMeta(type(collections.abc.Mapping)):

    # contextvars.Context is not subclassable.

    def __new__(mcls, names, bases, dct):
        cls = super().__new__(mcls, names, bases, dct)
        if cls.__module__ != 'contextvars' or cls.__name__ != 'Context':
            raise TypeError("type 'Context' is not an acceptable base type")
        return cls
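The same restriction can be observed on the standard library's contextvars module, where it is enforced by the C implementation rather than by a Python metaclass. A small sketch (the helper try_subclass is a name invented for this illustration):

```python
import contextvars


def try_subclass(base):
    """Try to create a subclass of base; return the error message, or None on success."""
    try:
        type("Child", (base,), {})
    except TypeError as exc:
        return str(exc)
    return None


# Attempt to subclass each of the three public classes
errors = {
    base.__name__: try_subclass(base)
    for base in (contextvars.Context, contextvars.ContextVar, contextvars.Token)
}
```

On CPython, each entry in errors holds a TypeError message, which matches the intent of the metaclass check in the backport.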
4.2 Token
In essence, a context behaves like a stack: each set pushes a layer of data onto the stack, and each reset pops the top layer off. In contextvars, the Token object maintains the link between these layers.
class Token(metaclass=TokenMeta):
    MISSING = object()

    def __init__(self, context, var, old_value):
        # Store the context, the variable that was set, and the value it held before this set
        self._context = context
        self._var = var
        self._old_value = old_value
        self._used = False

    @property
    def var(self):
        return self._var

    @property
    def old_value(self):
        return self._old_value

    def __repr__(self):
        r = '<Token '
        if self._used:
            r += ' used'
        r += ' var={!r} at {:0x}>'.format(self._var, id(self))
        return r
As you can see, Token has very little code. It only stores the current context, the variable that set was called on, and the old value from before that set. The only way for a user to obtain a Token is to call ContextVar.set. The returned Token can later be passed to ContextVar.reset(token), which clears the value saved by that set so that it can be reclaimed promptly, and restores the previous value.
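The stack-like behaviour is easy to observe with the standard library. This is a minimal sketch; the variable name request_id is chosen only for illustration.

```python
import contextvars

request_id = contextvars.ContextVar("request_id", default="-")

outer = request_id.set("req-1")   # "push": no previous value existed
inner = request_id.set("req-2")   # "push": the previous value was "req-1"

values = [request_id.get()]       # value after both sets
request_id.reset(inner)           # "pop": back to "req-1"
values.append(request_id.get())
request_id.reset(outer)           # "pop": back to the unset state
values.append(request_id.get())   # the default is returned again

# A token can only be used once
try:
    request_id.reset(outer)
    reused = True
except RuntimeError:
    reused = False
```

Each token remembers the value that was current before its own set call, which is why resetting in reverse order unwinds the values one layer at a time.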
4.3 The globally unique context
As mentioned earlier, in Python threading.local() is responsible for each thread's context, and a coroutine can be seen as a 'subset' of a thread, so the backport builds its own global context directly on threading.local(). From the source you can see that _state is a threading.local() instance, and the current context is written and read by setting and getting the context attribute of _state. copy_context is equally simple: it also ends up going through threading.local().
import threading


def copy_context():
    return _get_context().copy()


def _get_context():
    ctx = getattr(_state, 'context', None)
    if ctx is None:
        ctx = Context()
        _state.context = ctx
    return ctx


def _set_context(ctx):
    _state.context = ctx


_state = threading.local()
threading.local() is not the focus of this article, but since the contextvars backport is built on top of it, we still need to understand how it works. Instead of analysing its source code directly, a simple example is used to explain it.
Using thread-local variables in a thread is better than using global variables directly, because a thread-local variable is visible only to its own thread and does not affect other threads, whereas a global variable has to be protected by a lock, which hurts performance badly. Consider the following example with a global variable:
pet_dict = {}


def get_pet(pet_name):
    return pet_dict[pet_name]


def set_pet(pet_name, pet):
    pet_dict[pet_name] = pet
This code mimics simple access to a global variable. With multiple threads it needs a lock: before each read or write, a thread must wait until the thread holding the lock releases it and then compete for it, and threads may still pollute the data stored by other threads.
Thread-local variables give each thread its own pet_dict. Suppose each thread passes in its own thread id (pid in the code) when calling get_pet and set_pet. Then the threads no longer compete for the same resource and no longer pollute each other's data, and the code can be changed to this:
pet_dict = {}


def get_pet(pet_name, pid):
    return pet_dict[pid][pet_name]


def set_pet(pet_name, pet, pid):
    pet_dict[pid][pet_name] = pet
But this is very inconvenient to use. The example also does no exception checking or initialization, and if the values are complex we would have to maintain all the error cases ourselves, which is too much trouble.
This is where threading.local() comes in: it takes care of that bookkeeping for us. We only need to call it, and calling it is as simple and convenient as in single-threaded code. Here is the code using threading.local():
import threading

thread_local = threading.local()


def get_pet(pet_name):
    # threading.local stores values as attributes, not as dictionary keys
    return getattr(thread_local, pet_name)


def set_pet(pet_name, pet):
    setattr(thread_local, pet_name, pet)
As you can see, the code reads just like access to a global variable, but there is no contention between threads.
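The isolation can be checked with a few threads. In this minimal sketch (the worker function and the pet names are illustrative assumptions), every thread writes to the same attribute name, waits until all the others have written too, and then reads back its own value.

```python
import threading

thread_local = threading.local()
barrier = threading.Barrier(3)
results = {}


def worker(name: str, pet: str) -> None:
    thread_local.pet = pet              # every thread writes the same attribute name
    barrier.wait()                      # wait until all threads have written
    results[name] = thread_local.pet    # each thread still reads its own value


threads = [
    threading.Thread(target=worker, args=(f"t{i}", f"pet-{i}"))
    for i in range(3)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# The main thread never set the attribute, so it does not exist here
main_sees_pet = hasattr(thread_local, "pet")
```

No lock protects thread_local.pet, yet no thread observes a value written by another thread.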
4.4 The Context class wrapped by contextvars
The Context class that contextvars wraps for itself is fairly simple. Only its two core methods are shown here (the other magic methods behave like those of dict):
class Context(collections.abc.Mapping, metaclass=ContextMeta):

    def __init__(self):
        self._data = immutables.Map()
        self._prev_context = None

    def run(self, callable, *args, **kwargs):
        if self._prev_context is not None:
            raise RuntimeError(
                'cannot enter context: {} is already entered'.format(self))

        self._prev_context = _get_context()
        try:
            _set_context(self)
            return callable(*args, **kwargs)
        finally:
            _set_context(self._prev_context)
            self._prev_context = None

    def copy(self):
        new = Context()
        new._data = self._data
        return new
First, in __init__ you can see self._data, which uses an immutable object called immutables.Map(). Context wraps immutables.Map() a little, so a Context can be regarded as an immutable dict. This prevents changes made to a context after copy is called from affecting the original context.
The sample code for immutables.Map() shows that each modification leaves the original object unchanged and returns a new object with the change applied.
import immutables

# Initial map, as in the immutables README
map = immutables.Map(a=1, b=2)

map2 = map.set('a', 10)
print(map, map2)
# will print:
#   <immutables.Map({'a': 1, 'b': 2})>
#   <immutables.Map({'a': 10, 'b': 2})>

map3 = map2.delete('b')
print(map, map2, map3)
# will print:
#   <immutables.Map({'a': 1, 'b': 2})>
#   <immutables.Map({'a': 10, 'b': 2})>
#   <immutables.Map({'a': 10})>
Context also has a method called run. The earlier loop.run_in_executor example used it: copy_context() produces a new Context whose contents match the current one, and run lets another thread execute a function inside that copy.
When run executes, the function is called with that copied context installed as the current context. Because of immutables.Map, changes made to the context inside the function do not affect the old context, which gives copy-on-write behaviour similar to what happens when a process copies its data. At the end of run, once the function has finished, the old context is set again, which completes one context switch.
def run(self, callable, *args, **kwargs):
    # If an old context is already stored, this context has been entered: raise an exception to prevent re-entering it (for example from another thread)
    if self._prev_context is not None:
        raise RuntimeError(
            'cannot enter context: {} is already entered'.format(self))

    self._prev_context = _get_context()  # Save the current context
    try:
        _set_context(self)  # Install this context as the current one
        return callable(*args, **kwargs)  # Execute the function
    finally:
        _set_context(self._prev_context)  # Switch back to the old context
        self._prev_context = None
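The effect of copy_context and run can be seen with the standard library module, which follows the same semantics. A minimal sketch, with names chosen only for illustration:

```python
import contextvars

var = contextvars.ContextVar("var", default="outer")


def worker() -> str:
    # This set happens inside the copied context only
    var.set("inner")
    return var.get()


ctx = contextvars.copy_context()   # snapshot of the current context
result = ctx.run(worker)           # worker runs with ctx as the current context

value_outside = var.get()          # the surrounding context is unchanged
value_in_copy = ctx[var]           # the copy keeps the change made by worker
```

The change made by worker stays inside ctx, so the caller's context is untouched after run returns.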
4.5 ContextVar
When using the contextvars module, the class we use most is ContextVar. It is very simple and mainly provides three methods: set (set a value), get (get a value) and reset (reset a value). All of them operate on the current Context object, and set and reset interact through the Token class described above.
set – Set variables for the current context
def set(self, value):
    ctx = _get_context()  # Get the current context object `Context`
    data = ctx._data
    try:
        old_value = data[self]  # Get the old value stored in the Context
    except KeyError:
        old_value = Token.MISSING  # Nothing stored yet: use the MISSING sentinel (a globally unique object())

    updated_data = data.set(self, value)  # Set the new value
    ctx._data = updated_data
    return Token(ctx, self, old_value)  # Return a token that carries the old value
get – Get variables from the current context
def get(self, default=_NO_DEFAULT):
    ctx = _get_context()  # Get the current context object `Context`
    try:
        return ctx[self]  # Return the stored value
    except KeyError:
        pass

    if default is not _NO_DEFAULT:
        return default  # Return the default passed to this get call

    if self._default is not _NO_DEFAULT:
        return self._default  # Return the default given when the ContextVar was created

    raise LookupError  # Nothing available: raise an error
reset – Clean up the context data used this time
def reset(self, token):
    if token._used:
        # The token has already been used
        raise RuntimeError("Token has already been used once")

    if token._var is not self:
        # The token was not returned by this ContextVar
        raise ValueError(
            "Token was created by a different ContextVar")

    if token._context is not _get_context():
        # The token's context is not the current context
        raise ValueError(
            "Token was created in a different Context")

    ctx = token._context
    if token._old_value is Token.MISSING:
        # There was no old value: delete the value
        ctx._data = ctx._data.delete(token._var)
    else:
        # There was an old value: restore the ContextVar to it
        ctx._data = ctx._data.set(token._var, token._old_value)

    token._used = True  # Mark the token as used
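The lookup order implemented by get can be checked against the standard library's ContextVar, which behaves the same way. A short sketch (variable names are illustrative):

```python
import contextvars

no_default = contextvars.ContextVar("no_default")
with_default = contextvars.ContextVar("with_default", default="var-default")

# 1. No value set: the default passed to get() wins over the ContextVar default
from_call = with_default.get("call-default")
# 2. No value set and no argument: the ContextVar default is used
from_var = with_default.get()
# 3. No value and no default of any kind: LookupError
try:
    no_default.get()
    raised = False
except LookupError:
    raised = True

# 4. Once a value is set, it takes priority over every default
token = with_default.set("explicit")
from_set = with_default.get("call-default")
with_default.reset(token)
```

In short: a value set in the current context comes first, then the argument to get, then the default given at construction, and finally LookupError.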
With that, the principles of contextvars are clear. Next, let's see how it works in asyncio.
5. contextvars and asyncio
Since the backported contextvars does not support asyncio, this section uses the source code of aiotask-context to briefly explain how a context is obtained and set in asyncio.
5.1 Getting the context in asyncio
Compared with the more involved concepts in contextvars, things are easy in asyncio: we can get the task of the current coroutine, and from the task we can get its context. Because Python 3.7 redesigned asyncio's high-level API, getting the current task needs a small wrapper:
import asyncio
import sys

PY37 = sys.version_info >= (3, 7)

if PY37:
    def asyncio_current_task(loop=None):
        """Return the current task or None."""
        try:
            return asyncio.current_task(loop)
        except RuntimeError:
            # simulate old behaviour
            return None
else:
    asyncio_current_task = asyncio.Task.current_task
Different versions get the task in different ways. After that, calling asyncio_current_task().context returns the current context.
5.2 Operating on the context
Likewise, once we have the context we still need set, get and reset operations. They are very simple: the context is used just like a dict, and there is no token logic:
set
def set(key, value):
    """
    Sets the given value inside Task.context[key]. If the key does not exist it creates it.
    :param key: identifier for accessing the context dict.
    :param value: value to store inside context[key].
    :raises
    """
    current_task = asyncio_current_task()
    if not current_task:
        raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))

    current_task.context[key] = value
get
def get(key, default=None):
    """
    Retrieves the value stored in key from the Task.context dict. If key does not exist,
    or there is no event loop running, default will be returned
    :param key: identifier for accessing the context dict.
    :param default: None by default, returned in case key is not found.
    :return: Value stored inside the dict[key].
    """
    current_task = asyncio_current_task()
    if not current_task:
        raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))

    return current_task.context.get(key, default)
clear – the counterpart of reset in contextvars.ContextVar
def clear():
    """
    Clear the Task.context.
    :raises ValueError: if no current task.
    """
    current_task = asyncio_current_task()
    if not current_task:
        raise ValueError("No event loop found")

    current_task.context.clear()
5.3 copying_task_factory and chainmap_task_factory
Newer versions of Python already support setting the context natively, so these two functions are no longer needed. Both of them end up calling task_factory. Put simply, task_factory creates a new task, builds a context through the factory function, and finally attaches the context to the task.
def task_factory(loop, coro, copy_context=False, context_factory=None):
    """
    By default returns a task factory that uses a simple dict as the task context,
    but allows context creation and inheritance to be customized via ``context_factory``.
    """
    # Build the context factory function
    context_factory = context_factory or partial(
        dict_context_factory, copy_context=copy_context)

    # Create the task, the same way asyncio.ensure_future does
    task = asyncio.tasks.Task(coro, loop=loop)
    if task._source_traceback:
        del task._source_traceback[-1]

    # Get the context of the current task
    try:
        context = asyncio_current_task(loop=loop).context
    except AttributeError:
        context = None

    # Run the context through the factory and attach the result to the new task
    task.context = context_factory(context)

    return task
aiotask-context provides two functions for processing the context: dict_context_factory and chainmap_context_factory. In aiotask-context the context is a dict object, and dict_context_factory can either hand the parent's context object to the new task directly or deep-copy it into a new one.
def dict_context_factory(parent_context=None, copy_context=False):
    """A traditional ``dict`` context to keep things simple"""
    if parent_context is None:
        # initial context
        return {}
    else:
        # inherit context
        new_context = parent_context
        if copy_context:
            new_context = deepcopy(new_context)
        return new_context
chainmap_context_factory differs from dict_context_factory in that it merges contexts instead of inheriting them directly. By relying on ChainMap, the merged context still reflects later changes made to the parent context.
def chainmap_context_factory(parent_context=None):
    """
    A ``ChainMap`` context, to avoid copying any data
    and yet preserve strict one-way inheritance
    (just like with dict copying)
    """
    if parent_context is None:
        # initial context
        return ChainMap()
    else:
        # inherit context
        if not isinstance(parent_context, ChainMap):
            # if a dict context was previously used, then convert
            # (without modifying the original dict)
            parent_context = ChainMap(parent_context)
        return parent_context.new_child()
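The one-way inheritance that ChainMap provides can be seen in isolation with a few lines of standard-library code. The keys and values below are made up for the illustration.

```python
from collections import ChainMap

parent = ChainMap({"request_id": "req-1"})
child = parent.new_child()          # what chainmap_context_factory returns for a child task

child["user"] = "so1n"              # written to the child's own first mapping only
child["request_id"] = "req-2"       # shadows the parent's value without changing it
parent["trace"] = "t-1"             # later changes in the parent stay visible to the child

child_view = dict(child)
parent_view = dict(parent)
```

Writes in the child never reach the parent, while reads in the child fall through to the parent, and no data is copied.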
This completes the brief analysis of context in asyncio. If you want to know more about how asyncio passes the context around, read the asyncio source code.
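On Python 3.7 and later, the inheritance that aiotask-context implements by hand is available out of the box: a new asyncio task runs in a copy of the context that was current when the task was created. A minimal sketch with illustrative names:

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default="-")


async def child() -> str:
    seen = request_id.get()              # inherited from the parent at task creation
    request_id.set("changed-in-child")   # only affects the child's copy of the context
    return seen


async def main():
    request_id.set("req-1")
    seen_by_child = await asyncio.create_task(child())
    return seen_by_child, request_id.get()


seen_by_child, seen_by_parent = asyncio.run(main())
```

The child sees the parent's value, and the parent does not see the child's change, which is the same one-way inheritance as in the copying task factory.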
6. Summary
The principle behind contextvars is very simple, yet it makes our calls more convenient and reduces how often we have to pass parameters. Combined with type hints, it also makes a project better engineered, although opinions differ on this. It is still best to add a layer of wrapping when using it, and the best practice is for a process to share a single context rather than creating one context per variable.