前记
在Python3.7后官方库出现了contextvars
模块, 它的主要功能就是可以为多线程以及asyncio生态添加上下文功能,即使程序在多个协程并发运行的情况下,也能调用到程序的上下文变量, 从而使我们的逻辑解耦.
上下文,可以理解为我们说话的语境, 在聊天的过程中, 有些话脱离了特定的语境,他的意思就变了,程序的运行也是如此.在线程中也是有他的上下文,只不过称为堆栈,如在python中就是保存在thread.local变量中,而协程也有他自己的上下文,但是没有暴露出来,不过有了contextvars
模块后我们可以通过contextvars
模块去保存与读取.
使用contextvars
的好处不仅可以防止’一个变量传遍天’的事情发生外,还能很好的结合TypeHint,可以让自己的代码可以被mypy以及IDE检查,让自己的代码更加适应工程化.
不过用了contextvars
后会多了一些隐性的调用, 需要解决好这些隐性的成本.
更新说明
1.有无上下文传变量的区别
如果有用过Flask
框架, 就知道了Flask
拥有自己的上下文功能, 而contextvars跟它很像, 而且还增加了对asyncio的上下文提供支持。
Flask
的上下文是基于threading.local
实现的, threading.local
的隔离效果很好,但是他是只针对线程的,只隔离线程之间的数据状态, 而werkzeug
为了支持在gevent
中运行,自己实现了一个Local
变量, 常用的Flask
上下文变量request
的例子如下:
1 2 3 4 5 6 | from flask import Flask, request
app = Flask(__name__)
@app .route( '/' )
def root():
so1n_name = request.get( 'so1n_name' )
return f 'Name is {so1n_name}'
|
与之相比的是Python
的另一个经典Web框架Djano
, 它没有上下文的支持, 所以只能显示的传request
对象, 例子如下:
1 2 3 4 | from django.http import HttpResponse
def root(request):
so1n_name = request.get( 'so1n_name' )
return HttpResponse(f 'Name is {so1n_name}' )
|
通过上面两者的对比可以发现, 在Django
中,我们需要显示的传一个叫request的变量,而Flask
则是import一个叫request的全局变量,并在视图中直接使用,达到解耦的目的.
可能会有人说, 也就是传个变量的区别,为了省传这个变量,而花许多功夫去维护一个上下文变量,有点不值得,那可以看看下面的例子,如果层次多就会出现’一个参数传一天’的情况(不过分层做的好或者需求不坑爹一般不会出现像下面的情况,一个好的程序员能做好代码的分层, 但可能也有出现一堆烂需求的时候)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | from django.http import HttpResponse
def is_allow(request, uid):
if request.ip = = '127.0.0.1' and check_permissions(uid):
return True
else :
return False
def check_permissions(request, uid):
pass
def root(request):
user_id = request.GET.get( 'uid' )
if is_allow(request, id ):
return HttpResponse( 'ok' )
else
return HttpResponse( 'error' )
|
此外, 除了防止一个参数传一天
这个问题外, 通过上下文, 可以进行一些解耦, 比如有一个最经典的技术业务需求就是在日志打印request_id, 从而方便链路排查, 这时候如果有上下文模块, 就可以把读写request_id给解耦出来, 比如下面这个基于Flask
框架读写request_id的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | import logging
from typing import Any
from flask import g
from flask.logging import default_handler
class RequestIDLogFilter(logging. Filter ):
def filter ( self , record: Any ) - > Any :
record.request_id = g.request_id or None
return record
format_string: str = (
"[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s"
)
default_handler.setFormatter(logging.Formatter(format_string))
default_handler.addFilter(RequestIDLogFilter())
def set_request_id() - > None :
g.request_id = request.headers.get( "X-Request-Id" , str (uuid4()))
app: Flask = Flask( "demo" )
app.before_request(set_request_id)
|
2.如何使用contextvars模块
这里举了一个例子, 但这个例子也有别的解决方案. 只不过通过这个例子顺便说如何使用contextvar模块
首先看看未使用contextvars
时,asyncio的web框架是如何传变量的,根据starlette
的文档,在未使用contextvars
时,传递Redis
客户端实例的办法是通过request.stat这个变量保存Redis
客户端的实例,改写代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | class RequestContextMiddleware(BaseHTTPMiddleware):
async def dispatch(
self , request: Request, call_next: RequestResponseEndpoint
) - > Response:
request.stat.redis = REDIS_POOL
response = await call_next(request)
return response
@APP .route( '/' )
async def homepage(request):
await request.stat.redis.execute()
return JSONResponse({ 'hello' : 'world' })
|
代码非常简便, 也可以正常的运行, 但你下次在重构时, 比如简单的把redis这个变量名改为new_redis, 那IDE不会识别出来, 需要一个一个改。 同时, 在写代码的时候, IDE永远不知道这个方法调用到的变量的类型是什么, IDE也无法智能的帮你检查(如输入request.stat.redis.时,IDE不会出现execute,或者出错时,IDE并不会提示). 这非常不利于项目的工程化, 而通过contextvars
和TypeHints
, 恰好能解决这个问题.
说了那么多, 下面以一个Redis
client为例子,展示如何在asyncio生态中使用contextvars
, 并引入TypeHints
(详细解释见代码).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | import contextvars
if TYPE_CHECKING:
from demo.redis_dal import RDS
redis_pool_context = contextvars.ContextVar( 'redis_pool' )
def get_redis() - > 'RDS' :
return redis_pool_context.get()
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.middleware.base import RequestResponseEndpoint
from starlette.responses import Response
from demo.redis_dal import RDS
REDIS_POOL = None
class RequestContextMiddleware(BaseHTTPMiddleware):
async def dispatch(
self , request: Request, call_next: RequestResponseEndpoint
) - > Response:
token = redis_pool_context. set (REDIS_POOL)
try :
response = await call_next(request)
return response
finally :
redis_pool_context.reset(token)
async def startup_event() - > None :
global REDIS_POOL
REDIS_POOL = RDS()
async def shutdown_event() - > None :
if REDIS_POOL:
await REDIS_POOL.close()
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from demo.web_tool import RequestContextMiddleware
from demo.context import get_redis
APP = Starlette()
APP.add_middleware(RequestContextMiddleware)
@APP .route( '/' )
async def homepage(request):
await get_redis().execute()
return JSONResponse({ 'hello' : 'world' })
|
3.如何优雅的使用contextvars
从上面的示例代码来看, 使用contextvar
和TypeHint
确实能让让IDE可以识别到这个变量是什么了, 但增加的代码太多了,更恐怖的是, 每多一个变量,就需要自己去写一个context,一个变量的初始化,一个变量的get函数,同时在引用时使用函数会比较别扭.
自己在使用了contextvars
一段时间后,觉得这样太麻烦了,每次都要做一堆重复的操作,且平时使用最多的就是把一个实例或者提炼出Headers的参数放入contextvars中,所以写了一个封装fast_tools.context(同时兼容fastapi
和starlette
), 它能屏蔽所有与contextvars的相关逻辑,其中由ContextModel负责contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader负责托管Headers相关的参数, 调用者只需要在ContextModel中写入自己需要的变量,引用时调用ContextModel的属性即可.
以下是调用者的代码示例, 这里的实例化变量由一个http client代替, 且都会每次请求分配一个客户端实例, 但在实际使用中并不会为每一个请求都分配一个客户端实例, 很影响性能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | import asyncio
import uuid
from contextvars import Context, copy_context
from functools import partial
from typing import Optional, Set
import httpx
from fastapi import FastAPI, Request, Response
from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper
app: FastAPI = FastAPI()
check_set: Set [ int ] = set ()
class ContextModel(ContextBaseModel):
http_client: httpx.AsyncClient
request_id: str = HeaderHelper.i( "X-Request-Id" , default_func = lambda request: str (uuid.uuid4()))
ip: str = HeaderHelper.i( "X-Real-IP" , default_func = lambda request: request.client.host)
user_agent: str = HeaderHelper.i( "User-Agent" )
async def before_request( self , request: Request) - > None :
self .http_client = httpx.AsyncClient()
check_set.add( id ( self .http_client))
async def before_reset_context( self , request: Request, response: Optional[Response]) - > None :
await self .http_client.aclose()
context_model: ContextModel = ContextModel()
app.add_middleware(ContextMiddleware, context_model = context_model)
async def test_ensure_future() - > None :
assert id (context_model.http_client) in check_set
def test_run_in_executor() - > None :
assert id (context_model.http_client) in check_set
def test_call_soon() - > None :
assert id (context_model.http_client) in check_set
@app .get( "/" )
async def root() - > dict :
asyncio.ensure_future(test_ensure_future())
loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()
loop.call_soon(test_call_soon)
ctx: Context = copy_context()
await loop.run_in_executor( None , partial(ctx.run, test_run_in_executor))
return {
"message" : context_model.to_dict(is_safe_return = True ),
"client_id" : id (context_model.http_client),
}
if __name__ = = "__main__" :
import uvicorn
uvicorn.run(app)
|
可以从例子中看到, 通过封装的上下文调用会变得非常愉快, 只要通过一两步方法就能设置好自己的上下文属性, 同时不用考虑如何编写上下文的生命周期. 另外也能通过这个例子看出, 在asyncio生态中, contextvars能运用到包括子协程, 多线程等所有的场景中.
4.contextvars的原理
在第一次使用时,我就很好奇contextvars是如何去维护程序的上下文的,好在contextvars的作者出了一个向下兼容的contextvars库,虽然他不支持asyncio,但我们还是可以通过代码了解到他的基本原理.
4.1 ContextMeta,ContextVarMeta和TokenMeta
代码仓中有ContextMeta
,ContextVarMeta
和TokenMeta
这几个对象, 它们的功能都是防止用户来继承Context
,ContextVar
和Token
,原理都是通过元类来判断类名是否是自己编写类的名称,如果不是则抛错.
1 2 3 4 5 6 7 | class ContextMeta( type (collections.abc.Mapping)):
def __new__(mcls, names, bases, dct):
cls = super ().__new__(mcls, names, bases, dct)
if cls .__module__ ! = 'contextvars' or cls .__name__ ! = 'Context' :
raise TypeError( "type 'Context' is not an acceptable base type" )
return cls
|
4.2 Token
上下文的本质是一个堆栈, 每次set一次对象就向堆栈增加一层数据, 每次reset就是pop掉最上层的数据, 而在Contextvars
中, 通过Token
对象来维护堆栈之间的交互.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | class Token(metaclass = TokenMeta):
MISSING = object ()
def __init__( self , context, var, old_value):
self ._context = context
self ._var = var
self ._old_value = old_value
self ._used = False
@property
def var( self ):
return self ._var
@property
def old_value( self ):
return self ._old_value
def __repr__( self ):
r = '<Token '
if self ._used:
r + = ' used'
r + = ' var={!r} at {:0x}>' . format ( self ._var, id ( self ))
return r
|
可以看到Token
的代码很少, 它只保存当前的context
变量, 本次调用set的数据和上一次被set的旧数据. 用户只有在调用contextvar.context
后才能得到Token
, 返回的Token
可以被用户在调用context后, 通过调用context.reset(token)来清空保存的上下文,方便本次context的变量能及时的被回收, 回到上上次的数据.
4.3 全局唯一context
前面说过, Python中由threading.local()
负责每个线程的context, 协程属于线程的’子集’,所以contextvar直接基于threading.local()
生成自己的全局context. 从他的源代码可以看到, _state
就是threading.local()
的引用, 并通过设置和读取_state
的context
属性来写入和读取当前的上下文, copy_context
调用也很简单, 同样也是调用到threading.local()
API.
1 2 3 4 5 6 7 8 9 10 11 | def copy_context():
return _get_context().copy()
def _get_context():
ctx = getattr (_state, 'context' , None )
if ctx is None :
ctx = Context()
_state.context = ctx
return ctx
def _set_context(ctx):
_state.context = ctx
_state = threading.local()
|
关于threading.local()
,虽然不是本文重点,但由于contextvars
是基于threading.local()
进行封装的,所以还是要明白threading.local()
的原理,这里并不直接通过源码分析, 而是做一个简单的示例解释.
在一个线程里面使用线程的局部变量会比直接使用全局变量的性能好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁, 性能会变得很差, 比如下面全局变量的例子:
1 2 3 4 5 | pet_dict = {}
def get_pet(pet_name):
return pet_dict[pet_name]
def set_pet(pet_name):
return pet_dict[pet_name]
|
这份代码就是模仿一个简单的全局变量调用, 如果是多线程调用的话, 那就需要加锁啦, 每次在读写之前都要等到持有锁的线程放弃了锁后再去竞争, 而且还可能污染到了别的线程存放的数据.
而线程的局部变量则是让每个线程有一个自己的pet_dict
, 假设每个线程调用get_pet
,set_pet
时,都会把自己的pid传入进来, 那么就可以避免多个线程去同时竞争资源, 同时也不会污染到别的线程的数据, 那么代码可以改为这样子:
1 2 3 4 5 | pet_dict = {}
def get_pet(pet_name, pid):
return pet_dict[pid][pet_name]
def set_pet(pet_name, pid):
return pet_dict[pid][pet_name]
|
不过这样子使用起来非常方便, 同时示例例子没有对异常检查和初始化等处理, 如果值比较复杂, 我们还要维护异常状况, 这样太麻烦了.
这时候threading.local()
就应运而生了,他负责帮我们处理这些维护的工作,我们只要对他进行一些调用即可,调用起来跟单线程调用一样简单方便, 应用threading.local()
后的代码如下:
1 2 3 4 5 6 | import threading
thread_local = threading.local()
def get_pet(pet_name):
return thread_local[pet_name]
def set_pet(pet_name):
return thread_local[pet_name]
|
可以看到代码就像调用全局变量一样, 但是又不会产生竞争状态。
4.4contextvar自己封装的Context
contextvars
自己封装的Context比较简单, 这里只展示他的两个核心方法(其他的魔术方法就像dict
的魔术方法一样):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | class Context(collections.abc.Mapping, metaclass = ContextMeta):
def __init__( self ):
self ._data = immutables. Map ()
self ._prev_context = None
def run( self , callable , * args, * * kwargs):
if self ._prev_context is not None :
raise RuntimeError(
'cannot enter context: {} is already entered' . format ( self ))
self ._prev_context = _get_context()
try :
_set_context( self )
return callable ( * args, * * kwargs)
finally :
_set_context( self ._prev_context)
self ._prev_context = None
def copy( self ):
new = Context()
new._data = self ._data
return new
|
首先, 在__init__
方法可以看到self._data,这里使用到了一个叫immutables.Map()的不可变对象,并对immutables.Map()进行一些封装,所以context可以看成一个不可变的dict。这样可以防止调用copy方法后得到的上下文的变动会影响到了原本的上下文变量。
查看immutables.Map()的示例代码可以看到,每次对原对象的修改时,原对象并不会发生改变,并会返回一个已经发生改变的新对象.
1 2 3 4 5 6 7 8 9 10 11 | map2 = map . set ( 'a' , 10 )
print ( map , map2)
map3 = map2.delete( 'b' )
print ( map , map2, map3)
|
此外,context还有一个叫run
的方法, 上面在执行loop.run_in_executor
时就用过run
方法, 目的就是可以产生一个新的上下文变量给另外一个线程使用, 同时这个新的上下文变量跟原来的上下文变量是一致的.
执行run的时候,可以看出会copy一个新的上下文来调用传入的函数, 由于immutables.Map
的存在, 函数中对上下文的修改并不会影响旧的上下文变量, 达到进程复制数据时的写时复制的目的. 在run
方法的最后, 函数执行完了会再次set旧的上下文, 从而完成一次上下文切换.
1 2 3 4 5 6 7 8 9 10 11 12 | def run( self , callable , * args, * * kwargs):
if self ._prev_context is not None :
raise RuntimeError(
'cannot enter context: {} is already entered' . format ( self ))
self ._prev_context = _get_context()
try :
_set_context( self )
return callable ( * args, * * kwargs)
finally :
_set_context( self ._prev_context)
self ._prev_context = None
|
4.5 ContextVar
我们一般在使用contextvars模块时,经常使用的就是ContextVar
这个类了,这个类很简单,主要提供了set–设置值,get–获取值,reset–重置值三个方法, 从Context
类中写入和获取值, 而set和reset的就是通过上面的token类进行交互的.
set – 为当前上下文设置变量
1 2 3 4 5 6 7 8 9 10 | def set ( self , value):
ctx = _get_context()
data = ctx._data
try :
old_value = data[ self ]
except KeyError:
old_value = Token.MISSING
updated_data = data. set ( self , value)
ctx._data = updated_data
return Token(ctx, self , old_value)
|
get – 从当前上下文获取变量
1 2 3 4 5 6 7 8 9 10 11 | def get( self , default = _NO_DEFAULT):
ctx = _get_context()
try :
return ctx[ self ]
except KeyError:
pass
if default is not _NO_DEFAULT:
return default
if self ._default is not _NO_DEFAULT:
return self ._default
raise LookupError
|
reset – 清理本次用到的上下文数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | def reset( self , token):
if token._used:
raise RuntimeError( "Token has already been used once" )
if token._var is not self :
raise ValueError(
"Token was created by a different ContextVar" )
if token._context is not _get_context():
raise ValueError(
"Token was created in a different Context" )
ctx = token._context
if token._old_value is Token.MISSING:
ctx._data = ctx._data.delete(token._var)
else :
ctx._data = ctx._data. set (token._var, token._old_value)
token._used = True
|
则此,contextvar的原理了解完了,接下来再看看他是如何在asyncio运行的.
5.contextvars asyncio
由于向下兼容的contextvars
并不支持asyncio, 所以这里通过aiotask-context的源码简要的了解如何在asyncio中如何获取和设置context。
5.1在asyncio中获取context
相比起contextvars复杂的概念,在asyncio中,我们可以很简单的获取到当前协程的task, 然后通过task就可以很方便的获取到task的context了,由于Pyhon3.7对asyncio的高级API 重新设计,所以可以看到需要对获取当前task进行封装
1 2 3 4 5 6 7 8 9 10 11 | PY37 = sys.version_info > = ( 3 , 7 )
if PY37:
def asyncio_current_task(loop = None ):
try :
return asyncio.current_task(loop)
except RuntimeError:
return None
else :
asyncio_current_task = asyncio.Task.current_task
|
不同的版本有不同的获取task方法, 之后我们就可以通过调用asyncio_current_task().context
即可获取到当前的上下文了…
5.2 对上下文的操作
同样的,在得到上下文后, 我们这里也需要set, get, reset的操作,不过十分简单, 类似dict一样的操作即可, 它没有token的逻辑:
set
1 2 3 4 5 6 7 8 9 10 11 12 | def set (key, value):
current_task = asyncio_current_task()
if not current_task:
raise ValueError(NO_LOOP_EXCEPTION_MSG. format (key))
current_task.context[key] = value
|
get
1 2 3 4 5 6 7 8 9 10 11 12 | def get(key, default = None ):
current_task = asyncio_current_task()
if not current_task:
raise ValueError(NO_LOOP_EXCEPTION_MSG. format (key))
return current_task.context.get(key, default)
|
clear – 也就是contextvar.ContextVars
中的reset
1 2 3 4 5 6 7 8 9 | def clear():
current_task = asyncio_current_task()
if not current_task:
raise ValueError( "No event loop found" )
current_task.context.clear()
|
5.2 copying_task_factory和chainmap_task_factory
在Python的更高级版本中,已经支持设置context了,所以这两个方法可以不再使用了.他们最后都用到了task_factory
的方法.
task_factory
简单说就是创建一个新的task,再通过工厂方法合成context,最后把context设置到task
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | def task_factory(loop, coro, copy_context = False , context_factory = None ):
context_factory = context_factory or partial(
dict_context_factory, copy_context = copy_context)
task = asyncio.tasks.Task(coro, loop = loop)
if task._source_traceback:
del [ - 1 ]
try :
context = asyncio_current_task(loop = loop).context
except AttributeError:
context = None
task.context = context_factory(context)
return task
|
aiotask-context
提供了两个对context处理的函数dict_context_factory
和chainmap_context_factory
.在aiotask-context
中,context是一个dict对象,dict_context_factory
可以选择赋值或者设置新的context
1 2 3 4 5 6 7 8 9 10 11 | def dict_context_factory(parent_context = None , copy_context = False ):
if parent_context is None :
return {}
else :
new_context = parent_context
if copy_context:
new_context = deepcopy(new_context)
return new_context
|
chainmap_context_factory
与dict_context_factory
的区别就是在合并context而不是直接继承.同时借用ChainMap
保证合并context后,还能同步context的改变
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | def chainmap_context_factory(parent_context = None ):
if parent_context is None :
return ChainMap()
else :
if not isinstance (parent_context, ChainMap):
parent_context = ChainMap(parent_context)
return parent_context.new_child()
|
以上就是python怎么使用contextvars模块的详细内容
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。