【萬字長文】Apache ShenYu整合Apache RocketMQ實現海量日誌採集的原理與實踐

語言: CN / TW / HK

Apache ShenYu整合Apache RocketMQ實現海量日誌採集的原理與實踐

本文作者

本文作者:hutaishi, 快手軟體工程師,Apache ShenYu Contributor

可觀測性介紹

可觀測性能力的應用離不開資料與資訊,而日誌(logs)、指標(metrics)與鏈路(trace)是最重要的資料資訊源。
Apache ShenYu 利用 java agent  位元組碼增強 技術實現了無侵入的可觀性,使得使用者無需引入依賴即可接入第三方可觀測性系統,
獲取 Traces、Metrics 和 Logging。 
本文基於2.4.3-SNAPSHOT版本分析Apache ShenYu整合RocketMQ實現日誌的可觀測性。
分析Agent技術架構Agent實現細節響應式系統中日誌採集RocketMQ的整合

採集可觀測性日誌

從資料的採集到視覺化的整體流程圖如下:

 

Java Agent和ByteBuddy技術

ShenYu-Agent的可觀測性實現模組儘管是放在ShenYu專案下,但實際上他和閘道器是獨立的,對閘道器的監控是無程式碼侵入的。 

這種無程式碼侵入的自動埋點基於Java Agent技術。Java Agent是JDK1.5之後引入的新特性,該特性可以在JVM將位元組碼檔案讀入記憶體之後,使用對應的位元組流在Java堆中生成一個Class物件之前,對其位元組碼進行修改(增強)的能力,而JVM也會使用使用者修改過的位元組碼進行Class物件的建立。 

Shenyu-Agent利用JavaAgent技術攔截建構函式、例項方法、靜態方法並進行AOP修改,使用ByteBuddy做位元組碼增強。

當執行到某個被ShenYu-agent代理過的方法時, 會執行自定義的增強邏輯來實現指標、日誌、鏈路的收集、傳遞、上報,實現閘道器的可觀測性。

Java Agent中的核心類

Instrumentation

public interface Instrumentation {

    //增加一個Class 檔案的轉換器,轉換器用於改變 Class 二進位制流的資料,// 引數 canRetransform 設定是否允許重新轉換。
    void addTransformer(ClassFileTransformer transformer, boolean canRetransform);

    void addTransformer(ClassFileTransformer transformer);

    //刪除一個類轉換器
    boolean removeTransformer(ClassFileTransformer transformer);
    // 是否支援重新轉換類
    boolean isRetransformClassesSupported();

    //在類載入之後,重新定義 Class。這個很重要,該方法是1.6 之後加入的,事實上,該方法是 update 了一個類。
    void retransformClasses(Class<?>... classes) throws UnmodifiableClassException;

    // 是否支援重新定義一個類
    boolean isRedefineClassesSupported();

    // 重新定義一個類
    void redefineClasses(ClassDefinition... definitions)throws  ClassNotFoundException, UnmodifiableClassException;

    boolean isModifiableClass(Class<?> theClass);

    @SuppressWarnings("rawtypes")
    Class[] getAllLoadedClasses();

    @SuppressWarnings("rawtypes")
    Class[] getInitiatedClasses(ClassLoader loader);
    //獲取一個物件的大小long getObjectSize(Object objectToSize);

    void appendToBootstrapClassLoaderSearch(JarFile jarfile);

    void appendToSystemClassLoaderSearch(JarFile jarfile);

    boolean isNativeMethodPrefixSupported();

    void setNativeMethodPrefix(ClassFileTransformer transformer, String prefix);
}

在類載入之前,重新定義 Class 檔案,ClassDefinition 表示對一個類新的定義,如果在類載入之後,需要使用 retransformClasses 方法重新定義。 

addTransformer方法配置之後,後續的類載入都會被Transformer攔截。對於已經載入過的類,可以執行retransformClasses來重新觸發,這個Transformer的攔截。類載入的位元組碼被修改後,除非再次被retransform,否則不會恢復。

ClassFileTransformer

這個介面的主要作用就是位元組碼轉換,修改載入JVM的位元組碼資料,從而增加我們期望的功能邏輯。

public interface ClassFileTransformer {

    byte[]    transform(  ClassLoader         loader,
                String              className,
                Class<?>            classBeingRedefined,
                ProtectionDomain    protectionDomain,
                byte[]              classfileBuffer)
        throws IllegalClassFormatException;
}

ClassLoader:當前載入Class的ClassLoader。如果ClassLoader是bootstrap Loader,為null。

className:當前載入Class的類名。Java虛擬機器規範中定義的完全限定類和介面名稱的內部形式的類的名稱,例如java/util/List。

byte[] classfileBuffer:當前類的以byte陣列呈現的位元組碼資料。(可能跟class檔案的資料不一致,因為此處的byte資料是此類最新的位元組碼資料,
即此資料可能是原始位元組碼資料被其他增強方法增強之後的位元組碼資料)

JavaAgent內部位元組碼修改流程圖如下:

通過上面的分析

 

我們會發現ClassFileTransformer這個介面的實現很偏底層,我們可以利用ByteBuddy位元組碼增強框架來高效實現ClassFileTransformer介面。
呼叫java agent中的premain方法的Instrumentation引數的addTransformer方法,新增我們的ClassFileTransformer實現。

 

    public static void premain(String agentArgs, Instrumentation inst) {
        ClassFileTransformer myClassFileTransformer;  // 實現ClassFileTransformer介面
        inst.addTransformer(myClassFileTransformer);
    }

ByteBuddy操作位元組碼的神器

通過前面的JavaAgent介紹,我們知道JavaAgent的關鍵是修改位元組碼,位元組碼程式設計框架或庫有asm,javassist,byte-buddy。 

Byte Buddy 玩法上更加高階,你可以完全不需要了解一個類和方法塊是如何通過 指令碼 LDC、LOAD、STORE、IRETURN… 生成出來的。 

就像它的官網介紹;Byte Buddy 是一個程式碼生成和操作庫,用於在 Java 應用程式執行時建立和修改 Java 類,而無需編譯器的幫助。

除了 Java 類庫附帶的程式碼生成實用程式外,Byte Buddy 還允許建立任意類,並且不限於實現用於建立執行時代理的介面。 

此外,Byte Buddy 提供了一種方便的 API,可以使用 Java 代理或在構建過程中手動更改類。

下面是一個使用ByteBuddy的樣例:

public class MyAgent {
    public static void premain(String agentArgs, Instrumentation inst) {
        System.out.println("this is my agent:" + agentArgs);
        // 攔截哪個方法,以及攔截邏輯
        AgentBuilder.Transformer transformer = (builder, typeDescription, classLoader, javaModule) -> {
            return builder
                    .method(ElementMatchers.any()) // 攔截任意方法
                    .intercept(MethodDelegation.to(MethodCostTime.class)); // 委託到MethodCostTime
        };
        // 修改位元組碼的監聽器,一般調式用到。這裡由於空間,加個註釋。
        // AgentBuilder.Listener listener = new AgentBuilder.Listener() {}
        // 通過byte-buddy構造ClassFileTransformer並新增到Instrumentation inst
        new AgentBuilder
                .Default()
                .type(ElementMatchers.nameStartsWith("org.demo.test")) // 指定需要攔截的類
                .transform(transformer)
                .with(listener)
                .installOn(inst); // 將ClassFileTransformer安裝到Instrumentation
        }
    }
}

2. ShenYuAgent外掛化技術架構

ShenYu閘道器採用外掛化設計思想,外掛熱插拔,方便擴充套件,而在ShenYu Agent的設計中同樣採用了外掛化設計思想,支援熱插拔,非常容易擴充套件。 

分析採集可觀測性資料metrics、tracing、logging,我們一般只需要在方法執行前、執行後以及執行異常這3個地方埋點,  

就可以說實現指標、日誌、鏈路的收集、傳遞、上報。 

ShenYu在位元組碼增強方面提供了一個統一的AOP,AOP寫好增強的各種細節以及對配置的解析和外掛的管理。

上層外掛只需要寫增強的程式碼邏輯即可,類似spring中的AOP.

ByteBuddy實現AOP

ByteBuddy實現AOP,涉及4個關鍵點。

  • 確定攔截的類的方法。根據配置的攔截的類和方法構建ByteBuddy的型別匹配器、方法匹配器

  • 構建一個帶有before、after、onThrowing方法的介面處理器,外掛層實現這個介面來實現相應的業務邏輯

  • 對每個被攔截的類的方法構建一個@RuntimeType的方法,這個方法用於實現攔截邏輯。這個邏輯裡面會分別遍歷第二步的外掛處理器介面

  • 定義一個獲得和設定上下文的介面,讓所有被攔截的類實現這個介面,實現了這個介面的被攔截的類會作為引數傳入前面的處理方法中。這個主要用於鏈路追蹤

下面分別分析ShenYu是如何實現上面4個關鍵點的。

確定攔截的類的方法。根據配置的攔截的類和方法構建ByteBuddy的型別匹配器、方法匹配器

ShenYu Agent採取配置化的方式,可以配置攔截的類和方法。如下是一個採集日誌需要配置的攔截的類和方法。
logging-point.yaml:

pointCuts:
  - targetClass: org.springframework.web.server.adapter.HttpWebHandlerAdapter
    points:
      - type: instanceMethod
        name: createExchange
    handlers:
      rocketmq:
        - org.apache.shenyu.agent.plugin.logging.common.handler.DefaultLoggingPluginHandler

ShenYu Agent定義了一個獲取節點集合的介面AgentPluginDefinition,採用ShenYu SPI載入各個外掛的切點集合。

@SPI
public interface AgentPluginDefinition {

    /**
     * Collector shenyu agent join point.
     *
     * @return the collection
     */
    Collection<ShenyuAgentJoinPoint> collector();
}

一般這個介面的實現是利用ShenYu Agent提供的yaml工具類讀取相關yaml配置,ShenYuAgent根據攔截的方法構建ElementMatcher matcher。

用於確定ByteBuddy需要攔截哪些方法。最終的構建結果便是:

public final class ShenyuAgentJoinPoint {

    private final String classTarget; // 攔截的目標類

    private final List<ConstructorPointCut> constructorPoints;  // 攔截的目標類下的構造方法

    private final List<InstanceMethodPointCut> instanceMethodPoints; // 攔截的目標類下的例項方法

    private final List<StaticMethodPointCut> staticMethodPoints; // 攔截的目標類下的靜態方法
}

ShenyuAgentTypeMatcher繼承ElementMatcher.Junction.AbstractBase用於確定攔截哪些類。

構建一個帶有before、after、onThrowing方法的介面處理器,外掛層實現這個介面來實現相應的業務邏輯

利用ByteBuddy的@RuntimeType註解可以實現攔截器邏輯。當執行到被攔截的方法時,就會執行@RuntimeType註解的方法,在這個方法裡面可以實現我們的邏輯。
定義的攔截方法如下:

    @RuntimeType
    public Object intercept(@This final Object target, @Origin final Method method, 
    @AllArguments final Object[] args, @SuperCall final Callable<?> callable) throws Exception;

註解含義如下:
@This    當前被攔截的、動態生成的那個物件  
@Origin  可以繫結到以下型別的引數:Method 被呼叫的原始方法, Constructor 被呼叫的原始構造器, Class 當前動態建立的類等  
@AllArguments    繫結所有引數的陣列  
@SuperCall   用於呼叫父類版本的方法

實現的邏輯便是執行被攔截的方法前、方法後、出現異常的情況下,分別遍歷每個外掛處理器hander執行對應的before、after、onThrowing方法。

    @RuntimeType
    public Object intercept(@This final Object target, @Origin final Method method, @AllArguments final Object[] args,
        @SuperCall final Callable<?> callable) throws Exception {

        TargetObject instance = (TargetObject) target;
        MethodResult methodResult = new MethodResult();
        for (InstanceMethodHandler handler : handlerList) {
            // handler before logic 執行before邏輯
        }
        Object result;
        try {
            result = callable.call();
        } catch (final Throwable ex) {
            for (InstanceMethodHandler handler : handlerList) {
                // handler onThrowing logic 執行onThrowing邏輯
            }
            throw ex;
        } finally {
            for (InstanceMethodHandler handler : handlerList) {
                // handler after logic 執行after邏輯
            }
        }
        return result;
    }

定義一個獲得和設定上下文的介面,讓所有被攔截的類實現這個介面

上下文介面定義:

public interface TargetObject {
    Object getContext();

    void setContext(Object context);
}

ByteBuddy讓每個被攔截的類實現這個方法。關鍵程式碼如下:

builder.defineField(EXTRA_DATA, Object.class, Opcodes.ACC_PRIVATE | Opcodes.ACC_VOLATILE)  // 定義一個上下文欄位
.implement(TargetObject.class)  // 實現上下文介面
.intercept(FieldAccessor.ofField(EXTRA_DATA)); // 實現邏輯為這個上下文欄位的get,set。

外掛生命週期管理

ShenYu定義了外掛啟動服務,包含start和close方法。在premain方法中利用SPI技術啟動每個外掛,同時會註冊一個關閉所有外掛的鉤子ShutdownHook。

外掛啟動流程圖如下: 

 

 

2.1 自定義類載入器ShenyuAgentPluginLoader載入外掛

由於外掛會被打成一個個的獨立的jar包,這些jar包會被統一放入到shenyu-agent-dist下,要訪問這些jar中的外掛,需要自定義一個類載入器
來載入。ShenyuAgentPluginLoader繼承ClassLoader,採取雙親委派機制來載入類。ShenyuAgentPluginLoader在約定的外掛目錄下掃描所有的jar包,
在這些jar包中找class和Resources.

.
├── conf
│   ├── logback.xml
│   ├── logging-meta.yaml
│   ├── logging-point.yaml
│   ├── metrics-meta.yaml
│   ├── metrics-point.yaml
│   ├── shenyu-agent.yaml
│   └── tracing-point.yaml
├── plugins
│   ├── shenyu-agent-plugin-logging-api-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-logging-common-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-logging-rocketmq-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-metrics-api-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-metrics-common-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-metrics-prometheus-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-tracing-common-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-tracing-jaeger-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-tracing-opentelemetry-2.4.3-SNAPSHOT.jar
│   └── shenyu-agent-plugin-tracing-zipkin-2.4.3-SNAPSHOT.jar
└── shenyu-agent.jar

載入這些jar中的InstanceMethodHandler便是通過自定義類載入器實現的。

inst = Class.forName(className, true, ShenyuAgentPluginLoader.getInstance()).newInstance();

2.2外掛需要實現的介面

要完成一個外掛,只需實現3個介面。shenyu-agent基於SPI技術載入你的外掛。

  • AgentPluginDefinition:定義增強(攔截)哪個類的哪些方法以及對應的處理器

  • AgentPluginBootService:用於啟動外掛

  • MethodHandler:攔截的方法執行的增強邏輯(涉及攔截建構函式、例項方法、靜態方法)
    如下圖所示

     

 

2.2 ShenYuAgent配置檔案

ShenYuAgent的可觀測性是可配置化的,ShenYu採用maven-assembly-plugin進行打包.  
配置檔案和打包輸出在shenyu-dist/shenyu-agent-dist/target/shenyu-agent,如下圖所示:

.
├── conf
│   ├── logback.xml
│   ├── logging-meta.yaml
│   ├── logging-point.yaml
│   ├── metrics-meta.yaml
│   ├── metrics-point.yaml
│   ├── shenyu-agent.yaml
│   └── tracing-point.yaml
├── plugins
│   ├── shenyu-agent-plugin-logging-api-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-logging-common-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-logging-rocketmq-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-metrics-api-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-metrics-common-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-metrics-prometheus-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-tracing-common-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-tracing-jaeger-2.4.3-SNAPSHOT.jar
│   ├── shenyu-agent-plugin-tracing-opentelemetry-2.4.3-SNAPSHOT.jar
│   └── shenyu-agent-plugin-tracing-zipkin-2.4.3-SNAPSHOT.jar
└── shenyu-agent.jar

其中shenyu-agent.yaml用於配置各個外掛的屬性,-point.yaml之類的檔案用於配置攔截點(包含類名、方法型別、方法名)和對應的處理器

3.響應式閘道器請求日誌採集

收集的日誌欄位

欄位名稱 含義 說明 備註
clientIp 客戶端IP    
timeLocal 請求時間字串,  格式:yyyy-MM-dd HH:mm:ss.SSS    
method 請求方法(不同rpc型別不一樣,http類的為:get,post等待,rpc類的為介面名稱)    
requestHeader 請求頭(json格式)    
responseHeader 響應頭(json格式)    
queryParams 請求查詢引數    
requestBody 請求Body(二進位制型別的body不會採集)    
requestUri 請求uri    
responseBody 響應body    
responseContentLength 響應body大小    
rpcType rpc型別    
status 響應碼    
upstreamIp 上游(提供服務的程式)IP    
upstreamResponseTime 上游(提供服務的程式)響應請求的耗時(毫秒ms)    
userAgent 請求的使用者代理    
host 請求的host    
module 請求的模組    
path 請求的路徑path    
traceId 請求的鏈路追蹤ID 需要開啟鏈路追蹤外掛  

樣例:

{
  "clientIp":"0:0:0:0:0:0:0:1%0",
  "timeLocal":"2022-03-09 21:39:04.373",
  "method":"POST",
  "requestHeader":"[{\"myvar\":[\"A\"]},{\"Content-Type\":[\"application/json\"]},{\"User-Agent\":[\"PostmanRuntime/7.29.0\"]},{\"Accept\":[\"*/*\"]},{\"Cache-Control\":[\"no-cache\"]},{\"Postman-Token\":[\"31a566b2-9682-47ca-b7f2-131fc6ad714b\"]},{\"Host\":[\"localhost:9195\"]},{\"Accept-Encoding\":[\"gzip, deflate, br\"]},{\"Connection\":[\"keep-alive\"]},{\"Content-Length\":[\"43\"]}]",
  "responseHeader":"[{\"transfer-encoding\":[\"chunked\"]},{\"Content-Length\":[\"44\"]},{\"Content-Type\":[\"application/json\"]}]",
  "queryParams":"a=1",
  "requestBody":"{\n    \"id\":666,\n    \"name\": \"hello world\"\n}",
  "requestUri":"http://localhost:9195/http/order/save?a=1",
  "responseBody":"{\"id\":\"666\",\"name\":\"hello world save order\"}",
  "responseContentLength":"44",
  "rpcType":"http",
  "status":200,
  "upstreamIp":"172.17.105.222",
  "upstreamResponseTime":19,
  "userAgent":"PostmanRuntime/7.29.0",
  "host":"localhost:9195",
  "module":null,
  "traceId":"097b826595c34f51",
  "path":"/http/order/save"
}

日誌收集對應的配置

日誌元資料配置在logging-meta.yaml檔案中。 

有時候我們可能不需要採集某些欄位,特別是大body之類的,或者某些欄位涉及敏感資訊,那麼可以關閉這些欄位的採集。logFieldSwitchConfig便是控制欄位是否採集。

有時候有些介面可能是不需要採集的,或者只需要採集小部分,可以針對介面或應用配置取樣率,取樣率配置0即為關閉取樣。


同時支援介面或應用級別配置不同的topic。logApiSwitchConfigMap欄位進行介面或應用級別配置。

3.1如何正確讀取請求body和響應body

ShenYu是基於WebFlux實現的響應式閘道器,要記錄請求的body和響應的body並不能簡單的讀取流,因為讀取一下就會改變位元組流的可讀取位置,  
會導致下一個訂閱者無法讀到body。

所以需要設計一個無副作用的方案來讀取body。 

我們需要在讀取Flux(針對請求body)或寫入Flux(針對響應body)時進行body記錄,在每個DataBuffer被讀或寫的時候,  

讀取一份只讀的位元組buffer寫入到我們的記憶體位元組流中,呼叫dataBuffer.asByteBuffer().asReadOnlyBuffer()方法讀取,這個方法無副作用。

3.1.1BodyWriter的設計

dataBuffer.asByteBuffer().asReadOnlyBuffer()會返回ByteBuffer,我們需要將這個寫入記憶體,並且需要保證body採集完之後,立即釋放記憶體,可觀測性需要儘可能的降低資源的消耗。

BodyWriter使用WritableByteChannel作為位元組流容器,當讀取完這個位元組通道中的位元組後就會關閉通道,釋放資源。

3.1.2LoggingServerHttpRequest的設計

SpringWebFlux提供了ServerHttpRequestDecorator類用於ServerHttpRequest將原本的ServerHttpRequest作為委託類。

重新了getBody()方法,在super.getBody()被訂閱的時候,在doOnNext方法中呼叫dataBuffer.asByteBuffer().asReadOnlyBuffer(),

將得到的ByteBuffer寫入BodyWriter中的WritableByteChannel中。

public class LoggingServerHttpRequest extends ServerHttpRequestDecorator {
    private final ShenyuRequestLog logInfo;

    public LoggingServerHttpRequest(final ServerHttpRequest delegate, final ShenyuRequestLog logInfo) {
        super(delegate);
        this.logInfo = logInfo;
    }

    /**
     * get request body.
     *
     * @return Flux
     */
    @Override
    @NonNull
    public Flux<DataBuffer> getBody() {
        BodyWriter writer = new BodyWriter();
        // 在super.getBody()被訂閱的時候,在doOnNext方法中呼叫dataBuffer.asByteBuffer().asReadOnlyBuffer()
        // 將得到的ByteBuffer寫入BodyWriter中的WritableByteChannel中
        return super.getBody().doOnNext(dataBuffer -> writer.write(dataBuffer.asByteBuffer().asReadOnlyBuffer()))
                .doFinally(signal -> {
                    int size = writer.size();
                    String body = writer.output();
                    if (size > 0 && LogCollectUtils.isNotBinaryType(getHeaders())
                            && !LogCollectConfigUtils.isRequestBodyTooLarge(size)) {
                        logInfo.setRequestBody(body);
                    }
                });
    }
}

3.1.2LoggingServerHttpResponse的設計

同讀取ServerHttpRequest中的body一樣,response中的body讀取一樣設計成在寫入DataBuffer的時候,順便讀取一下,不產生副作用。

繼承ServerHttpResponseDecorator類,將原本的ServerHttpResponse委託給父類。重寫writeWith方法用於讀取響應body。

body被訂閱的時候,在doOnNext方法中呼叫dataBuffer.asByteBuffer().asReadOnlyBuffer()  ,將得到的ByteBuffer寫入BodyWriter中的WritableByteChannel中,最終在Flux中的doFinally中收集完日誌。

當我們讀取完response的body,即可將收集到訪問資訊收集起來。這個collect方法會在LoggingServerHttpResponse中的logResponse方法中呼叫。

/**
 * Collect logs and save them.
 */
public interface LogCollector extends AutoCloseable {

    /**
     * collect log.
     *
     * @param log access log
     */
    void collect(Object log);
}

3.2如何埋點

ShenYu的外掛設計是可以插拔的,而全域性外掛GlobalPlugin是必定存在的並且位於外掛鏈的第一位置,最先被執行。

一般我們可以攔截全域性外掛GlobalPlugin的execute方法,在基於鏈式執行中,我們一般會修改ServerWebExchange的request和response。

然後將ServerWebExchange作為引數傳給外掛鏈ShenyuPluginChain。核心程式碼為:

    return chain.execute(
            exchange.mutate().request(new LoggingServerHttpRequest(request, requestInfo))
            .response(new LoggingServerHttpResponse(exchange.getResponse(), requestInfo)
            ).build()
    );

有2種方式實現埋點:

  • 1.攔截閘道器全域性外掛GlobalPlugin#execute,修改第一個引數ServerWebExchange,這種方式需要配合ByteBuddy的註解@Morph來實現

  • 2.攔截建立ServerWebExchange的方法做後置處理,也就是攔截HttpWebHandlerAdapter#createExchange方法。 

ShenYu Agent Logging採用的第二種方式埋點。
會在response的body被訂閱的時候,完成一個完整請求日誌的收集。

4.非同步採集日誌並採用Oneway方式推送到RocketMQ叢集

當讀取完response的body後,就會將日誌收集。由於閘道器的請求量特別大,對效能要求很高,所以收集的操作不應該阻塞主流程。

同時也要求能快速消費日誌。設計上引入了一個JVM緩衝佇列(大小可配置),請求日誌會先放入緩衝佇列,而不是直接傳送到訊息佇列叢集,  如果緩衝佇列沒有足夠空間,則會直接丟棄日誌(當閘道器處理請求速度超過非同步消費請求日誌的速度的極端情況會出現),不阻塞。 

同時會啟動一個消費執行緒,迴圈從快取佇列裡面批量獲取日誌,解析出日誌對應的topic,同時將日誌序列化(採取JSON序列化),這裡考慮到對極端情況下日誌丟失可以容忍,採取了效率最高的Oneway方式推送訊息到RocketMQ叢集,這種方式效能最高,消耗資源最低,但丟資料的可能性  相比同步和非同步相對高一點。如果生產上是追求訊息零丟失的,則不建議採用這種方式。

採集日誌設計如下: 

 

日誌收集時序圖如下: 

 

日誌視覺化

日誌寫入到rocketmq後,需要視覺化展示出來實現日誌的可觀測性。日誌資料可以儲存到clickhouse,ES,loki等等,視覺化可以採用開源的grafana loki或kibana.
一個可供參考的grafana loki視覺化介面:

 

參與Apache ShenYu

Apache ShenYu (incubating) 於2021年5月進入Apache基金會孵化器。它是一款多協議,高可用,易擴充套件的雲原生API閘道器。
官網地址:http://shenyu.apache.org
Github地址:http://github.com/apache/incubator-shenyu
Gitee地址:http://gitee.com/Apache-ShenYu/incubator-shenyu

  • 傳送訂閱郵件。用自己的郵箱向[email protected]傳送一封郵件,主題和內容任意。

  • 接收確認郵件並回復。完成步驟1後,您將收到一封來自[email protected]的確認郵件(如未收到,請確認該郵件是否已被攔截,或已經被自動歸入訂閱郵件、垃圾郵件、推廣郵件等資料夾)。直接回復該郵件,或點選郵件裡的連結快捷回覆即可,主題和內容任意。

  • 接收歡迎郵件。完成以上步驟後,您會收到一封主題為WELCOME to [email protected]的歡迎郵件,至此您已成功訂閱Apache ShenYu的郵件列表。