DSS1.0.0與Linkis1.0.2——Flink引擎相關問題彙總
目錄導讀
-
1. 前言
-
2. Flink 引擎的編譯及安裝
-
3. 新增 Flink 引擎類型
-
4. 測試運行 flink-sql
-
4.1 參數數字轉換異常
-
4.2 ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache
-
4.3 Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [myhive] does not exist.
-
4.4 Throw error java.lang.NoClassDefFoundError: org/apache/hive/common/util/HiveVersionInfo
-
4.5 Throw error java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/MRVersio
-
4.6 Throw error java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService\$Iface
-
4.7 Client cannot authenticate via:[TOKEN, KERBEROS]
-
5. 模擬線上場景來測試 flink-sql
-
5.1 Could not find any factory for identifier 'kafka' that implements
-
5.2 Could not find any factory for identifier 'json' that implements
-
5.3 Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
-
5.4 Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log
-
5.5 ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
-
5.6 消費數據不寫 hive 表
1. 前言
此篇文章依舊是對上篇文章 《DSS1.0.0+Linkis1.0.2——CDH5 環境中的試用記錄》 的補充和完善,在上篇文章中,針對 Flink 引擎的測試,由於集羣資源的不足,暫時沒有跑通。所以打算用這篇文章繼續來記錄 Flink 引擎後續測試過程中遇到和解決的一些問題,並以一個實際案例,通過 Scripts 平台來提交我們的 Flink SQL 到 Yarn 上去執行。
歷史文章回顧:
《DSS1.0.0+Linkis1.0.2——CDH5 環境中的試用記錄》
《DSS1.0.0 與 Linkis1.0.2——JDBC 引擎相關問題彙總》
2. Flink 引擎的編譯及安裝
對於 Flink 引擎的編譯及安裝,以及本次測試所用的軟硬件環境和版本,可以參考歷史文章。
3. 新增 Flink 引擎類型
安裝完成 Flink 引擎之後,我們想要通過 Scripts 提交 SQL 去執行,還需要在 DSS 管理台為 flink 設置 IDE 相關的參數。

新增 flink 引擎之後,該引擎下面默認是沒有 flink 資源參數的可編輯入口,所以此時我們無法修改 flink 引擎啟動時的默認資源參數配置。

Flink 引擎默認的資源參數配置項及配置項的值,可以參考 FlinkResourceConfiguration
中的代碼:
val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 4) //Unit: G(單位為G)
val LINKIS_FLINK_CLIENT_CORES = 1 //Fixed to 1(固定為1) CommonVars[Int]("wds.linkis.driver.cores", 1)
val LINKIS_FLINK_JOB_MANAGER_MEMORY = CommonVars[Int]("flink.jobmanager.memory", 2) //Unit: G(單位為G)
val LINKIS_FLINK_TASK_MANAGER_MEMORY = CommonVars[Int]("flink.taskmanager.memory", 4) //Unit: G(單位為G)
val LINKIS_FLINK_TASK_SLOTS = CommonVars[Int]("flink.taskmanager.numberOfTaskSlots", 2)
val LINKIS_FLINK_TASK_MANAGER_CPU_CORES = CommonVars[Int]("flink.taskmanager.cpu.cores", 2)
val LINKIS_FLINK_CONTAINERS = CommonVars[Int]("flink.container.num", 2)
val LINKIS_QUEUE_NAME = CommonVars[String]("wds.linkis.rm.yarnqueue", "default")
val FLINK_APP_DEFAULT_PARALLELISM = CommonVars("wds.linkis.engineconn.flink.app.parallelism", 4)
接下來需要做的事是,把上述配置存放進數據庫,具體做法可以參考 sql 文件 linkis_dml.sql
中對 spark 引擎資源配置數據的初始化。SQL 參考如下:
SET @FLINK_LABEL="flink-1.12.2";
SET @FLINK_ALL=CONCAT('*-*,',@FLINK_LABEL);
SET @FLINK_IDE=CONCAT('*-IDE,',@FLINK_LABEL);
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@FLINK_ALL, 'OPTIONAL', 2, now(), now());
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@FLINK_IDE, 'OPTIONAL', 2, now(), now());
select @label_id := id from linkis_cg_manager_label where `label_value` = @FLINK_IDE;
insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.client.memory', '取值範圍:1-15,單位:G', 'flink客户端的內存大小', '1g', 'Regex', '^([1-9]|1[0-5])(G|g)$', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.jobmanager.memory', '取值範圍:1-30,單位:G', 'flink JobManager的內存大小', '1g', 'Regex', '^([1-9]|[1-2][0-9]|3[0])(G|g)$', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.memory', '取值範圍:1-30,單位:G', 'flink TaskManager的內存大小', '1g', 'Regex', '^([1-9]|[1-2][0-9]|3[0])(G|g)$', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.numberOfTaskSlots', '取值範圍:1-40,單位:個', 'flink執行器實例最大併發數', '2', 'NumInterval', '[1,40]', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.cpu.cores', '取值範圍:1-8,單位:個', 'flink執行器核心個數', '2', 'NumInterval', '[1,8]', '0', '0', '1','flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.container.num', '取值範圍:1-40,單位:個', 'flink container個數', '2', 'NumInterval', '[1,40]', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.yarnqueue', 'flink任務所用yarn隊列名', 'yarn隊列名', 'ide', 'None', NULL, '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.flink.app.parallelism', '取值範圍:1-40,單位:個', 'flink app並行度', '1', 'NumInterval', '[1,40]', '0', '0', '1', 'flink資源設置', 'flink');
insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
(select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'flink' and label.label_value = @FLINK_ALL);
insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
(select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @FLINK_ALL);
上述 SQL 執行成功之後,再來看 Flink 引擎的資源參數配置界面:

4. 測試運行 flink-sql
Flink 引擎的資源參數配置問題搞定之後,我們就可以根據集羣的資源,按需來修改默認的資源參數。然後就可以在 Scripts 中提交 flink-sql 到 yarn 上執行。
測試 SQL:
use catalog myhive;
show databases;
4.1 參數數字轉換異常
關鍵報錯信息如下:
Failed to async get EngineNode ErrorException: errCode: 0 ,desc: operation failed(操作失敗)s!the reason(原因):NumberFormatException: For input string: "1g"
這裏可能存在 BUG,參考 Spark 引擎實現中的 SparkSubmitProcessEngineConnLaunchBuilder
,資源參數中有帶單位的,需要在代碼中處理下單位,然後才能正常進行數字轉換。
如果你不想扒源碼,那就直接修改校驗參數的正則表達式,使沒有輸入 G|g 也能保存資源參數。我這裏暫時選擇這樣的解決方式。
4.2 ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache
關鍵報錯信息如下:
ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache
缺少 mapreduce 安全相關的 jar,具體是少了哪個 jar 包,最快的知道方式是直接在 hadoop 源碼中搜索。我這裏 hadoop 的版本是 2.6.0-cdh5.13.1
,缺少的 jar 名稱為:hadoop-mapreduce-client-core-2.6.0-cdh5.13.1.jar
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core-2.6.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib
結合 4.5 小節一起看
4.3 Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [myhive] does not exist.
在 flink 引擎 conf 目錄下的 flink-sql-defaults.yaml 文件中增加名為 myhive 的 catalog 定義
/opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/conf
vim flink-sql-defaults.yaml
配置參考:
catalogs:
- name: myhive
type: hive
hive-conf-dir: /etc/hive/conf
hive-version: 1.1.0
property-version: 1
default-database: test
圖示:

4.4 Throw error java.lang.NoClassDefFoundError: org/apache/hive/common/util/HiveVersionInfo
cp /opt/cloudera/parcels/CDH/jars/hive-exec-1.1.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib
cp /opt/cloudera/parcels/CDH/jars/hive-metastore-1.1.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib
4.5 Throw error java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/MRVersio
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-common-2.6.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib
4.6 Throw error java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService$Iface
cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib
4.7 Client cannot authenticate via:[TOKEN, KERBEROS]
具體報錯信息如下:
[Linkis-Default-Scheduler-Thread-5] com.webank.wedatasphere.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor 57 error - execute code failed! java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "centos-bigdatacdh"; destination host is: "ip":8020;
flink 引擎 conf/linkis-engineconn.properties 配置文件中加入 Kerberos 相關的配置:
wds.linkis.keytab.enable=true
wds.linkis.keytab.file=/opt/user_keytab
wds.linkis.keytab.host.enabled=false
wds.linkis.keytab.host=127.0.0.1
同時,在 flink 的 conf/flink-conf.yaml 中也加入 kerberos 的配置
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /opt/user_keytab/hadoop.keytab
security.kerberos.login.principal: [email protected]
yarn.log-aggregation-enable: true
上述遇到的異常都解決完畢之後,測試 flink-sql 的腳本就可以成功執行啦,但這僅僅是跑通了一個 show databases;
,並不能説明 flink 引擎就完全可用啦,大家還不要高興的太早,最好再拿一個實際的場景來對 flink 引擎進行更加深入的測試,
5. 模擬線上場景來測試 flink-sql
場景説明:消費 kafka 中的數據保存進 hive 裏:
kafka 的測試 json 格式的數據:
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:15:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:20:00"}
{"user_id":"a3112","order_amount":15.0,"log_ts":"2020-11-26 12:30:00"}
{"user_id":"a2112","order_amount":15.0,"log_ts":"2020-11-26 12:32:00"}
kafka 生產數據的示例命令:
kafka-console-producer --broker-list broker-ip:9092 --topic test_kafka_01
hive 表建表語句:
create table `test.test_target_tab` (
user_id string,
order_amount double,
day string,
hour string
);
測試 SQL:
DROP TABLE IF EXISTS myhive.test.test_source;
CREATE TABLE myhive.test.test_source (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'test_kafka_01',
'properties.bootstrap.servers' = 'broker_ip:9092',
'properties.group.id' = 'group_test_01',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
INSERT INTO myhive.test.test_target_tab SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM myhive.test.test_source;
5.1 Could not find any factory for identifier 'kafka' that implements
異常信息:Could not find any factory for identifier 'kafka' that implements
21304, Task is Failed,errorMsg: ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. Available factory identifiers are: blackhole datagen filesystem
解決辦法:
下載 flink-connector-kafka_2.11-1.12.2.jar
到引擎的 lib 目錄下
5.2 Could not find any factory for identifier 'json' that implements
21304, Task is Failed,errorMsg: ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath. Available factory identifiers are: raw
解決辦法:
下載 flink-json-1.12.2.jar
到引擎的 lib 目錄下
5.3 Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
2021-11-15 18:48:11.649 ERROR [Linkis-Default-Scheduler-Thread-1] com.webank.wedatasphere.linkis.common.utils.Utils$ 57 error - Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
替換 flink-connector-kafka_2.11-1.12.2.jar 為 flink-sql-connector-kafka_2.11-1.12.2.jar
5.4 Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log
2021-11-15 19:05:36.326 ERROR [Linkis-Default-Scheduler-Thread-3] com.webank.wedatasphere.linkis.common.utils.Utils$ 57 error - Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log$Logger
flink 引擎目錄的 lib 文件夾中放入: minlog-1.3.1.jar
5.5 ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
Flink App UI 中任務關鍵的報錯信息如下:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass
從這裏就開始跑偏了,一直以為是缺少 Kafka 相關的 jar 包,於是就一段亂試。但是,無論在 flink 引擎的 lib 目錄中,還是在 flink 客户端的 lib 目錄中放入 kafka(connector)相關的 jar 包都無濟於事。
從社區小夥伴那裏得到了一個有效的解決方案是在 flink 引擎目錄的 conf/linkis-engineconn.properties 配置文件中增加如下配置:
flink.lib.path=hdfs:///linkis/engine-lib/flink/lib
而 hdfs:///linkis/engine-lib/flink/lib 目錄中存放的是 flink 引擎目錄下,lib 中的 jar 包;同時還要包括 flink 客户端目錄下,lib 中的 flink-dist_2.11-1.12.2.jar。否則你可能會遇到如下錯誤:
ClusterDeploymentException: The "yarn.provided.lib.dirs" has to also include the lib/, plugin/ and flink-dist jar. In other case, it cannot be used.
或者如果不確定的話,可以把 flink 客户端目錄下,lib 目錄中的 jar 都放進 hdfs:///linkis/engine-lib/flink2/lib 裏。
5.6 消費數據不寫 hive 表
flink 可以正常消費 Kafka 中的數據,flink 引擎日誌以及 yarn UI 上的 application 中的日誌無報錯,但是過了默認的 checkpoint 的時間點之後,hive 中還是沒有數據產生。
我的解決辦法是,在 flink 的配置文件中增加 checkpoint 的配置,這個配置應該是全局的,針對所有的 job 有效,當然,應該也可以針對單個任務來對 checkpoint 進行配置。
配置文件中 checkpoint 的配置參考如下:
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-savepoints
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
上述的一切假如你都經歷過,然後又看到了這裏,那麼你應該就初步跑通了 flink 引擎:
Flink checkpoint

至此,hive 中的數據寫入也正常啦,整個流程就跑通了,至於更復雜的場景,應該也不在話下啦!