In the previous article, 《Python-gRPC Practice (3): Implementing a gRPC Service with Python》, the gRPC service I implemented used a custom protocol to pass errors. That is not an elegant solution, because its compatibility is very poor. Fortunately, gRPC officially defines a solution that lets different services deliver errors to each other in a common way.
When writing ordinary HTTP/1.1 interfaces, we usually define a set of business-level error codes that are kept separate from the HTTP status codes. For example, a structure like this is usually returned:
{
    "code": "0",
    "msg": "success",
    "data": {}
}
This structure contains three fields, code, msg and data, which hold the error code, the error message and the payload to return. After the client receives the response, it checks the value of code. If it is one of the defined success codes, the client extracts the payload from data; otherwise it raises an exception carrying the msg text.
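The client-side check described above can be sketched in a few lines. This is only an illustration under stated assumptions: the names BusinessError, SUCCESS_CODE and unwrap are hypothetical, and "0" is taken as the success code because that is what the sample structure shows.

```python
import json
from typing import Any, Dict

SUCCESS_CODE = "0"  # assumed success value, copied from the sample structure


class BusinessError(Exception):
    """Hypothetical exception raised when the envelope carries a failure code."""

    def __init__(self, code: str, msg: str) -> None:
        super().__init__(f"[{code}] {msg}")
        self.code = code
        self.msg = msg


def unwrap(raw_body: str) -> Dict[str, Any]:
    """Return `data` on success, otherwise raise BusinessError built from `msg`."""
    body = json.loads(raw_body)
    if body.get("code") == SUCCESS_CODE:
        return body.get("data", {})
    raise BusinessError(str(body.get("code")), body.get("msg", ""))


# Success: the payload is returned.
print(unwrap('{"code": "0", "msg": "success", "data": {"uid": "123"}}'))

# Failure: the msg text surfaces as an exception.
try:
    unwrap('{"code": "2001", "msg": "user not found", "data": {}}')
except BusinessError as exc:
    print(exc)
```

The caller never inspects code itself; it either gets the payload or handles an exception, which is the same behaviour the gRPC interceptors later in this article aim for.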
gRPC is no exception. Calling a gRPC method looks like calling a normal function, but gRPC services interact by exchanging messages, and the request message and response message of each call are fixed. An error therefore cannot be returned in a structure that differs from the declared response message: the error information has to fit the response body, or we have to find another way. One option is to embed error fields in every response message, as follows:
message Demo {
    string a = 1;
    int32 b = 2;
    int32 err_code = 3;
    string err_msg = 4;
}
When the server finds that a call failed, it converts the error into the corresponding err_code and err_msg, sets them on the message and returns it to the client. Each time the client receives a response, it checks err_code. If it has a value, the request failed, so the client extracts only err_code and err_msg, builds an exception from them and throws it to the caller; otherwise it returns the data normally.
This method works with every call, but it is not very elegant. It would be better to deliver the error to the client through a separate protocol container and let the client parse it with the matching protocol and generate the exception. The gRPC service I introduced before does this by carrying the data in gRPC metadata. To handle exception capture on the server and exception generation on the client automatically, a top-level interceptor is installed on both sides. The server's top-level interceptor is shown below (other interceptors may also raise errors, so the interceptor that catches them must sit at the top level):
# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.5/grpc_example_common/interceptor/server_interceptor/customer_top.py
import logging
import time
from typing import Any, Callable, List, Tuple

import grpc
from grpc_example_common.helper.context import context_proxy

from .base import BaseInterceptor


class CustomerTopInterceptor(BaseInterceptor):
    def intercept(
        self,
        next_handler_method: Callable,
        request_proto_message: Any,
        context: grpc.ServicerContext,
    ) -> Any:
        return_initial_metadata: List[Tuple] = [("customer-user-agent", "Python3")]
        try:
            # Execute the gRPC call
            return next_handler_method(request_proto_message, context)
        except Exception as e:
            # Only clients that sent the agreed key-value pair get the error information
            if self.metadata_dict.get("customer-user-agent", "") == "Python3":
                return_initial_metadata.append(("exc_name", e.__class__.__name__))
                return_initial_metadata.append(("exc_info", str(e)))
            # Re-raise so that the gRPC server can catch the exception and do its own follow-up handling
            raise e
        finally:
            # Finally, send the metadata to the client
            context.send_initial_metadata(return_initial_metadata)
The interceptor catches the exception raised by the call and stores the exception class name and message in the metadata. There are reasons for putting these values in the metadata instead of setting an error code and error message through context.set_code and context.set_details.
First, the code: gRPC restricts which codes may be set, which prevents us from defining custom ones. Business error codes should also not be written into the response status code, so context.set_code is not used here. As for set_details, after the gRPC server catches an exception it handles the exception itself and writes the exception text into details, as follows:
def _call_behavior(rpc_event, state, behavior, argument, request_deserializer, send_response_callback=None):
    from grpc import _create_servicer_context
    with _create_servicer_context(rpc_event, state,
                                  request_deserializer) as context:
        try:
            response_or_iterator = None
            # Invoke the request handler
            if send_response_callback is not None:
                response_or_iterator = behavior(argument, context,
                                                send_response_callback)
            else:
                response_or_iterator = behavior(argument, context)
            return response_or_iterator, True
        except Exception as exception:  # pylint: disable=broad-except
            with state.condition:
                if state.aborted:
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           b'RPC Aborted')
                elif exception not in state.rpc_errors:
                    # The exception is not a gRPC error, so its text is written into details
                    details = 'Exception calling application: {}'.format(
                        exception)
                    _LOGGER.exception(details)
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           _common.encode(details))
            return None, False
This means that even if the interceptor sets details, the value is eventually overwritten by the exception text, because the raised exception is not a gRPC exception.
Now that the server-side interceptor is clear, let's look at the client-side interceptor. The code is as follows:
# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.5/grpc_example_common/interceptor/client_interceptor/customer_top.py
import inspect
import logging
from typing import Any, Callable, Dict, List, Optional, Type

from .base import GRPC_RESPONSE, BaseInterceptor, ClientCallDetailsType


class CustomerTopInterceptor(BaseInterceptor):
    def __init__(self, exc_list: Optional[List[Type[Exception]]] = None):
        self.exc_dict: Dict[str, Type[Exception]] = {}
        for key, exc in globals()["__builtins__"].items():
            # Register Python's built-in exceptions
            if inspect.isclass(exc) and issubclass(exc, Exception):
                self.exc_dict[key] = exc
        if exc_list:
            # Register user-specified exceptions
            for exc in exc_list:
                if issubclass(exc, Exception):
                    self.exc_dict[exc.__name__] = exc

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: ClientCallDetailsType,
    ) -> GRPC_RESPONSE:
        if call_details.metadata is not None:
            # Add the agreed marker so the server knows this client understands the convention
            call_details.metadata.append(("customer-user-agent", "Python3"))  # type: ignore
        response: GRPC_RESPONSE = method(call_details, request_or_iterator)
        metadata_dict: dict = {item.key: item.value for item in response.initial_metadata()}
        if metadata_dict.get("customer-user-agent") == "Python3":
            # Extract the exception information
            exc_name: str = metadata_dict.get("exc_name", "")
            exc_info: str = metadata_dict.get("exc_info", "")
            # Look up the exception class by exc_name
            exc: Optional[Type[Exception]] = self.exc_dict.get(exc_name)
            if exc:
                # Raise the exception
                raise exc(exc_info)
        return response
As you can see, the client interceptor reads the metadata returned by the server to determine whether it carries exception information. If it does, the interceptor extracts it and raises the error; otherwise it returns the response normally. As long as the client and server are both configured with the correct interceptors, the client can obtain the server's error information and raise an exception. However, this implementation depends on gRPC metadata to carry the data, and a metadata value must be ASCII text or valid bytes; otherwise the request cannot be transmitted, or may even hang. This means the error information needs some form of serialization.
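One possible way to serialize the error information is to Base64-encode the exception text before putting it into the metadata. The sketch below is an assumption for illustration, not what the interceptor above does: the helper names and the exc_info_b64 key are hypothetical. gRPC also allows raw bytes as the value of a metadata key that ends in -bin, which is another option.

```python
import base64
from typing import List, Tuple


def encode_exc_metadata(exc: Exception) -> List[Tuple[str, str]]:
    """Build ASCII-only metadata pairs describing an exception."""
    # Base64 output only uses ASCII characters, so any message text is safe to send.
    info = base64.b64encode(str(exc).encode("utf-8")).decode("ascii")
    # "exc_info_b64" is a hypothetical key name chosen for this sketch.
    return [("exc_name", type(exc).__name__), ("exc_info_b64", info)]


def decode_exc_info(value: str) -> str:
    """Reverse of the encoding above, used on the receiving side."""
    return base64.b64decode(value.encode("ascii")).decode("utf-8")


pairs = dict(encode_exc_metadata(ValueError("用户不存在: 456")))
print(pairs["exc_name"], pairs["exc_info_b64"])
print(decode_exc_info(pairs["exc_info_b64"]))
```

Both sides have to agree on the encoding, which is one more private convention to maintain and part of the reason a standard format is attractive.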
Because the implementation above is not very elegant, I searched online for an official one and eventually found the official error-passing example on GitHub. The official server example code is as follows:
def create_greet_limit_exceed_error_status(name):
    # Create a Message object
    detail = any_pb2.Any()
    # Pack the custom error into an Any object so that it passes validation when messages are sent and received
    detail.Pack(
        error_details_pb2.QuotaFailure(violations=[
            error_details_pb2.QuotaFailure.Violation(
                subject="name: %s" % name,
                description="Limit one greeting per person",
            )
        ],))
    # Build a Status object with three fields: code, message and details
    return status_pb2.Status(
        code=code_pb2.RESOURCE_EXHAUSTED,
        message='Request limit exceeded.',
        # List of error objects
        details=[detail],
    )


class LimitedGreeter(helloworld_pb2_grpc.GreeterServicer):

    def __init__(self):
        self._lock = threading.RLock()
        self._greeted = set()

    def SayHello(self, request, context):
        # Handler for the corresponding gRPC call
        with self._lock:
            if request.name in self._greeted:
                rich_status = create_greet_limit_exceed_error_status(
                    request.name)
                context.abort_with_status(rpc_status.to_status(rich_status))
            else:
                self._greeted.add(request.name)
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
The logic of SayHello in the sample code is very simple. If name is not yet in the set, it adds name to the set and returns normally. If name already exists, it first builds a Status object, converts it into a _Status object through to_status, and finally passes that _Status object to abort_with_status, which transmits the error data to the client.
abort_with_status raises an exception to terminate the request in an abnormal state and then sends the user-specified Status object to the client. The source code of to_status is as follows:
def to_status(status):
    return _Status(code=code_to_grpc_status_code(status.code),
                   details=status.message,
                   trailing_metadata=((GRPC_DETAILS_METADATA_KEY,
                                       status.SerializeToString()),))
From the source code we can see that this function converts status.code into the gRPC response code, uses status.message as the gRPC details, and finally serializes status into a byte string and stores it in the metadata under GRPC_DETAILS_METADATA_KEY.
The client side is relatively simple. The source code is as follows:
def process(stub):
    try:
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='Alice'))
        _LOGGER.info('Call success: %s', response.message)
    except grpc.RpcError as rpc_error:
        _LOGGER.error('Call failure: %s', rpc_error)
        # Extract the Status object from the grpc.RpcError
        status = rpc_status.from_call(rpc_error)
        for detail in status.details:
            # Check whether the detail holds the expected message type:
            # if so, log the error, otherwise raise
            if detail.Is(error_details_pb2.QuotaFailure.DESCRIPTOR):
                info = error_details_pb2.QuotaFailure()
                detail.Unpack(info)
                _LOGGER.error('Quota failure: %s', info)
            else:
                raise RuntimeError('Unexpected failure: %s' % detail)
In this code, a normal response is simply logged. For an abnormal one, the client finds that the response code is not a normal status code and raises a grpc.RpcError. The Status object is then extracted from the exception with rpc_status.from_call. The logic of this function is very simple; its source code is as follows:
def from_call(call):
    # Return None directly if there is no metadata
    if call.trailing_metadata() is None:
        return None
    # Otherwise iterate over the metadata
    for key, value in call.trailing_metadata():
        # If the key is the officially designated key, extract the data
        if key == GRPC_DETAILS_METADATA_KEY:
            # Deserialize the value into a Status message
            rich_status = status_pb2.Status.FromString(value)
            # Check that the Status agrees with the code and details of the response
            if call.code().value[0] != rich_status.code:
                raise ValueError(
                    'Code in Status proto (%s) doesn\'t match status code (%s)'
                    % (code_to_grpc_status_code(rich_status.code), call.code()))
            if call.details() != rich_status.message:
                raise ValueError(
                    'Message in Status proto (%s) doesn\'t match status details (%s)'
                    % (rich_status.message, call.details()))
            return rich_status
    return None
From the source code we can see that this logic is the same as in the custom error passing: the data is extracted from the metadata and assembled into an error object. Note, however, that the call parameter of from_call accepts not only a grpc.RpcError but also a response object, because the trailing_metadata, code and details methods that from_call uses on call are shared by grpc.RpcError and response objects.
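The duck typing that makes this possible can be shown with a simplified stand-in that needs neither gRPC nor protobuf. Everything below is hypothetical and only mirrors the shape of from_call: FakeCall and extract_status are made-up names, plain integers replace grpc.StatusCode, and JSON replaces the serialized Status message. The key string is the value of GRPC_DETAILS_METADATA_KEY in the grpc_status package.

```python
import json
from typing import Any, List, Optional, Tuple

# Value of GRPC_DETAILS_METADATA_KEY in the grpc_status package.
DETAILS_KEY = "grpc-status-details-bin"


class FakeCall:
    """Hypothetical object exposing only the three methods the lookup relies on."""

    def __init__(self, code: int, details: str, trailing: List[Tuple[str, bytes]]) -> None:
        self._code = code
        self._details = details
        self._trailing = trailing

    def code(self) -> int:
        return self._code

    def details(self) -> str:
        return self._details

    def trailing_metadata(self) -> List[Tuple[str, bytes]]:
        return self._trailing


def extract_status(call: Any) -> Optional[dict]:
    """Simplified analogue of from_call; JSON stands in for the protobuf Status."""
    for key, value in call.trailing_metadata() or []:
        if key == DETAILS_KEY:
            status = json.loads(value)
            # Same consistency check as from_call: payload must agree with the call.
            if status["code"] != call.code() or status["message"] != call.details():
                raise ValueError("status payload does not match call code/details")
            return status
    return None


payload = json.dumps({"code": 8, "message": "Request limit exceeded.", "details": []}).encode()
failed = FakeCall(8, "Request limit exceeded.", [(DETAILS_KEY, payload)])
print(extract_status(failed))
print(extract_status(FakeCall(0, "", [])))
```

Any object with these three methods is accepted, whether it arrived as a raised error or as the return value of an interceptor's continuation.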
After this brief look at the gRPC error-passing example, it is clear that the official approach is very similar to the custom one. It simply defines a standard key, so that everyone agrees the value stored under that key is a serialized Status object (because it is serialized, there is no need to worry about non-ASCII characters). The Status object contains three fields, code, message and details, which correspond to the fields of the error structure shown earlier:
{
    "code": "0",
    "msg": "success",
    "data": {}
}
namely code, msg and data. Note, however, that details is a list that can hold several custom Message objects.
The official example shows that the server's business logic has to call context.abort_with_status itself to put the error information into the metadata, and the client has to catch grpc.RpcError and handle it. That is very verbose for the business layer, so I tried to combine the officially defined error passing with the custom interceptor-based error passing.
The first step is to define an internally unified message:
message Exec {
    string name = 1; // Exception name
    string msg = 2; // Exception message
}
This message is used only by internal business services. If the server is developed for other departments whose services do not understand this message, they can still tell what kind of error occurred from code and details.
Next comes the server's top-level interceptor. Only the part that handles the caught exception needs to change. The source code is as follows:
# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.7/grpc_example_common/interceptor/server_interceptor/customer_top.py
class CustomerTopInterceptor(BaseInterceptor):
    def intercept(
        self,
        next_handler_method: Callable,
        request_proto_message: Any,
        context: grpc.ServicerContext,
    ) -> Any:
        try:
            # Invoke the service
            return next_handler_method(request_proto_message, context)
        except Exception as e:
            # Create a Message object
            detail = any_pb2.Any()
            # Pack the custom error into an Any object so that it passes validation when messages are sent and received.
            # Note that the message packed here is our own Exec message.
            detail.Pack(
                Exec(
                    name=e.__class__.__name__,
                    msg=str(e)
                )
            )
            # abort_with_status delivers the data to the client through the metadata
            context.abort_with_status(
                rpc_status.to_status(
                    status_pb2.Status(
                        # Only a gRPC status code may go here. This is like HTTP, where a
                        # business error code of 2001 still travels with HTTP status 200.
                        code=code_pb2.RESOURCE_EXHAUSTED,
                        message=str(e),
                        # details is a list, so several kinds of error objects could be added
                        # for different systems; internal calls use only this one.
                        details=[detail],
                    )
                )
            )
            # Re-raise the exception. The gRPC server sees that the call is already marked as aborted
            # and does no further handling, but other components still need it: for example, the official
            # opentelemetry implementation wraps the channel in another layer, so it has to catch the
            # exception to generate the corresponding Event.
            raise e
Then comes the client's top-level interceptor. Likewise, only the way the data is obtained needs to change. The source code is as follows:
# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.7/grpc_example_common/interceptor/client_interceptor/customer_top.py
class CustomerTopInterceptor(BaseInterceptor):
    # Exception registration code omitted
    ...

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: ClientCallDetailsType,
    ) -> GRPC_RESPONSE:
        response: GRPC_RESPONSE = method(call_details, request_or_iterator)
        # As mentioned earlier, `from_call` also accepts the response object returned by `method` in a client interceptor
        status: Optional[status_pb2.Status] = rpc_status.from_call(response)
        # If it is not None, error data was received
        if status:
            for detail in status.details:
                # Check whether this detail is the Message we expect
                if detail.Is(Exec.DESCRIPTOR):
                    # Get the data by unpacking
                    exec_instance: Exec = Exec()
                    detail.Unpack(exec_instance)
                    # Build the exception and raise it, falling back to RuntimeError for unknown names
                    exec_class: Type[Exception] = self.exc_dict.get(exec_instance.name) or RuntimeError
                    raise exec_class(exec_instance.msg)
                else:
                    raise RuntimeError('Unexpected failure: %s' % detail)
        return response
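The name-to-exception lookup that both client interceptors rely on can be tried in isolation. The sketch below is a hypothetical standalone variant, not the repository's code: build_exc_registry and rebuild_exception are made-up names, and it reads the built-ins through the builtins module rather than globals()["__builtins__"], because in CPython __builtins__ is a dict only inside imported modules and is the module object itself in __main__.

```python
import builtins
import inspect
from typing import Dict, List, Optional, Type


def build_exc_registry(extra: Optional[List[Type[Exception]]] = None) -> Dict[str, Type[Exception]]:
    """Map exception class names to classes: built-ins first, then user-supplied ones."""
    registry: Dict[str, Type[Exception]] = {
        name: obj
        for name, obj in vars(builtins).items()
        if inspect.isclass(obj) and issubclass(obj, Exception)
    }
    for exc in extra or []:
        if issubclass(exc, Exception):
            registry[exc.__name__] = exc
    return registry


def rebuild_exception(registry: Dict[str, Type[Exception]], name: str, msg: str) -> Exception:
    """Recreate an exception from its transmitted name, falling back to RuntimeError."""
    return (registry.get(name) or RuntimeError)(msg)


class UserNotFound(Exception):
    """Hypothetical business exception used only for this demonstration."""


registry = build_exc_registry([UserNotFound])
print(repr(rebuild_exception(registry, "ValueError", "Not found user:456")))
print(repr(rebuild_exception(registry, "UserNotFound", "uid 456")))
print(repr(rebuild_exception(registry, "SomeServerOnlyError", "unknown to the client")))
```

Only the class name crosses the wire, so an exception class that the client has not registered degrades to RuntimeError while its message is preserved.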
With that, the new error-passing implementation is complete. Now let's verify it with a simple demo. The demo code is as follows:
# grpc_example_common url: https://github.com/so1n/grpc-example-common/tree/v0.1.7
# Server code
from concurrent import futures
from typing import List

import grpc
from grpc_example_common.interceptor.server_interceptor.base import BaseInterceptor
from google.protobuf.empty_pb2 import Empty  # type: ignore
from grpc_example_common.protos.user import user_pb2 as user_message
from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor
from grpc_example_common.protos.user import user_pb2_grpc as user_service


class UserService(user_service.UserServicer):
    def delete_user(self, request: user_message.DeleteUserRequest, context: grpc.ServicerContext) -> Empty:
        uid: str = request.uid
        if uid == "123":
            return Empty()
        else:
            raise ValueError(f"Not found user:{uid}")


def main(host: str = "127.0.0.1", port: str = "9000") -> None:
    interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()]
    server: grpc.server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=interceptor_list,
    )
    user_service.add_UserServicer_to_server(UserService(), server)
    server.add_insecure_port(f"{host}:{port}")
    server.start()
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == "__main__":
    main()

# Client code
import grpc
from grpc_example_common.protos.user import user_pb2 as user_message
from grpc_example_common.protos.user import user_pb2_grpc as user_service
from grpc_example_common.interceptor.client_interceptor.customer_top import CustomerTopInterceptor

channel: grpc.Channel = grpc.intercept_channel(
    grpc.insecure_channel("127.0.0.1:9000"), CustomerTopInterceptor()
)
user_stub: user_service.UserStub = user_service.UserStub(channel)
user_stub.delete_user(user_message.DeleteUserRequest(uid="123"))
user_stub.delete_user(user_message.DeleteUserRequest(uid="456"))
After writing the demo, run it. The client then raises the following error:
Traceback (most recent call last):
File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/demo.py", line 11, in <module>
user_stub.delete_user(user_message.DeleteUserRequest(uid="456"))
File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc/_interceptor.py", line 216, in __call__
response, ignored_call = self._with_call(request,
File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc/_interceptor.py", line 254, in _with_call
call = self._interceptor.intercept_unary_unary(continuation,
File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc_example_common/interceptor/client_interceptor/base.py", line 74, in intercept_unary_unary
return self.intercept(continuation, request, call_details)
File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc_example_common/interceptor/client_interceptor/customer_top.py", line 44, in intercept
raise exec_class(exec_instance.msg)
ValueError: Not found user:456
The traceback shows that the redesigned error passing works as intended: the ValueError raised on the server is re-raised on the client with the same message.