flink(九):Table&Sql環境搭建和程式結構

語言: CN / TW / HK

說明

  • Flink Table 相關知識是我一直感興趣的部分,現決定跨過一些不必要的知識,直接學習 Flink Table ,本文主要介紹 flink table 架構和介面實現。
  • Apache Flink 有兩種關係型 API 來做流批統一處理:Table API 和 SQL。Table API 是用於 Scala 和 Java 語言的查詢API,它可以用一種非常直觀的方式來組合使用選取、過濾、join 等關係型運算元。Flink SQL 是基於 Apache Calcite 來實現的標準 SQL。這兩種 API 中的查詢對於批(DataSet)和流(DataStream)的輸入有相同的語義,也會產生同樣的計算結果。

資料

架構

  • 1.9版本Blink是阿里提供,Flink 1.12預設Blink實現Tabel API和SQL功能。

優勢

  • 宣告式:使用者只關心做什麼,不用關心怎麼做。
  • 高效能:支援查詢優化,可以獲得更好的執行效能。
  • 流批統一:相同的統計邏輯,既可以流模式執行,也可以批模式執行。
  • 標準穩定:語義遵循SQL標準,不易變動。
  • 易理解:語義明確,所見即所得。

maven導包

tabel API和SQL

  • 針對java和scala語言匯入不同包。
<!-- java -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>
<!-- scala -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

本地環境配置

  • 如果需要在 IDE 本地執行測試程式,需要新增以下模組,具體用哪個取決使用哪個的引擎是 Flink 還是 Blink 。
<!-- flink引擎,1.9版本前預設 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>
<!-- Blink 當前預設引擎 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

優化

stream支援scala

  • 內部實現上,部分 table 相關程式碼使用 Scala 開發。不管批式或流式程式,必須新增如下依賴。
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

支援自定義格式或函式

  • 如果需要實現自定義格式解析 Kafka 資料,或者自定義函式處理業務,需要新增如下依賴。
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

程式結構

Blink(新)和flink(舊)計劃器區別

  1. Blink 將批處理作業視作流處理的一種特例。Table 和 DataSet 之間不支援相互轉換,並且批處理作業也不會轉換成 DataSet 程式而是轉換成 DataStream 程式,流處理作業也一樣。
  2. Blink 計劃器不支援 BatchTableSource,而是使用有界的 StreamTableSource 來替代。
  3. 舊計劃器和 Blink 計劃器中 FilterableTableSource 的實現是不相容的。舊計劃器會將 PlannerExpression 下推至 FilterableTableSource,而 Blink 計劃器則是將 Expression 下推。
  4. 基於字串的鍵值配置選項僅在 Blink 計劃器中使用。(詳情參見 配置 )
  5. PlannerConfig 在兩種計劃器中的實現(CalciteConfig)是不同的。
  6. Blink 計劃器會將多sink(multiple-sinks)優化成一張有向無環圖(DAG),TableEnvironment 和 StreamTableEnvironment 都支援該特性。舊計劃器總是將每個sink都優化成一個新的有向無環圖,且所有圖相互獨立。
  7. 舊計劃器目前不支援 catalog 統計資料,而 Blink 支援

程式結構

  • 用於批處理和流處理的 Table API 和 SQL 程式都遵循相同的模式,先建立TableEnvironment,再建立表,繼續通過 Table API 或 SQL 操作表,實現業務功能,例項程式碼如下:
// 建立一個TableEnvironment執行流或批處理
TableEnvironment tableEnv = ...; 

// 建立輸入表
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// 建立輸出表
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// 使用 Table API 執行查詢操作
Table table2 = tableEnv.from("table1").select(...);
// 使用 SQL 執行查詢操作
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");

// 使用 Table API 獲取結果資料
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...

總結

  • Table 和 SQL 功能的加入,簡化了Flink開發難度,懂 Sql 就能開發自己需要的功能。
  • 興趣是最好的老師,日常除了工作,維護好心底的熱情,工作和興趣分別對待,時刻保持活力。