fastapi微服務系列(2)-之GRPC的interceptor攔截器簡單使用(中介軟體)
theme: juejin
對於一個框架來說,通常具備有所謂的中介軟體,有時候也可以說是攔截器,其實和鉤子差不多的概念。
那grpc也不例外。但是使用python如何應用到我們的攔截器的吶? 攔截器又可以做哪些事情呢?
1:grpc的攔截器可以做啥?
本身攔截器的概念和我們的中介軟體類似,所以類似fastapi中我們的中介軟體能做,攔截器都可以做:
- 身份驗證
- 日誌請求記錄
- 全域性上下文的資訊處理等
- 多個攔截器和多箇中間件遵循的請求規則都是洋蔥模型
- 攔截器必須有返回值,返回是響應報文體
PS:而且相對GRPC來說不止於我們的服務端有鉤子,客戶端也有鉤子(攔截器),和我們的httpx庫提供的類似的鉤子函式差不多!
PS:攔截器可以作用再客戶端和服務端:客戶端攔截器和服務端攔截器
2:grpc的攔截器分類
- 一元攔截器(UnaryServerInterceptor)-客戶端中
- 流式攔截器(StreamClientInterceptor)- 客戶端中
- python中的服務端是實現ServerInterceptor
3:在python實現grpc攔截器
檢視服務傳遞的攔截器引數說明:
3.1 服務端的自帶攔截器
主要注意點:
- 攔截器傳入是一個例項化的物件
- 攔截器列表的傳入,可以是元組也可以是列表
- 多攔截器的形式遵循洋蔥模型
服務端攔截器需要實現攔截器的抽象方法:
完整服務端示例程式碼: ``` from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc import signal
實現 proto檔案中定義的 GreeterServicer的介面
class Greeter(hello_pb2_grpc.GreeterServicer): # 實現 proto 檔案中定義的 rpc 呼叫 def SayHello(self, request, context): # 返回是我們的定義的響應體的物件 return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
def SayHelloAgain(self, request, context):
# 返回是我們的定義的響應體的物件
# # 設定異常狀態碼
# context.set_code(grpc.StatusCode.PERMISSION_DENIED)
# context.set_details("你沒有這個訪問的許可權")
# raise context
# 接收請求頭的資訊
print("接收到的請求頭元資料資訊", context.invocation_metadata())
# 設定響應報文頭資訊
context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 區域性的資料進行壓縮
context.set_compression(grpc.Compression.Gzip)
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
class MyUnaryServerInterceptor1(grpc.ServerInterceptor):
def intercept_service(self,continuation, handler_call_details):
print("我是攔截器1號:開始----1")
respn = continuation(handler_call_details)
print("我是攔截器1號:結束----2",respn)
return respn
class MyUnaryServerInterceptor2(grpc.ServerInterceptor):
def intercept_service(self,continuation, handler_call_details):
print("我是攔截器2號:開始----1")
respn = continuation(handler_call_details)
print("我是攔截器2號:結束----2",respn)
return respn
def serve():
# 例項化一個rpc服務,使用執行緒池的方式啟動我們的服務
# 服務一些引數資訊的配置
options = [
('grpc.max_send_message_length', 60 * 1024 * 1024), # 限制傳送的最大的資料大小
('grpc.max_receive_message_length', 60 * 1024 * 1024), # 限制接收的最大的資料的大小
]
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全域性的資料傳輸的壓縮機制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=options,
compression=compression,
interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 新增我們服務
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置啟動的埠
server.add_insecure_port('[::]:50051')
# 開始啟動的服務
server.start()
def stop_serve(signum, frame):
print("程序結束了!!!!")
# sys.exit(0)
raise KeyboardInterrupt
# 登出相關的訊號
# SIGINT 對應windos下的 ctrl+c的命令
# SIGTERM 對應的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)
# wait_for_termination --主要是為了目標啟動後主程序直接的結束!需要一個迴圈的方式進行進行程序執行
server.wait_for_termination()
if name == 'main': serve() ```
關鍵的配置地方是:
此時使用我們的客戶端請求服務端,服務端會輸出一下的資訊:
~~~
我是攔截器1號:開始----1
我是攔截器2號:開始----1
我是攔截器2號:結束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=
~~~
3.2 客戶端的自帶攔截器
客戶端攔截器的需要實現類和服務端的不一樣:
且當我們的使用客戶端攔截器的時候,主要連結到我們的RPC的時候的方式也有所改變:
完整客戶端示例程式碼:
``` import grpc import hello_pb2 import hello_pb2_grpc
class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客戶端的攔截器1:---開始1") resp = continuation(client_call_details, request) print("客戶端的攔截器1:---結束2", resp) return resp
class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客戶端的攔截器2:---開始1") resp = continuation(client_call_details, request) print("客戶端的攔截器2:---結束2", resp) return resp
def run(): # 連線 rpc 伺服器 options = [ ('grpc.max_send_message_length', 100 * 1024 * 1024), ('grpc.max_receive_message_length', 100 * 1024 * 1024), ('grpc.enable_retries', 1), ('grpc.service_config', '{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }') ]
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全域性的資料傳輸的壓縮機制
compression = grpc.Compression.Gzip
# with grpc.insecure_channel(target='localhost:50051',
# options=options,
# compression=compression
# ) as channel:
with grpc.insecure_channel(target='localhost:50051',
options=options,
compression=compression
) as channel:
# 通過通道服務一個服務intercept_channel
interceptor_channel = grpc.intercept_channel(channel, ClientServerInterceptor1(),ClientServerInterceptor2())
stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
# 生成請求我們的服務的函式的時候,需要傳遞的引數體,它放在hello_pb2裡面-請求體為:hello_pb2.HelloRequest物件
try:
reest_header = (
('mesasge', '1010'),
('error', 'No Error')
)
response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='歡迎下次光臨'),
# 設定請求的超時處理
timeout=5,
# 設定請求的頭的資訊
metadata=reest_header,
)
print("SayHelloAgain函式呼叫結果的返回: " + response.message)
print("SayHelloAgain函式呼叫結果的返回---響應報文頭資訊: ", callbask.trailing_metadata())
except grpc._channel._InactiveRpcError as e:
print(e.code())
print(e.details())
if name == 'main': run() ```
4:grpc攔截器上下文傳遞
我們的都知道作為中介軟體的話,一般某些業務場景下是有些使用承載請求上下文的傳遞的任務滴,然是自帶的攔截器,似乎完全沒有對應的
request, context
相關的引入傳遞,如果我們的需要傳遞上下文的時候呢?這就無法實現了!!!!
要實現具有上下文的傳遞攔截器的話使用第三方庫來實現:
pip install grpc-interceptor
這個庫還字典的相關的測試:
$ pip install grpc-interceptor[testing]
4.1 改造服務端攔截器
該用 第三方庫後的完整服務端改造示例:
``` from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc import signal from typing import Any,Callable
實現 proto檔案中定義的 GreeterServicer的介面
class Greeter(hello_pb2_grpc.GreeterServicer): # 實現 proto 檔案中定義的 rpc 呼叫 def SayHello(self, request, context): # 返回是我們的定義的響應體的物件 return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
def SayHelloAgain(self, request, context):
# 返回是我們的定義的響應體的物件
# # 設定異常狀態碼
# context.set_code(grpc.StatusCode.PERMISSION_DENIED)
# context.set_details("你沒有這個訪問的許可權")
# raise context
# 接收請求頭的資訊
print("接收到的請求頭元資料資訊", context.invocation_metadata())
# 設定響應報文頭資訊
context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 區域性的資料進行壓縮
context.set_compression(grpc.Compression.Gzip)
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
from grpc_interceptor import ServerInterceptor from grpc_interceptor.exceptions import GrpcException from grpc_interceptor.exceptions import NotFound class MyUnaryServerInterceptor1(ServerInterceptor):
def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
rsep = None
try:
print("我是攔截器1號:開始----1")
rsep= method(request, context)
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
finally:
print("我是攔截器1號:結束----2",rsep)
return rsep
class MyUnaryServerInterceptor2(ServerInterceptor):
def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
rsep = None
try:
print("我是攔截器2號:開始----1")
rsep= method(request, context)
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
finally:
print("我是攔截器2號:結束----2",rsep)
return rsep
def serve():
# 例項化一個rpc服務,使用執行緒池的方式啟動我們的服務
# 服務一些引數資訊的配置
options = [
('grpc.max_send_message_length', 60 * 1024 * 1024), # 限制傳送的最大的資料大小
('grpc.max_receive_message_length', 60 * 1024 * 1024), # 限制接收的最大的資料的大小
]
# 三種的壓縮機制處理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服務啟動全域性的資料傳輸的壓縮機制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=options,
compression=compression,
interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 新增我們服務
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置啟動的埠
server.add_insecure_port('[::]:50051')
# 開始啟動的服務
server.start()
def stop_serve(signum, frame):
print("程序結束了!!!!")
# sys.exit(0)
raise KeyboardInterrupt
# 登出相關的訊號
# SIGINT 對應windos下的 ctrl+c的命令
# SIGTERM 對應的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)
# wait_for_termination --主要是為了目標啟動後主程序直接的結束!需要一個迴圈的方式進行進行程序執行
server.wait_for_termination()
if name == 'main': serve() ```
通過上面的方式,我們就可以對應我們的上下文請求做相關的處理了!這個和我們的web框架的中介軟體幾乎是接近類似了!
4.2 簡單分析第三方庫簡單原始碼
進入這個第三方庫的原始碼內部的,其實發現它自己也是實現了
grpc.ServerInterceptor
然後對它進一步進行了抽象一層
-
第一步其實和我們自帶的實現一樣,先是獲取返回的下一個帶處理的handle
next_handler = continuation(handler_call_details)
然後對返回這個next_handler進行是那種型別的的攔截器:- unary_unary
- unary_stream
- stream_unary
- stream_stream
-
判斷完成是哪裡蕾西的攔截器之後返回
handler_factory, next_handler_method
然後呼叫的是最終返回是handler_factory的物件 -
handler_factory的物件需要的引數有:
- invoke_intercept_method 攔截器的方法
- request_deserializer 請求的系列化
- response_serializer 響應的系列化
-
而我們的invoke_intercept_method 攔截器的方法獲取則需要
- 傳入定義的一個
request: Any, context: grpc.ServicerContext,
- 傳入定義的一個
-
然後返回是我們的最終需要實現的方法!~我去~暈了~
4.3 補充說明handler_call_details
如果我們的單純只是需要獲取到RPC請求裡面的提交請求頭元資料的,我們可以使用它讀取:
print("handler call details: ", handler_call_details.invocation_metadata)
它本身是一個:
~~~
grpc._server._HandlerCallDetails的型別
~~~
總結
以上僅僅是個人結合自己的實際需求,做學習的實踐筆記!如有筆誤!歡迎批評指正!感謝各位大佬!
結尾
END
簡書:http://www.jianshu.com/u/d6960089b087
掘金:http://juejin.cn/user/2963939079225608
公眾號:微信搜【小兒來一壺枸杞酒泡茶】
小鐘同學 | 文 【歡迎一起學習交流】| QQ:308711822
- FastApi(自用腳手架) Snowy搭建後臺管理系統(13)-以類的方式定義後臺任務依賴項
- FastApi(自用腳手架) Snowy搭建後臺管理系統(11)- 使用裝飾器方式注入類形式依賴項
- fastapi微服務系列(2)-之GRPC的interceptor攔截器簡單使用(中介軟體)
- Fastapi框架 KubeSphere3.1.1系列(6):手動的部署fastapi-redis postgresql服務(使用阿里雲映象倉庫)
- Fastapi框架 KubeSphere3.1.1系列(4): 應用商店和日誌落盤收集實踐
- Fastapi框架 KubeSphere3.1.1系列(5): kubesphere手動的部署一個fastapi-Hello World服務