Mycat 作為代理服務端的小知識點

語言: CN / TW / HK

一 . 前言

Mycat 以一個 Server 的形式對外暴露服務 , 其主要配置類為 Server.xml , 這一篇主要針對一些小細節進行學習 , 主要包括 :

  • Server.xml 的載入方式
  • Server 中的配置和作用
  • 在連線請求中 Server 的流轉

二 . Server.xml 的配置

2.1 Server.xml 配置詳情

Server.xml 主要在 XMLSchemaLoader 中進行載入 , Server.xml 檔案中主要分為以下幾個部分

User 管理

<user name="user">
    <property name="password">user</property>
    // 使用者可訪問的 schemas , 中間可通過逗號分隔
    <property name="schemas">db001</property>
    <property name="readOnly">true</property>
    <property name="defaultSchema">db001</property>
</user>
複製程式碼

配置管理

配置檔案直接從原始碼中拉下來的 , 也是非常清楚的 , 有興趣的可以拉原始碼看一下

<system>
       <property name="nonePasswordLogin">0</property> <!-- 0為需要密碼登陸、1為不需要密碼登陸 ,預設為0,設定為1則需要指定預設賬戶-->
       <property name="ignoreUnknownCommand">0</property><!-- 0遇上沒有實現的報文(Unknown command:),就會報錯、1為忽略該報文,返回ok報文。
在某些mysql客戶端存在客戶端已經登入的時候還會繼續傳送登入報文,mycat會報錯,該設定可以繞過這個錯誤-->
       <property name="useHandshakeV10">1</property>
       <property name="removeGraveAccent">1</property>
       <property name="useSqlStat">0</property>  <!-- 1為開啟實時統計、0為關閉 -->
       <property name="useGlobleTableCheck">0</property>  <!-- 1為開啟全加班一致性檢測、0為關閉 -->
       <property name="sqlExecuteTimeout">300</property>  <!-- SQL 執行超時 單位:秒-->
       <property name="sequenceHandlerType">1</property>
       <!--<property name="sequnceHandlerPattern">(?:(\s*next\s+value\s+for\s*MYCATSEQ_(\w+))(,|)|\s)*)+</property>
       INSERT INTO `travelrecord` (`id`,user_id) VALUES ('next value for MYCATSEQ_GLOBAL',"xxx");
       -->
       <!--必須帶有MYCATSEQ_或者 mycatseq_進入序列匹配流程 注意MYCATSEQ_有空格的情況-->
       <property name="sequnceHandlerPattern">(?:(\s*next\s+value\s+for\s*MYCATSEQ_(\w+))(,|)|\s)*)+</property>
       <property name="subqueryRelationshipCheck">false</property> <!-- 子查詢中存在關聯查詢的情況下,檢查關聯欄位中是否有分片欄位 .預設 false -->
       <property name="sequenceHanlderClass">io.mycat.route.sequence.handler.HttpIncrSequenceHandler</property>
       <!--  <property name="useCompression">1</property>--> <!--1為開啟mysql壓縮協議-->
       <!--  <property name="fakeMySQLVersion">5.6.20</property>--> <!--設定模擬的MySQL版本號-->
       <!-- <property name="processorBufferChunk">40960</property> -->
       <!--
       <property name="processors">1</property>
       <property name="processorExecutor">32</property>
        -->
       <!--預設為type 0: DirectByteBufferPool | type 1 ByteBufferArena | type 2 NettyBufferPool -->
       <property name="processorBufferPoolType">0</property>
       <!--預設是65535 64K 用於sql解析時最大文字長度 -->
       <!--<property name="maxStringLiteralLength">65535</property>-->
       <!--<property name="sequenceHandlerType">0</property>-->
       <!--<property name="backSocketNoDelay">1</property>-->
       <!--<property name="frontSocketNoDelay">1</property>-->
       <!--<property name="processorExecutor">16</property>-->
       <!--
           <property name="serverPort">8066</property> <property name="managerPort">9066</property>
           <property name="idleTimeout">300000</property> <property name="bindIp">0.0.0.0</property>
           <property name="dataNodeIdleCheckPeriod">300000</property> 5 * 60 * 1000L; //連線空閒檢查
           <property name="frontWriteQueueSize">4096</property> <property name="processors">32</property> -->
       <!--分散式事務開關,0為不過濾分散式事務,1為過濾分散式事務(如果分散式事務內只涉及全域性表,則不過濾),2為不過濾分散式事務,但是記錄分散式事務日誌-->
       <property name="handleDistributedTransactions">0</property>

       <!--
       off heap for merge/order/group/limit      1開啟   0關閉
   -->
       <property name="useOffHeapForMerge">0</property>

       <!--
           單位為m
       -->
       <property name="memoryPageSize">64k</property>

       <!--
           單位為k
       -->
       <property name="spillsFileBufferSize">1k</property>

       <property name="useStreamOutput">0</property>

       <!--
           單位為m
       -->
       <property name="systemReserveMemorySize">384m</property>


       <!--是否採用zookeeper協調切換  -->
       <property name="useZKSwitch">false</property>

       <!-- XA Recovery Log日誌路徑 -->
       <!--<property name="XARecoveryLogBaseDir">./</property>-->

       <!-- XA Recovery Log日誌名稱 -->
       <!--<property name="XARecoveryLogBaseName">tmlog</property>-->
       <!--如果為 true的話 嚴格遵守隔離級別,不會在僅僅只有select語句的時候在事務中切換連線-->
       <property name="strictTxIsolation">false</property>
       <!--如果為0的話,涉及多個DataNode的catlet任務不會跨執行緒執行-->
       <property name="parallExecute">0</property>
   </system>
複製程式碼

防火牆配置

<firewall>
   <whitehost>
      <host host="1*7.0.0.*" user="root"/>
   </whitehost>
   <blacklist check="false">
   </blacklist>
</firewall>
複製程式碼

2.2 Server.xml 細節解析

Server.xml 內部有幾大標籤 , 主要為 user , system 和 firewall

// user : 用於定義登入的 Mycat 的使用者和許可權 , 最終對映為 UserConfig 除了上面展示的 , 其還提供瞭如下特殊屬性 : 
- benchmark : 負載均衡策略 , 0 為不限制連線數
- privilegesConfig : 表級別的增刪改查設定


// System 標籤 : 系統配置標籤 , 最終對映為 SystemConfig 物件 , 除了上面展示 , 其主要還有如下配置
- charset : 配置字符集 , 務必和資料庫一致
- defaultSqlParser : 指定預設的解析器
- processors : 系統可用執行緒數 (預設Runtime.getRuntime().availableProcessors())
- processorBufferChunk : 每次分配 Socket Direct Buffer , 此處會影響獲取位元組的大小
- processorBufferPool : BufferPool 的計算比例
- processorBufferLocalPercent : ThreadLocalPool 分配 Pool 的比例大小 , 預設 100
- sequnceHandlerType : Mycat 全域性序列的型別

- private long processorCheckPeriod : 清理 NIOProcessor 空閒間隔
- private long dataNodeIdleCheckPeriod :  後端連線清理間隔
- private long dataNodeHeartbeatPeriod :  對後端讀寫發起心跳間隔
- private int useOffHeapForMerge : 是否啟用Off Heap for Merge  1-啟用,0-不啟用


- private int usingAIO = 0 : 是否開啟 AIO 
- private int packetHeaderSize = 4 : MySQL 協議報文長度
- private int maxPacketSize = 16 * 1024 * 1024 : 可用攜帶的資料最大大小
- private String memoryPageSize : 頁大小,對應MemoryBlock的大小,單位為M
- private long idleTimeout : 連線的空閒時間的超時長度
- private int txIsolation : 初始阿虎啊前端連線事務的隔離級別 , 對應 1-4

複製程式碼

三 . Server.xml 的載入方式

Server.xml 是在 ConfigInitializer 進行的載入 ,最終載入位 Map , 進而傳遞到 MycatServer 中

//讀取server.xml
XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);

// 對應 schema.xml -> dataHost
private final Map<String, DataHostConfig> dataHosts;
// 對應 schema.xml -> dataNode
private final Map<String, DataNodeConfig> dataNodes;
// 對應 schema.xml規則
private final Map<String, SchemaConfig> schemas;
// 對應 System 全域性配置
private final SystemConfig system;
// 對應 User 特定 Config
private final Map<String, UserConfig> users;
// 對應防火牆配置
private final FirewallConfig firewall;
// 對應分片配置
private final ClusterConfig cluster;
複製程式碼

四. 配置的使用場景

4.1 節點一 : Server 連線

當通過工具連線時 , 會首先進行 init DB 操作 ,此時會獲取 DB 資訊獲取連線

C- FrontendConnection # initDB
public void initDB(byte[] data) {
   
   // S1 : 通過 MySQLMessage 物件獲取 data 資料
   // init 請求引數 : \u0006\u0000\u0000\u0000\u0002db001 -> db001

   // S2 : 檢查 DB 有效性
   db == null || !privileges.schemaExists(db)
   
   // S3 : 校驗當前使用者是否存在
   privileges.userExists(user, host)
 
   // S4 : 獲取 Schemas
   Set<String> schemas = privileges.getUserSchemas(user);
   if (schemas == null || schemas.size() == 0 || schemas.contains(db)) {
      this.schema = db;
      // OkPacket.OK : 07 00 00 02 00 00 00 02 00 00 00
      write(writeToBuffer(OkPacket.OK, allocate()));
   } else {
      String s = "Access denied for user '" + user + "' to database '" + db + "'";
      writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
   }
} 

複製程式碼

補充一 : OkPacket.OK : 700010002000 是什麼意思 ? MySQL OK 包返回結構

一個 OK 資料包從伺服器傳送到客戶機,表示命令成功完成 , 在 MySQL 5.7.5中,OK packes 也被用來表示 EOF,而 EOF 資料包已被棄用。

在 packes 中會包含如下資料 : 包頭 , 受影響的行 , 最後一次插入的 ID , Status Flag 的狀態 等等 , 後面有機會 , 找個案例詳細的看看

補充二 : privileges 物件

可以看到 , 上述程式碼中頻繁出現 FrontendPrivileges , 該物件的作用是什麼 ?

FrontendPrivileges 是一個許可權提供者介面 , 他提供了幾種常見的方法例如 :

  • schemaExists : 檢查schema是否存在
  • userExists : 檢查使用者是否存在,並且可以使用host實行隔離策略
  • checkFirewallWhiteHostPolicy : 檢查防火牆策略

....等等

可以理解為 , FrontendPrivileges 就是對 server.xml 資料的邏輯處理 , 其主要實現類為 MycatPrivileges .

其內部邏輯也比較簡單 , 主要是對 MycatConfig 的處理 , 例如 :


// 檢查schema是否存在
public boolean schemaExists(String schema) {
    MycatConfig conf = MycatServer.getInstance().getConfig();
    return conf.getSchemas().containsKey(schema);
}


// 非常好的實踐 , 直接返回例項物件 
public static final MycatServer getInstance() {
    // private static final MycatServer INSTANCE = new MycatServer();
    // 而該靜態物件在構造器中就已經完成了相關config 的初始化
    return INSTANCE;
}

複製程式碼

可以看到 , 內部配置基本上包含了常用的 Config

image.png

4.2 節點二 : 查詢時配置

配置會在各個環節生效 , 下面來看一下幾個常見的場景 :

超時時間的常見使用場景


// 定時檢查任務 , 處理回收資源 : 
C- MycatServer # processorCheck : 
C- NIOProcessor # backendCheck
private void backendCheck() {
    // S1 : 獲取超時時間
   long sqlTimeout = MycatServer.getInstance().getConfig().getSystem().getSqlExecuteTimeout() * 1000L;
   
   // S2 :對所有的連線進行迭代
   Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator();

   // S3 : 首先刪除空連線 , 然後SQL執行超時的連線關閉
  if (c.isBorrowed() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
      //.....
  }
     
}

// PS : 還有多個使用場景 , 這裡就不一一看了

複製程式碼

從這個案例上面 , 就可以明顯的看到 , 配置的主要使用還是靠從 MycatServer 中獲取配置進行處理 , 而 MycatServer 就是整個流程的核心物件之一 , 下一篇來詳細看看

五 . Server 的資料接收

在 Mycat NIO 處理請求的時候 , 可以看到一個 byte[] 持續流轉其中 , 那麼這個 Byte 中包含了那些資料呢 ?

// 在 FrontendConnection # initDB 中 , 我們可以看到 data 的具體資料 , 以一個連線為例 : 
- \u0006\u0000\u0000\u0000\u0002 : 字首 , 判斷具體的型別
- db001 : 具體的 Server Schema 

// 補充 : 通過前四位型別判斷 C- FrontendCommandHandler # handle
public void handle(byte[] data) {
    if (source.getLoadDataInfileHandler() != null && source.getLoadDataInfileHandler().isStartLoadData()) {
        MySQLMessage mm = new MySQLMessage(data);
        int packetLength = mm.readUB3();
        if (packetLength + 4 == data.length) {
            source.loadDataInfileData(data);
        }
        return;
    }
    
    // 可以看到
    switch (data[4]) {
        // public static final byte COM_INIT_DB = 2;
        case MySQLPacket.COM_INIT_DB:
            commands.doInitDB();
            source.initDB(data);
            break;
        // public static final byte COM_QUERY = 3;    
        case MySQLPacket.COM_QUERY:
            commands.doQuery();
            source.query(data);
            break;
        case MySQLPacket.COM_PING:
            commands.doPing();
            source.ping();
            break;
        case MySQLPacket.COM_QUIT:
            commands.doQuit();
            source.close("quit cmd");
            break;
        //..... 省略部分型別
        default:
            commands.doOther();
            MycatConfig config = MycatServer.getInstance().getConfig();
            if (config.getSystem().getIgnoreUnknownCommand() == 1) {
                LOGGER.warn("Unknown command:{}", data[4]);
                source.ping();
            } else {
                LOGGER.error("Unknown command:{}", new String(data));
                source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                        "Unknown command");
            }
    }
}
複製程式碼

一般一個查詢會分成2個部分 :

  • 初始化連線
  • 執行當前的操作 , 以 Query 為例

image.png

這裡可以看到 , Byte[0] - Byte[4] 還是標誌位 , 後面會傳入具體的 SQL . 不論是 initDB 還是 Query , 都會進行一個關鍵步驟 :

MySQLMessage mm = new MySQLMessage(data);
// 解析標誌位
mm.position(5);
String db = mm.readString();


// 當標註位解析完成後 , 最終會呼叫對應的 Handler 完成後續的邏輯
- queryHandler.query(sql);

複製程式碼

總結

對入口進行了簡單的學習 , 參考了文件 , 也做了一定的補充 , 後續可以慢慢的深入整個體系了

最後

如果你覺得此文對你有一丁點幫助,點個贊。或者可以加入我的開發交流群:1025263163相互學習,我們會有專業的技術答疑解惑

如果你覺得這篇文章對你有點用的話,麻煩請給我們的開源專案點點star:http://github.crmeb.net/u/defu不勝感激 !

PHP學習手冊:http://doc.crmeb.com
技術交流論壇:http://q.crmeb.com