「從零單排canal 06」 instance模組原始碼解析

語言: CN / TW / HK

精選30+雲產品,助力企業輕鬆上雲!>>>

基於1.1.5-alpha版本,具體原始碼筆記可以參考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal

關於instance的定義,可以參考前面的幾篇原始碼解析,介紹的非常清楚。

instance模組比較簡單,我們重點了解以下幾個問題

  • instance配置模式有哪幾種,如何根據配置建立instance?
  • 遠端配置如何覆蓋本地配置的?
  • instance例項內部有哪些元件?

1.基本結構


instance模組下面也分為三個子模組,core、manager、spring。

其中,core是instance的核心邏輯 。

而manager和spring只是兩種不同的instance配置讀取方式,manager通過http請求讀取admin的配置,spring通過配置檔案的方式讀取。

主要控制邏輯我們在deployer模組原始碼分析中提到過,就是在CanalController類 的instanceGenerator,配置引數是canal.instance.global.mode

  • 根據destination建立config
  • 如果canal.instance.global.mode = manager,就使用PlainCanalInstanceGenerator
  • 如果canal.instance.global.mode = spring,就使用SpringCanalInstanceGenerator

原始碼如下


2.core子模組


程式碼不多,就兩個介面,兩個類。

2.1 CanalInstanceGenerator介面

這個介面只有一個方法


具體實現就是開頭提到的兩種,PlainCanalInstanceGenerator和SpringCanalInstanceGenerator,分別在manager子模組和spring子模組中實現。

具體選擇就是開頭的那個canalController裡面根據canal.instance.global.mode來選擇。


2.2 CanalInstance介面

先看一張官方文件的圖,這個前面的文章已經分析過了。


server代表一個canal-server執行例項,對應於一個jvm。server內部可以有多個instance。

Instance內部有4個主要元件:

  • eventParser :資料來源接入,模擬slave協議和master進行互動,協議解析
  • eventSink :Parser和Store聯結器,進行資料過濾,加工,分發的工作
  • eventStore :資料儲存
  • metaManager:增量訂閱&消費資訊管理器

在這個介面中,就定義了獲取4個元件的方法,以及新版本增加的mqProducer的配置資訊(mqProducer在server模組解析中介紹過了,可以回頭去看看)


我們簡單看下4個元件介面的各個實現類。

CanalEventParser介面實現類(paser模組):

  • MysqlEventParser:偽裝成單個mysql例項的slave解析binglog日誌
  • GroupEventParser:偽裝成多個mysql例項的slave解析binglog日誌。內部維護了多個CanalEventParser,組合多個EventParser進行合併處理,group只是作為一個delegate處理。主要應用場景是分庫分表:比如一個大表拆分了4個庫,位於不同的mysql例項上,正常情況下,我們需要配置四個CanalInstance。對應的,業務上要消費資料時,需要啟動4個客戶端,分別連結4個instance例項。為了方便業務使用,此時我們可以讓CanalInstance引用一個GroupEventParser,由GroupEventParser內部維護4個MysqlEventParser去4個不同的mysql例項去拉取binlog,最終合併到一起。此時業務只需要啟動1個客戶端,連結這個CanalInstance即可
  • LocalBinlogEventParser:解析本地的mysql binlog。例如將mysql的binlog檔案拷貝到canal的機器上進行解析。
  • RdsLocalBinlogEventParser:基於阿里雲rds的binlog備份檔案複製,下載到本地後進行本地的binlog解析。

CanalEventSink介面實現類(sink模組):

  • EntryEventSink:普通的單個parser的sink操作,進行資料過濾,加工,分發

  • GroupEventSink:用於分庫分表的場景,對應GroupEventParser的資料解析,然後實現基於歸併排序的sink處理

CanalEventStore介面實現類(store模組):

  • MemoryEventStoreWithBuffer:基於記憶體實現儲存store

CanalMetaManager(meta模組):

  • ZooKeeperMetaManager:將元資料存儲存到zk中

  • MemoryMetaManager:將元資料儲存到記憶體中

  • MixedMetaManager:組合memory + zookeeper的使用模式

  • PeriodMixedMetaManager:基於定時重新整理的策略的mixed實現

  • FileMixedMetaManager:先寫記憶體,然後定時重新整理資料到File

關於這些實現的具體細節,我們在相應模組的原始碼分析時,進行講解。目前只需要知道,一些元件有多種實現,因此組合工作方式有多種。

2.3 AbstractCanalInstance類

AbstractCanalInstance是canalInstance的抽象類,維護了相關元件的引用。

同時,也實現了canalInstance介面的subscribeChange方法。

這個抽象類有兩個實現,CanalInstanceWithManager 和 CanalInstanceWithSpring。

AbstractCanalInstance的初始化過程都是在實現類中完成的。

如果選擇admin控制模式,那就是在CanalInstanceWithManager中完成;如果是spring模式,就在CanalInstanceWithSpring中完成。 

這裡有個小發現:

仔細看下實際程式碼呼叫我們發現,CanalInstanceWithManager是給ManagerCanalInstanceGenerator使用的,而這個generator實際上沒有被使用到。如果使用admin模式,本文開頭我們就看到了,使用了PlainCanalInstanceGenerator。PlainCanalInstanceGenerator裡面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然後替換系統變數,然後再從spring的beanfactory中構建具體的例項。


2.3.1 subscribeChange() 方法

AbstractCanalInstance類實現了CanalInstance介面的subscribeChange方法。

我們看到,如果訂閱關係發生變化,就做一些操作,這裡看的話,主要就是更新了一下filter。

filter規定了需要訂閱哪些庫,哪些表。


2.3.2 start() 方法

啟動沒什麼特別的邏輯,就是按照順序依次啟動各個元件。

順序為 metaManager -> alarmHandler -> eventStore -> eventSink -> eventParser。

啟動順序主要跟依賴關係有關,元資訊相關的管理跟所有都有關,所以metaManager最先啟動,其他的按照彼此之間的關係一一啟動。


這裡我們發現,在啟動eventParser的時候做了特殊處理,分別是beforeStartEventParser 和 afterStartEventParser。我們在2.3.4專門講一下。


2.3.3 stop()方法

stop也沒什麼特殊的,就是依次關閉各個元件。

關閉的順序就是start的逆過程。

這裡就不貼程式碼了。


2.3.4 eventParserr的特殊處理

在start和stop方法中的eventParser前後都有特殊的處理,start的beforeStartEventParser 和 afterStartEventParser,Stop的beforeStopEventParser 和 afterStopEventParser。

這個其實跟eventParser的設計有關。

EventParser 設計

  • 每個EventParser都會關聯兩個內部元件

  • CanalLogPositionManager : 記錄binlog 最後一次解析成功位置資訊,主要是描述下一次canal啟動的位點 CanalHAController: 控制 EventParser 的連結主機管理,判斷當前該連結哪個mysql資料庫

所以這兩個beforexxx、afterxxxx方法做的主要是CanalLogPositionManager和CanalHAController的啟停工作。


2.3.5 AbstractCanalInstance類 總結

可以看到AbstractCanalInstance除了負責啟動和停止其內部元件,就沒有其他工作了。

eventParser在AbstractCanalInstance中啟動後,就會自行開啟多執行緒任務dump資料,通過eventSink投遞給eventStore。

而對eventStore的操作邏輯,實際上都是在CanalServerWithEmbedded中完成的,我們可以回顧一下CanalServerWithEmbedded中 getWithoutAck( ) 的相關邏輯。

包括:

  • 根據clientIdentity的destination獲取對應的instance

  • 獲取到流式資料中的最後一批獲取的位置positionRanges(跟batchId有關聯,就是上面那個map裡面的)

  • 從cananlEventStore裡面獲取binlog,轉化為event。一般是從最後的一個batchId位置開始,如果之前沒有batchId,那麼就從cursor記錄的消費位點開始;如果cursor為空,那隻能從eventStore的第一條訊息開始。(這裡幾個位置關係再想一想,跟ack有關,畫個圖)

  • event轉化為entry,並生成新的batchId,組合成message返回給客戶端

所以,其實這裡只是簡單的啟動和停止,元件的互動邏輯是在CanalServerWithEmbedded中get出instance的各個元件來進行實現的。


3.spring模組

前面提到了,PlainCanalInstanceGenerator裡面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然後替換系統變數,然後再從spring的beanfactory中構建具體的例項。

所以我們重點關注spring子模組的配置方式即可。

就下面四個類



3.1 CanalInstanceWithSpring類

基於spring容器啟動canal例項,方便獨立於manager啟動。

繼承了AbstractCanalInstance,其實就是一系列元件的setter方法,就不貼原始碼了。

具體配置是基於spring的xml來做的.

當我們配置載入方式為spring時,建立的CanalInstance例項型別都是CanalInstanceWithSpring。canal將會尋找本地的spring配置檔案來建立instance例項。canal預設提供了以下幾種spring配置檔案:

  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

四個配置檔案中,對CanalInstanceWithSpring都採用了同樣的配置方式:


當然,具體每個元件的ref在不同配置檔案中有所不同。

最主要的就是metaManager 和eventParser 這兩個配置有所不同,可能在記憶體、檔案或zk進行儲存。

eventStore 、和eventSink 定義都是相同的,eventStore目前的開源版本中eventStore只有一種基於記憶體的實現,eventSink其作用是eventParser和eventStore的聯結器,進行資料過濾,加工,分發的工作。不涉及儲存,也就沒有必要針對記憶體、file、或者zk進行區分。


3.2 SpringCanalInstanceGenerator類

這個是具體建立instance的邏輯。


順便看下PlainCanalInstanceGenerator裡面的實現,就是多了從遠端拉取配置,然後用PropertyPlaceholderConfigurer進行了變數替換,然後還是用beanFactory來獲取例項。

com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer繼承了org.springframework.beans.factory.config.PropertyPlaceholderConfigurer,設定動態properties,替換掉本地properties。


4.總結

其實這個模組的東西比較少,沒有什麼特別複雜的邏輯。

我們來回顧下開頭的幾個問題

  • instance配置模式有哪幾種,如何根據配置建立instance?

主要有基於spring和基於遠端配置兩種方式,前者的實現在,後者的實現在PlainCanalInstanceGenerator

  • 遠端配置如何覆蓋本地配置的?

PlainCanalInstanceGenerator中使用了spring的PropertyPlaceholderConfigurer來覆蓋配置

  • instance例項內部有哪些元件?

包括了parser、sink、store、metamanager等元件,但是隻負責了啟動和停止邏輯,具體互動邏輯還是在CanalServerWithEmbedded中實現的。



原創:阿丸筆記(微信公眾號:aone_note),歡迎 分享,轉載請保留出處。

掃描下方二維碼可以關注我哦~

                                                                              覺得不錯,就點個 再看 吧👇



本文分享自微信公眾號 - 阿丸筆記(aone_note)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。

分享到: