萬字帶你深入阿里開源的Canal工作原理

語言: CN / TW / HK


前言

         上篇文章給大家講解了如何安裝一個Canal,以及講解了一部分的原理,今天我們就來深度聊一聊Canal的工作流程,以及他是怎麼工作的,以及架構師怎樣的。         

5146e36f2271f57b8d1a88a3854a57db.jpg       

            首先我們深度瞭解Canal時必須深度瞭解了一下MySQL主從複製原理。

一、MySQL主從複製

MySQL主備複製原理cfad7ab1b237f6f15e1bc01deef166b9.jpg

  • MySQL master 將資料變更寫入二進位制日誌( binary log, 其中記錄叫做二進位制日誌事件  log events,可以通過 show binlog events 進行檢視)
  • MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log)
  • MySQL slave 重放 relay log 中事件,將資料變更反映它自己的資料,以此來達到資料一致。

MySQL的binLog         

       它記錄了所有的DDL和DML(除了資料查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間。主要用來備份和資料同步。binlog 有三種:STATEMENTROWMIXED

  • STATEMENT 記錄的是執行的sql語句
  • ROW 記錄的是真實的行資料記錄
  • MIXED 記錄的是1+2,優先按照1的模式記錄

名詞解釋什麼是中繼日誌

         從伺服器I/O執行緒將主伺服器的二進位制日誌讀取過來記錄到從伺服器本地檔案,然後從伺服器SQL執行緒會讀取relay-log日誌的內容並應用到從伺服器,從而使從伺服器和主伺服器的資料保持一致

二、Canal架構

f88b174075f331a6330aed09dc5bd996.jpgCanal架構

  • server 代表一個 canal 執行例項,對應於一個 jvm
  • instance 對應於一個數據佇列 (1個 canal server 對應 1..n 個 instance )
  • instance 下的子模組
    • eventParser: 資料來源接入,模擬 slave 協議和 master 進行互動,協議解析
    • eventSink: Parser 和 Store 連結器,進行資料過濾,加工,分發的工作
    • eventStore: 資料儲存
    • metaManager: 增量訂閱 & 消費資訊管理器

         EventParser在向MySQL傳送dump命令之前會先從Log Position中獲取上次解析成功的位置(如果是第一次啟動,則獲取初始指定位置或者當前資料段binlog位點)。mysql接受到dump命令後,由EventParser從mysql上pull binlog資料進行解析並傳遞給EventSink(傳遞給EventSink模組進行資料儲存,是一個阻塞操作,直到儲存成功 ),傳送成功之後更新Log Position。流程圖如下:2d40eb9b668f9d159050d02cd9193ceb.jpg

  • EventSink起到一個類似channel的功能,可以對資料進行過濾、分發/路由(1:n)、歸併(n:1)和加工。EventSink是連線EventParser和EventStore的橋樑。
  • EventStore實現模式是記憶體模式,記憶體結構為環形佇列,由三個指標(Put、Get和Ack)標識資料儲存和讀取的位置。
  • MetaManager是增量訂閱&消費資訊管理器,增量訂閱和消費之間的協議包括get/ack/rollback,分別為:
    • Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的物件為Message,包含的內容為:batch id[唯一標識]和entries[具體的資料物件]
    • void rollback(long batchId),顧名思義,回滾上次的get請求,重新獲取資料。基於get獲取的batchId進行提交,避免誤操作
    • void ack(long batchId),顧名思議,確認已經消費成功,通知server刪除資料。基於get獲取的batchId進行提交,避免誤操作

三、server/client互動協議

         canal client與canal server之間是C/S模式的通訊,客戶端採用NIO,服務端採用Netty。canal server啟動後,如果沒有canal client,那麼canal server不會去mysql拉取binlog。即Canal客戶端主動發起拉取請求,服務端才會模擬一個MySQL Slave節點去主節點拉取binlog。通常Canal客戶端是一個死迴圈,這樣客戶端一直呼叫get方法,服務端也就會一直拉取binlog

BIO、NIO、AIO的區別

IO的方式通常分為幾種,同步阻塞的BIO同步非阻塞的NIO非同步非阻塞的AIO

同步阻塞IO:在此種方式下,使用者程序在發起一個IO操作以後,必須等待IO操作的完成,只有當真正完成了IO操作以後,使用者程序才能執行。JAVA傳統的IO模型屬於此種方式!0635253effa9b4108617291548313251.jpg同步非阻塞IO:在此種方式下,使用者程序發起一個IO操作以後邊可返回做其它事情,但是使用者程序需要時不時的詢問IO操作是否就緒,這就要求使用者程序不停的去詢問,從而引入不必要的CPU資源浪費。其中目前JAVA的NIO就屬於同步非阻塞IO。89617b76774bdf84895f808ecfdb13ee.jpg

非同步阻塞IO:此種方式下是指應用發起一個IO操作以後,不等待核心IO操作的完成,等核心完成IO操作以後會通知應用程式,這其實就是同步和非同步最關鍵的區別,同步必須等待或者主動的去詢問IO是否完成,那麼為什麼說是阻塞的呢?因為此時是通過select系統呼叫來完成的,而select函式本身的實現方式是阻塞的,而採用select函式有個好處就是它可以同時監聽多個檔案控制代碼,從而提高系統的併發性!4926ed2a319301bd506179f5b597e357.jpg

非同步非阻塞IO:在此種模式下,使用者程序只需要發起一個IO操作然後立即返回,等IO操作真正的完成以後,應用程式會得到IO操作完成的通知,此時使用者程序只需要對資料進行處理就好了,不需要進行實際的IO讀寫操作,因為真正的IO讀取或者寫入操作已經由核心完成了。目前Java中還沒有支援此種IO模型。

e7457bea2b963b0b6f79214139561830.jpg         canal client與canal server之間屬於增量訂閱/消費,流程圖如下:(其中C端是canal client,S端是canal server)fa861ac334ace43fc07fdfa64a543654.jpgcanal client呼叫connect()方法時,傳送的資料包(PacketType)型別為:

  1. handshake
  2. ClientAuthentication

canal client呼叫subscribe()方法,型別為[subscription]。對應服務端採用netty處理RPC請求(CanalServerWithNetty):

public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {
    public void start() {
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipelines = Channels.pipeline();
                pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
                // 處理客戶端的HANDSHAKE請求
                pipelines.addLast(HandshakeInitializationHandler.class.getName(),
                    new HandshakeInitializationHandler(childGroups))
;
                // 處理客戶端的CLIENTAUTHENTICATION請求
                pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                    new ClientAuthenticationHandler(embeddedServer))
;

                // 處理客戶端的會話請求,包括SUBSCRIPTION,GET等
                SessionHandler sessionHandler = new SessionHandler(embeddedServer);
                pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
                return pipelines;
            }
        });
    }
}


















ClientAuthenticationHandler處理鑑權後,會移除HandshakeInitializationHandler和ClientAuthenticationHandler。最重要的是會話處理器SessionHandler

以client傳送GET,server從mysql得到binlog後,返回MESSAGES給client為例,說明client和server的rpc互動過程:

SimpleCanalConnector傳送GET請求,並讀取響應結果的流程:

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    int size = (batchSize <= 0) ? 1000 : batchSize;
    long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1代表不做timeout控制
    if (unit == null) unit = TimeUnit.MILLISECONDS;  //預設是毫秒

    // client傳送GET請求
    writeWithHeader(Packet.newBuilder()
        .setType(PacketType.GET)
        .setBody(Get.newBuilder()
            .setAutoAck(false)
            .setDestination(clientIdentity.getDestination())
            .setClientId(String.valueOf(clientIdentity.getClientId()))
            .setFetchSize(size)
            .setTimeout(time)
            .setUnit(unit.ordinal())
            .build()
            .toByteString())
        .build()
        .toByteArray());
    // client獲取GET結果    
    return receiveMessages();
}

private Message receiveMessages() throws IOException {
    // 讀取server傳送的資料包
    Packet p = Packet.parseFrom(readNextPacket());
    switch (p.getType()) {
        case MESSAGES: {
            Messages messages = Messages.parseFrom(p.getBody());
            Message result = new Message(messages.getBatchId());
            for (ByteString byteString : messages.getMessagesList()) {
                result.addEntry(Entry.parseFrom(byteString));
            }
            return result;
        }
    }
}





































服務端SessionHandler處理客戶端傳送的GET請求流程:

case GET:
    // 讀取客戶端傳送的資料包,封裝為Get物件
    Get get = CanalPacket.Get.parseFrom(packet.getBody());
    // destination表示canal instance
    if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) {
        clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()));
        Message message = null;
        if (get.getTimeout() == -1) {// 是否是初始值
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
        } else {
            TimeUnit unit = convertTimeUnit(get.getUnit());
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
        }
        // 設定返回給客戶端的資料包型別為MESSAGES   
        Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
        packetBuilder.setType(PacketType.MESSAGES);
        // 構造Message
        Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
        messageBuilder.setBatchId(message.getId());
        if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {
            for (Entry entry : message.getEntries()) {
                messageBuilder.addMessages(entry.toByteString());
            }
        }
        packetBuilder.setBody(messageBuilder.build().toByteString());
        // 輸出資料,返回給客戶端
        NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);
    }



























具體的網路協議格式,可參見:CanalProtocol.proto

get/ack/rollback協議介紹:

  • Message getWithoutAck(int batchSize)
    • batch id 唯一標識
    • entries 具體的資料物件,對應的資料物件格式:EntryProtocol.proto
    • 允許指定batchSize,一次可以獲取多條,每次返回的物件為Message,包含的內容為:
  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit)
    • 拿夠batchSize條記錄或者超過timeout時間
    • timeout=0,阻塞等到足夠的batchSize
    • 相比於getWithoutAck(int batchSize),允許設定獲取資料的timeout超時時間
  • void rollback(long batchId)
    • 回滾上次的get請求,重新獲取資料。基於get獲取的batchId進行提交,避免誤操作
  • void ack(long batchId)
    • 確認已經消費成功,通知server刪除資料。基於get獲取的batchId進行提交,避免誤操作

EntryProtocol.protod對應的canal訊息結構如下:

Entry  
    Header  
        logfileName [binlog檔名]  
        logfileOffset [binlog position]  
        executeTime [binlog裡記錄變更發生的時間戳,精確到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete型別]  
    entryType   [事務頭BEGIN/事務尾END/資料ROWDATA]  
    storeValue  [byte資料,可展開,對應的型別為RowChange]  
      
RowChange  
    isDdl       [是否是ddl變更操作,比如create table/drop table]  
    sql         [具體的ddl sql]  
    rowDatas    [具體insert/update/delete的變更資料,可為多條,1個binlog event事件可對應多條變更,比如批處理]  
        beforeColumns [Column型別的陣列,變更前的資料欄位]  
        afterColumns [Column型別的陣列,變更後的資料欄位]  
          
Column   
    index         
    sqlType     [jdbc type]  
    name        [column name]  
    isKey       [是否為主鍵]  
    updated     [是否發生過變更]  
    isNull      [值是否為null]  
    value       [具體的內容,注意為string文字]

























SessionHandler中服務端處理客戶端的其他型別請求,都會呼叫CanalServerWithEmbedded的相關方法:

case SUBSCRIPTION:
        Sub sub = Sub.parseFrom(packet.getBody());
        embeddedServer.subscribe(clientIdentity);
case GET:
        Get get = CanalPacket.Get.parseFrom(packet.getBody());
        message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
case CLIENTACK:
        ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
        embeddedServer.ack(clientIdentity, ack.getBatchId());
case CLIENTROLLBACK:
        ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
        embeddedServer.rollback(clientIdentity);// 回滾所有批次











所以真正的處理邏輯在CanalServerWithEmbedded中,下面重點來了。。。

3.1 CanalServerWithEmbedded

         CanalServer包含多個Instance,它的成員變數canalInstances記錄了instance名稱與例項的對映關係。        

        因為是一個Map,所以同一個Server不允許出現相同instance名稱(本例中例項名稱為example),比如不能同時有兩個example在一個server上。但是允許一個Server上有example1和example2。

注意:CanalServer中最重要的是CanalServerWithEmbedded,而CanalServerWithEmbedded中最重要的是CanalInstance

         下圖表示一個server配置了兩個Canal例項(instance),每個Client連線一個Instance。每個Canal例項模擬為一個MySQL的slave,所以每個Instance的slaveId必須不一樣。比如圖中兩個Instance的id分別是1234和1235,它們都會拉取MySQL主節點的binlog。6a939946f66c9b8f252aaeab169e8061.jpg這裡每個Canal Client都對應一個Instance,每個Client在啟動時, 都會指定一個Destination,這個Destination就表示Instance的名稱。所以CanalServerWithEmbedded處理各種請求時的引數都有ClientIdentity, 從ClientIdentity中獲取destination,就可以獲取出對應的CanalInstance。

理解下各個元件的對應關係:

  • Canal Client通過destination找出Canal Server中對應的Canal Instance。
  • 一個Canal Server可以配置多個Canal Instances。

下面以CanalServerWithEmbedded的訂閱方法為例:

  1. 根據客戶端標識獲取CanalInstance
  2. 向CanalInstance的元資料管理器訂閱當前客戶端
  3. 從元資料管理中獲取客戶端的遊標
  4. 通知CanalInstance訂閱關係發生變化

注意:提供訂閱方法的作用是:MySQL新增了一張表,客戶端原先沒有同步這張表,現在需要同步,所以需要重新訂閱。

public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
   // ClientIdentity表示Canal Client客戶端,從中可以獲取出客戶端指定連線的Destination
   // 由於CanalServerWithEmbedded記錄了每個Destination對應的Instance,可以獲取客戶端對應的Instance
   CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
   if (!canalInstance.getMetaManager().isStart()) {
       canalInstance.getMetaManager().start(); // 啟動Instance的元資料管理器
   }
   canalInstance.getMetaManager().subscribe(clientIdentity); // 執行一下meta訂閱
   Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
   if (position == null) {
       position = canalInstance.getEventStore().getFirstPosition();// 獲取一下store中的第一條
       if (position != null) {
           canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
       }
   }
   // 通知下訂閱關係變化
   canalInstance.subscribeChange(clientIdentity);
}

















每個CanalInstance中包括了四個元件:EventParser、EventSink、EventStore、MetaManager

服務端主要的處理方法包括get/ack/rollback,這三個方法都會用到Instance上面的幾個內部元件,主要還是EventStore和MetaManager:

在這之前,要先理解EventStore的含義,EventStore是一個RingBuffer,有三個指標:Put、Get、Ack

  • Put: Canal Server從MySQL拉取到資料後,放到記憶體中,Put增加
  • Get: 消費者(Canal Client)從記憶體中消費資料,Get增加
  • Ack: 消費者消費完成,Ack增加。並且會刪除Put中已經被Ack的資料

這三個操作與Instance元件的關係如下:4ea4f9cb656b208c3e3fdb83ff236ad6.jpg客戶端通過canal server獲取mysql binlog有幾種方式(get方法和getWithoutAck):

  • 如果timeout為null,則採用tryGet方式,即時獲取
  • 如果timeout不為null
  1. timeout為0,則採用get阻塞方式,獲取資料,不設定超時,直到有足夠的batchSize資料才返回
  2. timeout不為0,則採用get+timeout方式,獲取資料,超時還沒有batchSize足夠的資料,有多少返回多少
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit)
 
{
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize); // 即時獲取
    } else if (timeout <= 0){
        return eventStore.get(start, batchSize); // 阻塞獲取
    } else {
        return eventStore.get(start, batchSize, timeout, unit); // 非同步獲取
    }
}








注意:EventStore的實現採用了類似Disruptor的RingBuffer環形緩衝區。RingBuffer的實現類是MemoryEventStoreWithBuffer

get方法和getWithoutAck方法的區別是:

  • get方法會立即呼叫ack
  • getWithoutAck方法不會呼叫ack

3.2  EventStore

以10條資料為例,初始時current=-1,第一個元素起始next=0,end=9,迴圈[0,9]所有元素。List元素為(A,B,C,D,E,F,G,H,I,J)3b635e7919d71ac740e505db3eda0947.jpg第一批10個元素put完成後,putSequence設定為end=9。假設第二批又Put了5個元素:(K,L,M,N,O)

current=9,起始next=9+1=10,end=9+5=14,在Put完成後,putSequence設定為end=14。710483194c3d3458ae41a1a83bb6bb5f.jpg         這裡假設環形緩衝區的最大大小為15個(原始碼中是16MB),那麼上面兩批一共產生了15個元素,剛好填滿了環形緩衝區。如果又有Put事件進來,由於環形緩衝區已經滿了,沒有可用的slot,則Put操作會被阻塞,直到被消費掉。

下面是Put填充環形緩衝區的程式碼,檢查可用slot(checkFreeSlotAt方法)在幾個put方法中。

public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {
    private static final long INIT_SQEUENCE = -1;
    private int               bufferSize    = 16 * 1024;
    private int               bufferMemUnit = 1024;                         // memsize的單位,預設為1kb大小
    private int               indexMask;
    private Event[]           entries;

    // 記錄下put/get/ack操作的三個下標
    private AtomicLong        putSequence   = new AtomicLong(INIT_SQEUENCE); // 代表當前put操作最後一次寫操作發生的位置
    private AtomicLong        getSequence   = new AtomicLong(INIT_SQEUENCE); // 代表當前get操作讀取的最後一條的位置
    private AtomicLong        ackSequence   = new AtomicLong(INIT_SQEUENCE); // 代表當前ack操作的最後一條的位置

    // 啟動EventStore時,建立指定大小的緩衝區,Event陣列的大小是16*1024
    // 也就是說算個數的話,陣列可以容納16000個事件。算記憶體的話,大小為16MB
    public void start() throws CanalStoreException {
        super.start();
        indexMask = bufferSize - 1;
        entries = new Event[bufferSize];
    }

    // EventParser解析後,會放入記憶體中(Event陣列,緩衝區)
    private void doPut(List<Event> data) {
        long current = putSequence.get(); // 取得當前的位置,初始時為-1,第一個元素為-1+1=0
        long end = current + data.size(); // 最末尾的位置,假設Put了10條資料,end=-1+10=9
        // 先寫資料,再更新對應的cursor,併發度高的情況,putSequence會被get請求可見,拿出了ringbuffer中的老的Entry值
        for (long next = current + 1; next <= end; next++) {
            entries[getIndex(next)] = data.get((int) (next - current - 1));
        }
        putSequence.set(end);
    } 
}






























         Put是生產資料,Get是消費資料,Get一定不會超過Put。比如Put了10條資料,Get最多隻能獲取到10條資料。但有時候為了保證Get處理的速度,Put和Get並不會相等。可以把Put看做是生產者,Get看做是消費者。生產者速度可以很快,消費者則可以慢慢地消費。比如Put了1000條,而Get我們只需要每次處理10條資料。

         仍然以前面的示例來說明Get的流程,初始時current=-1,假設Put了兩批資料一共15條,maxAbleSequence=14,而Get的BatchSize假設為10。初始時next=current=-1,end=-1。通過startPosition,會設定next=0。最後end又被賦值為9,即迴圈緩衝區[0,9]一共10個元素。

private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
    LogPosition startPosition = (LogPosition) start;

    long current = getSequence.get();
    long maxAbleSequence = putSequence.get();
    long next = current;
    long end = current;
    // 如果startPosition為null,說明是第一次,預設+1處理
    if (startPosition == null || !startPosition.getPostion().isIncluded()) { // 第一次訂閱之後,需要包含一下start位置,防止丟失第一條記錄
        next = next + 1;
    }

    end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
    // 提取資料並返回
    for (; next <= end; next++) {
        Event event = entries[getIndex(next)];
        if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) {
            // 如果是ddl隔離,直接返回
            if (entrys.size() == 0) {
                entrys.add(event);// 如果沒有DML事件,加入當前的DDL事件
                end = next; // 更新end為當前
            } else {
                // 如果之前已經有DML事件,直接返回了,因為不包含當前next這記錄,需要回退一個位置
                end = next - 1// next-1一定大於current,不需要判斷
            }
            break;
        } else {
            entrys.add(event);
        }
    }
    // 處理PositionRange,然後設定getSequence為end
    getSequence.compareAndSet(current, end)
}
































ack操作的上限是Get,假設Put了15條資料,Get了10條資料,最多也只能Ack10條資料。Ack的目的是清空緩衝區中已經被Get過的資料

public void ack(Position position) throws CanalStoreException {
    cleanUntil(position);
}

public void cleanUntil(Position position) throws CanalStoreException {
    long sequence = ackSequence.get();
    long maxSequence = getSequence.get();

    boolean hasMatch = false;
    long memsize = 0;
    for (long next = sequence + 1; next <= maxSequence; next++) {
        Event event = entries[getIndex(next)];
        memsize += calculateSize(event);
        boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
        if (match) {// 找到對應的position,更新ack seq
            hasMatch = true;

            if (batchMode.isMemSize()) {
                ackMemSize.addAndGet(memsize);
                // 嘗試清空buffer中的記憶體,將ack之前的記憶體全部釋放掉
                for (long index = sequence + 1; index < next; index++) {
                    entries[getIndex(index)] = null;// 設定為null
                }
            }

            ackSequence.compareAndSet(sequence, next)
        }
    }
}




























rollback回滾方法的實現則比較簡單,將getSequence回退到ack位置。

public void rollback() throws CanalStoreException {
    getSequence.set(ackSequence.get());
    getMemSize.set(ackMemSize.get());
}



下圖展示了RingBuffer的幾個操作示例:a6567f63ec08b92ce0176d440de10fbd.jpg

3.3 EventParser WorkFlow

EventStore負責儲存解析後的Binlog事件,而解析動作負責拉取Binlog,它的流程比較複雜。需要和MetaManager進行互動。比如要記錄每次拉取的Position,這樣下一次就可以從上一次的最後一個位置繼續拉取。所以MetaManager應該是有狀態的。

EventParser的流程如下:

  1. Connection獲取上一次解析成功的位置 (如果第一次啟動,則獲取初始指定的位置或者是當前資料庫的binlog位點)
  2. Connection建立連結,傳送BINLOG_DUMP指令
  3. Mysql開始推送Binaly Log
  4. 接收到的Binaly Log的通過Binlog parser進行協議解析,補充一些特定資訊
  5. 傳遞給EventSink模組進行資料儲存,是一個阻塞操作,直到儲存成功
  6. 儲存成功後,定時記錄Binaly Log位置0097d5bbc0d423e400417e7580364b10.jpg

總結

          上述我們講了一些架構和一些互動模式,和比較多原理,做為一名優秀的程式設計師不能只單純的會使用,而是多去了解他的思想和為什麼這麼寫,這樣你的程式碼能力才一天比一天強。我在這裡為大家提供大資料的資源需要的朋友可以去下面GitHub去下載,信自己,努力和汗水總會能得到回報的。我是大資料老哥,我們下期見~~~

資源獲取 獲取Flink面試題,Spark面試題,程式設計師必備軟體,hive面試題,Hadoop面試題,Docker面試題,簡歷模板等資源請去 

GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData 

Gitee 自行下載  https://gitee.com/li_hey_hey/dashboard/projects 

實時數倉程式碼:https://github.com/lhh2002/Real_Time_Data_WareHouse



分享到: