fastapi微服務系列(2)-之GRPC的interceptor攔截器簡單使用(中介軟體)

語言: CN / TW / HK

theme: juejin

對於一個框架來說,通常具備有所謂的中介軟體,有時候也可以說是攔截器,其實和鉤子差不多的概念。

那grpc也不例外。但是使用python如何應用到我們的攔截器的吶? 攔截器又可以做哪些事情呢?

1:grpc的攔截器可以做啥?

本身攔截器的概念和我們的中介軟體類似,所以類似fastapi中我們的中介軟體能做,攔截器都可以做:

  • 身份驗證
  • 日誌請求記錄
  • 全域性上下文的資訊處理等
  • 多個攔截器和多箇中間件遵循的請求規則都是洋蔥模型
  • 攔截器必須有返回值,返回是響應報文體

PS:而且相對GRPC來說不止於我們的服務端有鉤子,客戶端也有鉤子(攔截器),和我們的httpx庫提供的類似的鉤子函式差不多!

PS:攔截器可以作用再客戶端和服務端:客戶端攔截器和服務端攔截器

2:grpc的攔截器分類

  • 一元攔截器(UnaryServerInterceptor)-客戶端中
  • 流式攔截器(StreamClientInterceptor)- 客戶端中
  • python中的服務端是實現ServerInterceptor

image.png

3:在python實現grpc攔截器

檢視服務傳遞的攔截器引數說明:

image.png

3.1 服務端的自帶攔截器

主要注意點:

  • 攔截器傳入是一個例項化的物件
  • 攔截器列表的傳入,可以是元組也可以是列表
  • 多攔截器的形式遵循洋蔥模型

服務端攔截器需要實現攔截器的抽象方法:

image.png

完整服務端示例程式碼: ``` from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc import signal

實現 proto檔案中定義的 GreeterServicer的介面

class Greeter(hello_pb2_grpc.GreeterServicer): # 實現 proto 檔案中定義的 rpc 呼叫 def SayHello(self, request, context): # 返回是我們的定義的響應體的物件 return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

def SayHelloAgain(self, request, context):
    # 返回是我們的定義的響應體的物件

    # # 設定異常狀態碼
    # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    # context.set_details("你沒有這個訪問的許可權")
    # raise context

    # 接收請求頭的資訊
    print("接收到的請求頭元資料資訊", context.invocation_metadata())
    # 設定響應報文頭資訊
    context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
    # 三種的壓縮機制處理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 區域性的資料進行壓縮
    context.set_compression(grpc.Compression.Gzip)
    return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

class MyUnaryServerInterceptor1(grpc.ServerInterceptor):

def intercept_service(self,continuation, handler_call_details):
    print("我是攔截器1號:開始----1")
    respn = continuation(handler_call_details)
    print("我是攔截器1號:結束----2",respn)
    return respn

class MyUnaryServerInterceptor2(grpc.ServerInterceptor):

def intercept_service(self,continuation, handler_call_details):
    print("我是攔截器2號:開始----1")
    respn = continuation(handler_call_details)
    print("我是攔截器2號:結束----2",respn)
    return respn

def serve():

# 例項化一個rpc服務,使用執行緒池的方式啟動我們的服務
# 服務一些引數資訊的配置
options = [
    ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制傳送的最大的資料大小
    ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的資料的大小
]
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全域性的資料傳輸的壓縮機制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                     options=options,
                     compression=compression,
                     interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 新增我們服務
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置啟動的埠
server.add_insecure_port('[::]:50051')
#  開始啟動的服務
server.start()

def stop_serve(signum, frame):
    print("程序結束了!!!!")
    # sys.exit(0)
    raise KeyboardInterrupt

# 登出相關的訊號
# SIGINT 對應windos下的 ctrl+c的命令
# SIGTERM 對應的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)

# wait_for_termination --主要是為了目標啟動後主程序直接的結束!需要一個迴圈的方式進行進行程序執行
server.wait_for_termination()

if name == 'main': serve() ```

關鍵的配置地方是:

image.png 此時使用我們的客戶端請求服務端,服務端會輸出一下的資訊:

~~~ 我是攔截器1號:開始----1 我是攔截器2號:開始----1 我是攔截器2號:結束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=, response_serializer=, unary_unary=<bound method Greeter.SayHelloAgain of <main.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None) 我是攔截器1號:結束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=, response_serializer=, unary_unary=<bound method Greeter.SayHelloAgain of <main.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None) 接收到的請求頭元資料資訊 (_Metadatum(key='mesasge', value='1010'), _Metadatum(key='error', value='No Error'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))

~~~

3.2 客戶端的自帶攔截器

客戶端攔截器的需要實現類和服務端的不一樣:

image.png

且當我們的使用客戶端攔截器的時候,主要連結到我們的RPC的時候的方式也有所改變:

image.png

完整客戶端示例程式碼:

``` import grpc import hello_pb2 import hello_pb2_grpc

class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客戶端的攔截器1:---開始1") resp = continuation(client_call_details, request) print("客戶端的攔截器1:---結束2", resp) return resp

class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客戶端的攔截器2:---開始1") resp = continuation(client_call_details, request) print("客戶端的攔截器2:---結束2", resp) return resp

def run(): # 連線 rpc 伺服器 options = [ ('grpc.max_send_message_length', 100 * 1024 * 1024), ('grpc.max_receive_message_length', 100 * 1024 * 1024), ('grpc.enable_retries', 1), ('grpc.service_config', '{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }') ]

# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全域性的資料傳輸的壓縮機制
compression = grpc.Compression.Gzip
# with grpc.insecure_channel(target='localhost:50051',
#                            options=options,
#                            compression=compression
#                            ) as channel:


with grpc.insecure_channel(target='localhost:50051',
                           options=options,
                           compression=compression
                           ) as channel:
    # 通過通道服務一個服務intercept_channel
    interceptor_channel = grpc.intercept_channel(channel, ClientServerInterceptor1(),ClientServerInterceptor2())
    stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
    # 生成請求我們的服務的函式的時候,需要傳遞的引數體,它放在hello_pb2裡面-請求體為:hello_pb2.HelloRequest物件
    try:

        reest_header = (
            ('mesasge', '1010'),
            ('error', 'No Error')
        )

        response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='歡迎下次光臨'),
                                                          # 設定請求的超時處理
                                                          timeout=5,
                                                          # 設定請求的頭的資訊
                                                          metadata=reest_header,
                                                          )
        print("SayHelloAgain函式呼叫結果的返回: " + response.message)
        print("SayHelloAgain函式呼叫結果的返回---響應報文頭資訊: ", callbask.trailing_metadata())
    except grpc._channel._InactiveRpcError as e:
        print(e.code())
        print(e.details())

if name == 'main': run() ```

4:grpc攔截器上下文傳遞

我們的都知道作為中介軟體的話,一般某些業務場景下是有些使用承載請求上下文的傳遞的任務滴,然是自帶的攔截器,似乎完全沒有對應的 request, context 相關的引入傳遞,如果我們的需要傳遞上下文的時候呢?這就無法實現了!!!!

要實現具有上下文的傳遞攔截器的話使用第三方庫來實現:

pip install grpc-interceptor

這個庫還字典的相關的測試: $ pip install grpc-interceptor[testing]

4.1 改造服務端攔截器

該用 第三方庫後的完整服務端改造示例:

``` from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc import signal from typing import Any,Callable

實現 proto檔案中定義的 GreeterServicer的介面

class Greeter(hello_pb2_grpc.GreeterServicer): # 實現 proto 檔案中定義的 rpc 呼叫 def SayHello(self, request, context): # 返回是我們的定義的響應體的物件 return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

def SayHelloAgain(self, request, context):
    # 返回是我們的定義的響應體的物件

    # # 設定異常狀態碼
    # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    # context.set_details("你沒有這個訪問的許可權")
    # raise context

    # 接收請求頭的資訊
    print("接收到的請求頭元資料資訊", context.invocation_metadata())
    # 設定響應報文頭資訊
    context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
    # 三種的壓縮機制處理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 區域性的資料進行壓縮
    context.set_compression(grpc.Compression.Gzip)
    return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

from grpc_interceptor import ServerInterceptor from grpc_interceptor.exceptions import GrpcException from grpc_interceptor.exceptions import NotFound class MyUnaryServerInterceptor1(ServerInterceptor):

def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
) -> Any:

    rsep = None
    try:
        print("我是攔截器1號:開始----1")
        rsep= method(request, context)
    except GrpcException as e:
        context.set_code(e.status_code)
        context.set_details(e.details)
        raise
    finally:
        print("我是攔截器1號:結束----2",rsep)
        return rsep

class MyUnaryServerInterceptor2(ServerInterceptor):

def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
) -> Any:

    rsep = None
    try:
        print("我是攔截器2號:開始----1")
        rsep= method(request, context)
    except GrpcException as e:
        context.set_code(e.status_code)
        context.set_details(e.details)
        raise
    finally:
        print("我是攔截器2號:結束----2",rsep)
        return rsep

def serve():

# 例項化一個rpc服務,使用執行緒池的方式啟動我們的服務
# 服務一些引數資訊的配置
options = [
    ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制傳送的最大的資料大小
    ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的資料的大小
]
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全域性的資料傳輸的壓縮機制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                     options=options,
                     compression=compression,
                     interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 新增我們服務
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置啟動的埠
server.add_insecure_port('[::]:50051')
#  開始啟動的服務
server.start()

def stop_serve(signum, frame):
    print("程序結束了!!!!")
    # sys.exit(0)
    raise KeyboardInterrupt

# 登出相關的訊號
# SIGINT 對應windos下的 ctrl+c的命令
# SIGTERM 對應的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)

# wait_for_termination --主要是為了目標啟動後主程序直接的結束!需要一個迴圈的方式進行進行程序執行
server.wait_for_termination()

if name == 'main': serve() ```

通過上面的方式,我們就可以對應我們的上下文請求做相關的處理了!這個和我們的web框架的中介軟體幾乎是接近類似了!

4.2 簡單分析第三方庫簡單原始碼

進入這個第三方庫的原始碼內部的,其實發現它自己也是實現了 grpc.ServerInterceptor 然後對它進一步進行了抽象一層

  • 第一步其實和我們自帶的實現一樣,先是獲取返回的下一個帶處理的handle next_handler = continuation(handler_call_details) 然後對返回這個next_handler進行是那種型別的的攔截器:

    • unary_unary
    • unary_stream
    • stream_unary
    • stream_stream
  • 判斷完成是哪裡蕾西的攔截器之後返回 handler_factory, next_handler_method 然後呼叫的是最終返回是handler_factory的物件

  • handler_factory的物件需要的引數有:

    • invoke_intercept_method 攔截器的方法
    • request_deserializer 請求的系列化
    • response_serializer 響應的系列化
  • 而我們的invoke_intercept_method 攔截器的方法獲取則需要

    • 傳入定義的一個 request: Any, context: grpc.ServicerContext,
  • 然後返回是我們的最終需要實現的方法!~我去~暈了~

4.3 補充說明handler_call_details

如果我們的單純只是需要獲取到RPC請求裡面的提交請求頭元資料的,我們可以使用它讀取:

print("handler call details: ", handler_call_details.invocation_metadata) 它本身是一個: ~~~ grpc._server._HandlerCallDetails的型別 ~~~

總結

以上僅僅是個人結合自己的實際需求,做學習的實踐筆記!如有筆誤!歡迎批評指正!感謝各位大佬!

結尾

END

簡書:http://www.jianshu.com/u/d6960089b087

掘金:http://juejin.cn/user/2963939079225608

公眾號:微信搜【小兒來一壺枸杞酒泡茶】

小鐘同學 | 文 【歡迎一起學習交流】| QQ:308711822