DSS1.0.0與Linkis1.0.2——Flink引擎相關問題彙總

語言: CN / TW / HK

目錄導讀

  • 1. 前言

  • 2. Flink 引擎的編譯及安裝

  • 3. 新增 Flink 引擎類型

  • 4. 測試運行 flink-sql

    • 4.1 參數數字轉換異常

    • 4.2 ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache

    • 4.3 Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [myhive] does not exist.

    • 4.4 Throw error java.lang.NoClassDefFoundError: org/apache/hive/common/util/HiveVersionInfo

    • 4.5 Throw error java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/MRVersio

    • 4.6 Throw error java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService\$Iface

    • 4.7 Client cannot authenticate via:[TOKEN, KERBEROS]

  • 5. 模擬線上場景來測試 flink-sql

    • 5.1 Could not find any factory for identifier 'kafka' that implements

    • 5.2 Could not find any factory for identifier 'json' that implements

    • 5.3 Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

    • 5.4 Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log

    • 5.5 ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    • 5.6 消費數據不寫 hive 表

1. 前言

此篇文章依舊是對上篇文章 《DSS1.0.0+Linkis1.0.2——CDH5 環境中的試用記錄》 的補充和完善,在上篇文章中,針對 Flink 引擎的測試,由於集羣資源的不足,暫時沒有跑通。所以打算用這篇文章繼續來記錄 Flink 引擎後續測試過程中遇到和解決的一些問題,並以一個實際案例,通過 Scripts 平台來提交我們的 Flink SQL 到 Yarn 上去執行。

歷史文章回顧:

《Linkis1.0——CDH5 環境中的安裝與踩坑》

《DSS1.0.0+Linkis1.0.2——CDH5 環境中的試用記錄》

《DSS1.0.0 與 Linkis1.0.2——JDBC 引擎相關問題彙總》

2. Flink 引擎的編譯及安裝

對於 Flink 引擎的編譯及安裝,以及本次測試所用的軟硬件環境和版本,可以參考歷史文章。

3. 新增 Flink 引擎類型

安裝完成 Flink 引擎之後,我們想要通過 Scripts 提交 SQL 去執行,還需要在 DSS 管理台為 flink 設置 IDE 相關的參數。

flink-engine

新增 flink 引擎之後,該引擎下面默認是沒有 flink 資源參數的可編輯入口,所以此時我們無法修改 flink 引擎啟動時的默認資源參數配置。

flink-resources

Flink 引擎默認的資源參數配置項及配置項的值,可以參考 FlinkResourceConfiguration 中的代碼:

val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 4) //Unit: G(單位為G)
val LINKIS_FLINK_CLIENT_CORES = 1 //Fixed to 1(固定為1) CommonVars[Int]("wds.linkis.driver.cores", 1)

val LINKIS_FLINK_JOB_MANAGER_MEMORY = CommonVars[Int]("flink.jobmanager.memory", 2) //Unit: G(單位為G)
val LINKIS_FLINK_TASK_MANAGER_MEMORY = CommonVars[Int]("flink.taskmanager.memory", 4) //Unit: G(單位為G)
val LINKIS_FLINK_TASK_SLOTS = CommonVars[Int]("flink.taskmanager.numberOfTaskSlots", 2)
val LINKIS_FLINK_TASK_MANAGER_CPU_CORES = CommonVars[Int]("flink.taskmanager.cpu.cores", 2)
val LINKIS_FLINK_CONTAINERS = CommonVars[Int]("flink.container.num", 2)
val LINKIS_QUEUE_NAME = CommonVars[String]("wds.linkis.rm.yarnqueue", "default")
val FLINK_APP_DEFAULT_PARALLELISM = CommonVars("wds.linkis.engineconn.flink.app.parallelism", 4)

接下來需要做的事是,把上述配置存放進數據庫,具體做法可以參考 sql 文件 linkis_dml.sql 中對 spark 引擎資源配置數據的初始化。SQL 參考如下:

SET @FLINK_LABEL="flink-1.12.2";
SET @FLINK_ALL=CONCAT('*-*,',@FLINK_LABEL);
SET @FLINK_IDE=CONCAT('*-IDE,',@FLINK_LABEL);

insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@FLINK_ALL, 'OPTIONAL', 2, now(), now());
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@FLINK_IDE, 'OPTIONAL', 2, now(), now());

select @label_id := id from linkis_cg_manager_label where `label_value` = @FLINK_IDE;
insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);

INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.client.memory', '取值範圍:1-15,單位:G', 'flink客户端的內存大小', '1g', 'Regex', '^([1-9]|1[0-5])(G|g)$', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.jobmanager.memory', '取值範圍:1-30,單位:G', 'flink JobManager的內存大小', '1g', 'Regex', '^([1-9]|[1-2][0-9]|3[0])(G|g)$', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.memory', '取值範圍:1-30,單位:G', 'flink TaskManager的內存大小', '1g', 'Regex', '^([1-9]|[1-2][0-9]|3[0])(G|g)$', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.numberOfTaskSlots', '取值範圍:1-40,單位:個', 'flink執行器實例最大併發數', '2', 'NumInterval', '[1,40]', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.cpu.cores', '取值範圍:1-8,單位:個', 'flink執行器核心個數', '2', 'NumInterval', '[1,8]', '0', '0', '1','flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.container.num', '取值範圍:1-40,單位:個', 'flink container個數', '2', 'NumInterval', '[1,40]', '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.yarnqueue', 'flink任務所用yarn隊列名', 'yarn隊列名', 'ide', 'None', NULL, '0', '0', '1', 'flink資源設置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.flink.app.parallelism', '取值範圍:1-40,單位:個', 'flink app並行度', '1', 'NumInterval', '[1,40]', '0', '0', '1', 'flink資源設置', 'flink');

insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
(select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'flink' and label.label_value = @FLINK_ALL);


insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
(select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @FLINK_ALL);

上述 SQL 執行成功之後,再來看 Flink 引擎的資源參數配置界面:

flink-config

4. 測試運行 flink-sql

Flink 引擎的資源參數配置問題搞定之後,我們就可以根據集羣的資源,按需來修改默認的資源參數。然後就可以在 Scripts 中提交 flink-sql 到 yarn 上執行。

測試 SQL:

use catalog myhive;
show databases;

4.1 參數數字轉換異常

關鍵報錯信息如下:

Failed  to async get EngineNode ErrorException: errCode: 0 ,desc: operation failed(操作失敗)s!the reason(原因):NumberFormatException: For input string: "1g"

這裏可能存在 BUG,參考 Spark 引擎實現中的 SparkSubmitProcessEngineConnLaunchBuilder ,資源參數中有帶單位的,需要在代碼中處理下單位,然後才能正常進行數字轉換。

如果你不想扒源碼,那就直接修改校驗參數的正則表達式,使沒有輸入 G|g 也能保存資源參數。我這裏暫時選擇這樣的解決方式。

4.2 ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache

關鍵報錯信息如下:

ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache

缺少 mapreduce 安全相關的 jar,具體是少了哪個 jar 包,最快的知道方式是直接在 hadoop 源碼中搜索。我這裏 hadoop 的版本是 2.6.0-cdh5.13.1 ,缺少的 jar 名稱為:hadoop-mapreduce-client-core-2.6.0-cdh5.13.1.jar

cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core-2.6.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

結合 4.5 小節一起看

4.3 Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [myhive] does not exist.

在 flink 引擎 conf 目錄下的 flink-sql-defaults.yaml 文件中增加名為 myhive 的 catalog 定義

/opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/conf
vim flink-sql-defaults.yaml

配置參考:

catalogs:
- name: myhive
type: hive
hive-conf-dir: /etc/hive/conf
hive-version: 1.1.0
property-version: 1
default-database: test

圖示:

hive-catalog

4.4 Throw error java.lang.NoClassDefFoundError: org/apache/hive/common/util/HiveVersionInfo

cp /opt/cloudera/parcels/CDH/jars/hive-exec-1.1.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

cp /opt/cloudera/parcels/CDH/jars/hive-metastore-1.1.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

4.5 Throw error java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/MRVersio

cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-common-2.6.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

4.6 Throw error java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService$Iface

cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

4.7 Client cannot authenticate via:[TOKEN, KERBEROS]

具體報錯信息如下:

[Linkis-Default-Scheduler-Thread-5] com.webank.wedatasphere.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor 57 error - execute code failed! java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "centos-bigdatacdh"; destination host is: "ip":8020;

flink 引擎 conf/linkis-engineconn.properties 配置文件中加入 Kerberos 相關的配置:

wds.linkis.keytab.enable=true
wds.linkis.keytab.file=/opt/user_keytab
wds.linkis.keytab.host.enabled=false
wds.linkis.keytab.host=127.0.0.1

同時,在 flink 的 conf/flink-conf.yaml 中也加入 kerberos 的配置

security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /opt/user_keytab/hadoop.keytab
security.kerberos.login.principal: [email protected]
yarn.log-aggregation-enable: true

上述遇到的異常都解決完畢之後,測試 flink-sql 的腳本就可以成功執行啦,但這僅僅是跑通了一個 show databases; ,並不能説明 flink 引擎就完全可用啦,大家還不要高興的太早,最好再拿一個實際的場景來對 flink 引擎進行更加深入的測試,

5. 模擬線上場景來測試 flink-sql

場景説明:消費 kafka 中的數據保存進 hive 裏:

kafka 的測試 json 格式的數據:

{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:15:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:20:00"}
{"user_id":"a3112","order_amount":15.0,"log_ts":"2020-11-26 12:30:00"}
{"user_id":"a2112","order_amount":15.0,"log_ts":"2020-11-26 12:32:00"}

kafka 生產數據的示例命令:

kafka-console-producer --broker-list broker-ip:9092 --topic test_kafka_01

hive 表建表語句:

create table `test.test_target_tab` (
user_id string,
order_amount double,
day string,
hour string
);

測試 SQL:

DROP TABLE IF EXISTS myhive.test.test_source;
CREATE TABLE myhive.test.test_source (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'test_kafka_01',
'properties.bootstrap.servers' = 'broker_ip:9092',
'properties.group.id' = 'group_test_01',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);


INSERT INTO myhive.test.test_target_tab SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM myhive.test.test_source;

5.1 Could not find any factory for identifier 'kafka' that implements

異常信息:Could not find any factory for identifier 'kafka' that implements

21304, Task is Failed,errorMsg: ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. Available factory identifiers are: blackhole datagen filesystem

解決辦法:

下載 flink-connector-kafka_2.11-1.12.2.jar 到引擎的 lib 目錄下

5.2 Could not find any factory for identifier 'json' that implements

21304, Task is Failed,errorMsg: ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath. Available factory identifiers are: raw

解決辦法:

下載 flink-json-1.12.2.jar 到引擎的 lib 目錄下

5.3 Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

2021-11-15 18:48:11.649 ERROR [Linkis-Default-Scheduler-Thread-1] com.webank.wedatasphere.linkis.common.utils.Utils$ 57 error - Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

替換 flink-connector-kafka_2.11-1.12.2.jar 為 flink-sql-connector-kafka_2.11-1.12.2.jar

5.4 Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log

2021-11-15 19:05:36.326 ERROR [Linkis-Default-Scheduler-Thread-3] com.webank.wedatasphere.linkis.common.utils.Utils$ 57 error - Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log$Logger

flink 引擎目錄的 lib 文件夾中放入: minlog-1.3.1.jar

5.5 ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Flink App UI 中任務關鍵的報錯信息如下:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass

從這裏就開始跑偏了,一直以為是缺少 Kafka 相關的 jar 包,於是就一段亂試。但是,無論在 flink 引擎的 lib 目錄中,還是在 flink 客户端的 lib 目錄中放入 kafka(connector)相關的 jar 包都無濟於事。

從社區小夥伴那裏得到了一個有效的解決方案是在 flink 引擎目錄的 conf/linkis-engineconn.properties 配置文件中增加如下配置:

flink.lib.path=hdfs:///linkis/engine-lib/flink/lib

而 hdfs:///linkis/engine-lib/flink/lib 目錄中存放的是 flink 引擎目錄下,lib 中的 jar 包;同時還要包括 flink 客户端目錄下,lib 中的 flink-dist_2.11-1.12.2.jar。否則你可能會遇到如下錯誤:

ClusterDeploymentException: The "yarn.provided.lib.dirs" has to also include the lib/, plugin/ and flink-dist jar. In other case, it cannot be used.

或者如果不確定的話,可以把 flink 客户端目錄下,lib 目錄中的 jar 都放進 hdfs:///linkis/engine-lib/flink2/lib 裏。

5.6 消費數據不寫 hive 表

flink 可以正常消費 Kafka 中的數據,flink 引擎日誌以及 yarn UI 上的 application 中的日誌無報錯,但是過了默認的 checkpoint 的時間點之後,hive 中還是沒有數據產生。

我的解決辦法是,在 flink 的配置文件中增加 checkpoint 的配置,這個配置應該是全局的,針對所有的 job 有效,當然,應該也可以針對單個任務來對 checkpoint 進行配置。

配置文件中 checkpoint 的配置參考如下:

state.backend: filesystem
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-savepoints
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE

上述的一切假如你都經歷過,然後又看到了這裏,那麼你應該就初步跑通了 flink 引擎:

Flink checkpoint

flink-checkpoint

至此,hive 中的數據寫入也正常啦,整個流程就跑通了,至於更復雜的場景,應該也不在話下啦!