fastapi微服務系列(2)-之GRPC的interceptor攔截器簡單使用(中間件)

語言: CN / TW / HK

theme: juejin

對於一個框架來説,通常具備有所謂的中間件,有時候也可以説是攔截器,其實和鈎子差不多的概念。

那grpc也不例外。但是使用python如何應用到我們的攔截器的吶? 攔截器又可以做哪些事情呢?

1:grpc的攔截器可以做啥?

本身攔截器的概念和我們的中間件類似,所以類似fastapi中我們的中間件能做,攔截器都可以做:

  • 身份驗證
  • 日誌請求記錄
  • 全局上下文的信息處理等
  • 多個攔截器和多箇中間件遵循的請求規則都是洋葱模型
  • 攔截器必須有返回值,返回是響應報文體

PS:而且相對GRPC來説不止於我們的服務端有鈎子,客户端也有鈎子(攔截器),和我們的httpx庫提供的類似的鈎子函數差不多!

PS:攔截器可以作用再客户端和服務端:客户端攔截器和服務端攔截器

2:grpc的攔截器分類

  • 一元攔截器(UnaryServerInterceptor)-客户端中
  • 流式攔截器(StreamClientInterceptor)- 客户端中
  • python中的服務端是實現ServerInterceptor

image.png

3:在python實現grpc攔截器

查看服務傳遞的攔截器參數説明:

image.png

3.1 服務端的自帶攔截器

主要注意點:

  • 攔截器傳入是一個實例化的對象
  • 攔截器列表的傳入,可以是元組也可以是列表
  • 多攔截器的形式遵循洋葱模型

服務端攔截器需要實現攔截器的抽象方法:

image.png

完整服務端示例代碼: ``` from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc import signal

實現 proto文件中定義的 GreeterServicer的接口

class Greeter(hello_pb2_grpc.GreeterServicer): # 實現 proto 文件中定義的 rpc 調用 def SayHello(self, request, context): # 返回是我們的定義的響應體的對象 return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

def SayHelloAgain(self, request, context):
    # 返回是我們的定義的響應體的對象

    # # 設置異常狀態碼
    # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    # context.set_details("你沒有這個訪問的權限")
    # raise context

    # 接收請求頭的信息
    print("接收到的請求頭元數據信息", context.invocation_metadata())
    # 設置響應報文頭信息
    context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
    # 三種的壓縮機制處理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 局部的數據進行壓縮
    context.set_compression(grpc.Compression.Gzip)
    return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

class MyUnaryServerInterceptor1(grpc.ServerInterceptor):

def intercept_service(self,continuation, handler_call_details):
    print("我是攔截器1號:開始----1")
    respn = continuation(handler_call_details)
    print("我是攔截器1號:結束----2",respn)
    return respn

class MyUnaryServerInterceptor2(grpc.ServerInterceptor):

def intercept_service(self,continuation, handler_call_details):
    print("我是攔截器2號:開始----1")
    respn = continuation(handler_call_details)
    print("我是攔截器2號:結束----2",respn)
    return respn

def serve():

# 實例化一個rpc服務,使用線程池的方式啟動我們的服務
# 服務一些參數信息的配置
options = [
    ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制發送的最大的數據大小
    ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的數據的大小
]
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全局的數據傳輸的壓縮機制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                     options=options,
                     compression=compression,
                     interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 添加我們服務
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置啟動的端口
server.add_insecure_port('[::]:50051')
#  開始啟動的服務
server.start()

def stop_serve(signum, frame):
    print("進程結束了!!!!")
    # sys.exit(0)
    raise KeyboardInterrupt

# 註銷相關的信號
# SIGINT 對應windos下的 ctrl+c的命令
# SIGTERM 對應的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)

# wait_for_termination --主要是為了目標啟動後主進程直接的結束!需要一個循環的方式進行進行進程運行
server.wait_for_termination()

if name == 'main': serve() ```

關鍵的配置地方是:

image.png 此時使用我們的客户端請求服務端,服務端會輸出一下的信息:

~~~ 我是攔截器1號:開始----1 我是攔截器2號:開始----1 我是攔截器2號:結束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=, response_serializer=, unary_unary=<bound method Greeter.SayHelloAgain of <main.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None) 我是攔截器1號:結束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=, response_serializer=, unary_unary=<bound method Greeter.SayHelloAgain of <main.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None) 接收到的請求頭元數據信息 (_Metadatum(key='mesasge', value='1010'), _Metadatum(key='error', value='No Error'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))

~~~

3.2 客户端的自帶攔截器

客户端攔截器的需要實現類和服務端的不一樣:

image.png

且當我們的使用客户端攔截器的時候,主要鏈接到我們的RPC的時候的方式也有所改變:

image.png

完整客户端示例代碼:

``` import grpc import hello_pb2 import hello_pb2_grpc

class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客户端的攔截器1:---開始1") resp = continuation(client_call_details, request) print("客户端的攔截器1:---結束2", resp) return resp

class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客户端的攔截器2:---開始1") resp = continuation(client_call_details, request) print("客户端的攔截器2:---結束2", resp) return resp

def run(): # 連接 rpc 服務器 options = [ ('grpc.max_send_message_length', 100 * 1024 * 1024), ('grpc.max_receive_message_length', 100 * 1024 * 1024), ('grpc.enable_retries', 1), ('grpc.service_config', '{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }') ]

# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全局的數據傳輸的壓縮機制
compression = grpc.Compression.Gzip
# with grpc.insecure_channel(target='localhost:50051',
#                            options=options,
#                            compression=compression
#                            ) as channel:


with grpc.insecure_channel(target='localhost:50051',
                           options=options,
                           compression=compression
                           ) as channel:
    # 通過通道服務一個服務intercept_channel
    interceptor_channel = grpc.intercept_channel(channel, ClientServerInterceptor1(),ClientServerInterceptor2())
    stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
    # 生成請求我們的服務的函數的時候,需要傳遞的參數體,它放在hello_pb2裏面-請求體為:hello_pb2.HelloRequest對象
    try:

        reest_header = (
            ('mesasge', '1010'),
            ('error', 'No Error')
        )

        response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='歡迎下次光臨'),
                                                          # 設置請求的超時處理
                                                          timeout=5,
                                                          # 設置請求的頭的信息
                                                          metadata=reest_header,
                                                          )
        print("SayHelloAgain函數調用結果的返回: " + response.message)
        print("SayHelloAgain函數調用結果的返回---響應報文頭信息: ", callbask.trailing_metadata())
    except grpc._channel._InactiveRpcError as e:
        print(e.code())
        print(e.details())

if name == 'main': run() ```

4:grpc攔截器上下文傳遞

我們的都知道作為中間件的話,一般某些業務場景下是有些使用承載請求上下文的傳遞的任務滴,然是自帶的攔截器,似乎完全沒有對應的 request, context 相關的引入傳遞,如果我們的需要傳遞上下文的時候呢?這就無法實現了!!!!

要實現具有上下文的傳遞攔截器的話使用第三方庫來實現:

pip install grpc-interceptor

這個庫還字典的相關的測試: $ pip install grpc-interceptor[testing]

4.1 改造服務端攔截器

該用 第三方庫後的完整服務端改造示例:

``` from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc import signal from typing import Any,Callable

實現 proto文件中定義的 GreeterServicer的接口

class Greeter(hello_pb2_grpc.GreeterServicer): # 實現 proto 文件中定義的 rpc 調用 def SayHello(self, request, context): # 返回是我們的定義的響應體的對象 return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

def SayHelloAgain(self, request, context):
    # 返回是我們的定義的響應體的對象

    # # 設置異常狀態碼
    # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    # context.set_details("你沒有這個訪問的權限")
    # raise context

    # 接收請求頭的信息
    print("接收到的請求頭元數據信息", context.invocation_metadata())
    # 設置響應報文頭信息
    context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
    # 三種的壓縮機制處理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 局部的數據進行壓縮
    context.set_compression(grpc.Compression.Gzip)
    return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

from grpc_interceptor import ServerInterceptor from grpc_interceptor.exceptions import GrpcException from grpc_interceptor.exceptions import NotFound class MyUnaryServerInterceptor1(ServerInterceptor):

def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
) -> Any:

    rsep = None
    try:
        print("我是攔截器1號:開始----1")
        rsep= method(request, context)
    except GrpcException as e:
        context.set_code(e.status_code)
        context.set_details(e.details)
        raise
    finally:
        print("我是攔截器1號:結束----2",rsep)
        return rsep

class MyUnaryServerInterceptor2(ServerInterceptor):

def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
) -> Any:

    rsep = None
    try:
        print("我是攔截器2號:開始----1")
        rsep= method(request, context)
    except GrpcException as e:
        context.set_code(e.status_code)
        context.set_details(e.details)
        raise
    finally:
        print("我是攔截器2號:結束----2",rsep)
        return rsep

def serve():

# 實例化一個rpc服務,使用線程池的方式啟動我們的服務
# 服務一些參數信息的配置
options = [
    ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制發送的最大的數據大小
    ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的數據的大小
]
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全局的數據傳輸的壓縮機制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                     options=options,
                     compression=compression,
                     interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 添加我們服務
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置啟動的端口
server.add_insecure_port('[::]:50051')
#  開始啟動的服務
server.start()

def stop_serve(signum, frame):
    print("進程結束了!!!!")
    # sys.exit(0)
    raise KeyboardInterrupt

# 註銷相關的信號
# SIGINT 對應windos下的 ctrl+c的命令
# SIGTERM 對應的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)

# wait_for_termination --主要是為了目標啟動後主進程直接的結束!需要一個循環的方式進行進行進程運行
server.wait_for_termination()

if name == 'main': serve() ```

通過上面的方式,我們就可以對應我們的上下文請求做相關的處理了!這個和我們的web框架的中間件幾乎是接近類似了!

4.2 簡單分析第三方庫簡單源碼

進入這個第三方庫的源碼內部的,其實發現它自己也是實現了 grpc.ServerInterceptor 然後對它進一步進行了抽象一層

  • 第一步其實和我們自帶的實現一樣,先是獲取返回的下一個帶處理的handle next_handler = continuation(handler_call_details) 然後對返回這個next_handler進行是那種類型的的攔截器:

    • unary_unary
    • unary_stream
    • stream_unary
    • stream_stream
  • 判斷完成是哪裏蕾西的攔截器之後返回 handler_factory, next_handler_method 然後調用的是最終返回是handler_factory的對象

  • handler_factory的對象需要的參數有:

    • invoke_intercept_method 攔截器的方法
    • request_deserializer 請求的系列化
    • response_serializer 響應的系列化
  • 而我們的invoke_intercept_method 攔截器的方法獲取則需要

    • 傳入定義的一個 request: Any, context: grpc.ServicerContext,
  • 然後返回是我們的最終需要實現的方法!~我去~暈了~

4.3 補充説明handler_call_details

如果我們的單純只是需要獲取到RPC請求裏面的提交請求頭元數據的,我們可以使用它讀取:

print("handler call details: ", handler_call_details.invocation_metadata) 它本身是一個: ~~~ grpc._server._HandlerCallDetails的類型 ~~~

總結

以上僅僅是個人結合自己的實際需求,做學習的實踐筆記!如有筆誤!歡迎批評指正!感謝各位大佬!

結尾

END

簡書:https://www.jianshu.com/u/d6960089b087

掘金:https://juejin.cn/user/2963939079225608

公眾號:微信搜【小兒來一壺枸杞酒泡茶】

小鐘同學 | 文 【歡迎一起學習交流】| QQ:308711822