DSS1.0.0与Linkis1.0.2——Flink引擎相关问题汇总

语言: CN / TW / HK

目录导读

  • 1. 前言

  • 2. Flink 引擎的编译及安装

  • 3. 新增 Flink 引擎类型

  • 4. 测试运行 flink-sql

    • 4.1 参数数字转换异常

    • 4.2 ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache

    • 4.3 Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [myhive] does not exist.

    • 4.4 Throw error java.lang.NoClassDefFoundError: org/apache/hive/common/util/HiveVersionInfo

    • 4.5 Throw error java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/MRVersio

    • 4.6 Throw error java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService\$Iface

    • 4.7 Client cannot authenticate via:[TOKEN, KERBEROS]

  • 5. 模拟线上场景来测试 flink-sql

    • 5.1 Could not find any factory for identifier 'kafka' that implements

    • 5.2 Could not find any factory for identifier 'json' that implements

    • 5.3 Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

    • 5.4 Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log

    • 5.5 ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    • 5.6 消费数据不写 hive 表

1. 前言

此篇文章依旧是对上篇文章 《DSS1.0.0+Linkis1.0.2——CDH5 环境中的试用记录》 的补充和完善,在上篇文章中,针对 Flink 引擎的测试,由于集群资源的不足,暂时没有跑通。所以打算用这篇文章继续来记录 Flink 引擎后续测试过程中遇到和解决的一些问题,并以一个实际案例,通过 Scripts 平台来提交我们的 Flink SQL 到 Yarn 上去执行。

历史文章回顾:

《Linkis1.0——CDH5 环境中的安装与踩坑》

《DSS1.0.0+Linkis1.0.2——CDH5 环境中的试用记录》

《DSS1.0.0 与 Linkis1.0.2——JDBC 引擎相关问题汇总》

2. Flink 引擎的编译及安装

对于 Flink 引擎的编译及安装,以及本次测试所用的软硬件环境和版本,可以参考历史文章。

3. 新增 Flink 引擎类型

安装完成 Flink 引擎之后,我们想要通过 Scripts 提交 SQL 去执行,还需要在 DSS 管理台为 flink 设置 IDE 相关的参数。

flink-engine

新增 flink 引擎之后,该引擎下面默认是没有 flink 资源参数的可编辑入口,所以此时我们无法修改 flink 引擎启动时的默认资源参数配置。

flink-resources

Flink 引擎默认的资源参数配置项及配置项的值,可以参考 FlinkResourceConfiguration 中的代码:

val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 4) //Unit: G(单位为G)
val LINKIS_FLINK_CLIENT_CORES = 1 //Fixed to 1(固定为1) CommonVars[Int]("wds.linkis.driver.cores", 1)

val LINKIS_FLINK_JOB_MANAGER_MEMORY = CommonVars[Int]("flink.jobmanager.memory", 2) //Unit: G(单位为G)
val LINKIS_FLINK_TASK_MANAGER_MEMORY = CommonVars[Int]("flink.taskmanager.memory", 4) //Unit: G(单位为G)
val LINKIS_FLINK_TASK_SLOTS = CommonVars[Int]("flink.taskmanager.numberOfTaskSlots", 2)
val LINKIS_FLINK_TASK_MANAGER_CPU_CORES = CommonVars[Int]("flink.taskmanager.cpu.cores", 2)
val LINKIS_FLINK_CONTAINERS = CommonVars[Int]("flink.container.num", 2)
val LINKIS_QUEUE_NAME = CommonVars[String]("wds.linkis.rm.yarnqueue", "default")
val FLINK_APP_DEFAULT_PARALLELISM = CommonVars("wds.linkis.engineconn.flink.app.parallelism", 4)

接下来需要做的事是,把上述配置存放进数据库,具体做法可以参考 sql 文件 linkis_dml.sql 中对 spark 引擎资源配置数据的初始化。SQL 参考如下:

SET @FLINK_LABEL="flink-1.12.2";
SET @FLINK_ALL=CONCAT('*-*,',@FLINK_LABEL);
SET @FLINK_IDE=CONCAT('*-IDE,',@FLINK_LABEL);

insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@FLINK_ALL, 'OPTIONAL', 2, now(), now());
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@FLINK_IDE, 'OPTIONAL', 2, now(), now());

select @label_id := id from linkis_cg_manager_label where `label_value` = @FLINK_IDE;
insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);

INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.client.memory', '取值范围:1-15,单位:G', 'flink客户端的内存大小', '1g', 'Regex', '^([1-9]|1[0-5])(G|g)$', '0', '0', '1', 'flink资源设置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.jobmanager.memory', '取值范围:1-30,单位:G', 'flink JobManager的内存大小', '1g', 'Regex', '^([1-9]|[1-2][0-9]|3[0])(G|g)$', '0', '0', '1', 'flink资源设置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.memory', '取值范围:1-30,单位:G', 'flink TaskManager的内存大小', '1g', 'Regex', '^([1-9]|[1-2][0-9]|3[0])(G|g)$', '0', '0', '1', 'flink资源设置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.numberOfTaskSlots', '取值范围:1-40,单位:个', 'flink执行器实例最大并发数', '2', 'NumInterval', '[1,40]', '0', '0', '1', 'flink资源设置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.taskmanager.cpu.cores', '取值范围:1-8,单位:个', 'flink执行器核心个数', '2', 'NumInterval', '[1,8]', '0', '0', '1','flink资源设置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('flink.container.num', '取值范围:1-40,单位:个', 'flink container个数', '2', 'NumInterval', '[1,40]', '0', '0', '1', 'flink资源设置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.yarnqueue', 'flink任务所用yarn队列名', 'yarn队列名', 'ide', 'None', NULL, '0', '0', '1', 'flink资源设置', 'flink');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.flink.app.parallelism', '取值范围:1-40,单位:个', 'flink app并行度', '1', 'NumInterval', '[1,40]', '0', '0', '1', 'flink资源设置', 'flink');

insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
(select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'flink' and label.label_value = @FLINK_ALL);


insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
(select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @FLINK_ALL);

上述 SQL 执行成功之后,再来看 Flink 引擎的资源参数配置界面:

flink-config

4. 测试运行 flink-sql

Flink 引擎的资源参数配置问题搞定之后,我们就可以根据集群的资源,按需来修改默认的资源参数。然后就可以在 Scripts 中提交 flink-sql 到 yarn 上执行。

测试 SQL:

use catalog myhive;
show databases;

4.1 参数数字转换异常

关键报错信息如下:

Failed  to async get EngineNode ErrorException: errCode: 0 ,desc: operation failed(操作失败)s!the reason(原因):NumberFormatException: For input string: "1g"

这里可能存在 BUG,参考 Spark 引擎实现中的 SparkSubmitProcessEngineConnLaunchBuilder ,资源参数中有带单位的,需要在代码中处理下单位,然后才能正常进行数字转换。

如果你不想扒源码,那就直接修改校验参数的正则表达式,使没有输入 G|g 也能保存资源参数。我这里暂时选择这样的解决方式。

4.2 ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache

关键报错信息如下:

ClassNotFoundException: org.apache.hadoop.mapreduce.security.TokenCache

缺少 mapreduce 安全相关的 jar,具体是少了哪个 jar 包,最快的知道方式是直接在 hadoop 源码中搜索。我这里 hadoop 的版本是 2.6.0-cdh5.13.1 ,缺少的 jar 名称为:hadoop-mapreduce-client-core-2.6.0-cdh5.13.1.jar

cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core-2.6.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

结合 4.5 小节一起看

4.3 Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [myhive] does not exist.

在 flink 引擎 conf 目录下的 flink-sql-defaults.yaml 文件中增加名为 myhive 的 catalog 定义

/opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/conf
vim flink-sql-defaults.yaml

配置参考:

catalogs:
- name: myhive
type: hive
hive-conf-dir: /etc/hive/conf
hive-version: 1.1.0
property-version: 1
default-database: test

图示:

hive-catalog

4.4 Throw error java.lang.NoClassDefFoundError: org/apache/hive/common/util/HiveVersionInfo

cp /opt/cloudera/parcels/CDH/jars/hive-exec-1.1.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

cp /opt/cloudera/parcels/CDH/jars/hive-metastore-1.1.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

4.5 Throw error java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/MRVersio

cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-common-2.6.0-cdh5.13.1.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

4.6 Throw error java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService$Iface

cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /opt/linkis/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib

4.7 Client cannot authenticate via:[TOKEN, KERBEROS]

具体报错信息如下:

[Linkis-Default-Scheduler-Thread-5] com.webank.wedatasphere.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor 57 error - execute code failed! java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "centos-bigdatacdh"; destination host is: "ip":8020;

flink 引擎 conf/linkis-engineconn.properties 配置文件中加入 Kerberos 相关的配置:

wds.linkis.keytab.enable=true
wds.linkis.keytab.file=/opt/user_keytab
wds.linkis.keytab.host.enabled=false
wds.linkis.keytab.host=127.0.0.1

同时,在 flink 的 conf/flink-conf.yaml 中也加入 kerberos 的配置

security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /opt/user_keytab/hadoop.keytab
security.kerberos.login.principal: [email protected]
yarn.log-aggregation-enable: true

上述遇到的异常都解决完毕之后,测试 flink-sql 的脚本就可以成功执行啦,但这仅仅是跑通了一个 show databases; ,并不能说明 flink 引擎就完全可用啦,大家还不要高兴的太早,最好再拿一个实际的场景来对 flink 引擎进行更加深入的测试,

5. 模拟线上场景来测试 flink-sql

场景说明:消费 kafka 中的数据保存进 hive 里:

kafka 的测试 json 格式的数据:

{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:15:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:20:00"}
{"user_id":"a3112","order_amount":15.0,"log_ts":"2020-11-26 12:30:00"}
{"user_id":"a2112","order_amount":15.0,"log_ts":"2020-11-26 12:32:00"}

kafka 生产数据的示例命令:

kafka-console-producer --broker-list broker-ip:9092 --topic test_kafka_01

hive 表建表语句:

create table `test.test_target_tab` (
user_id string,
order_amount double,
day string,
hour string
);

测试 SQL:

DROP TABLE IF EXISTS myhive.test.test_source;
CREATE TABLE myhive.test.test_source (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'test_kafka_01',
'properties.bootstrap.servers' = 'broker_ip:9092',
'properties.group.id' = 'group_test_01',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);


INSERT INTO myhive.test.test_target_tab SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM myhive.test.test_source;

5.1 Could not find any factory for identifier 'kafka' that implements

异常信息:Could not find any factory for identifier 'kafka' that implements

21304, Task is Failed,errorMsg: ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. Available factory identifiers are: blackhole datagen filesystem

解决办法:

下载 flink-connector-kafka_2.11-1.12.2.jar 到引擎的 lib 目录下

5.2 Could not find any factory for identifier 'json' that implements

21304, Task is Failed,errorMsg: ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath. Available factory identifiers are: raw

解决办法:

下载 flink-json-1.12.2.jar 到引擎的 lib 目录下

5.3 Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

2021-11-15 18:48:11.649 ERROR [Linkis-Default-Scheduler-Thread-1] com.webank.wedatasphere.linkis.common.utils.Utils$ 57 error - Throw error java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

替换 flink-connector-kafka_2.11-1.12.2.jar 为 flink-sql-connector-kafka_2.11-1.12.2.jar

5.4 Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log

2021-11-15 19:05:36.326 ERROR [Linkis-Default-Scheduler-Thread-3] com.webank.wedatasphere.linkis.common.utils.Utils$ 57 error - Throw error java.lang.NoClassDefFoundError: com/esotericsoftware/minlog/Log$Logger

flink 引擎目录的 lib 文件夹中放入: minlog-1.3.1.jar

5.5 ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Flink App UI 中任务关键的报错信息如下:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass

从这里就开始跑偏了,一直以为是缺少 Kafka 相关的 jar 包,于是就一段乱试。但是,无论在 flink 引擎的 lib 目录中,还是在 flink 客户端的 lib 目录中放入 kafka(connector)相关的 jar 包都无济于事。

从社区小伙伴那里得到了一个有效的解决方案是在 flink 引擎目录的 conf/linkis-engineconn.properties 配置文件中增加如下配置:

flink.lib.path=hdfs:///linkis/engine-lib/flink/lib

而 hdfs:///linkis/engine-lib/flink/lib 目录中存放的是 flink 引擎目录下,lib 中的 jar 包;同时还要包括 flink 客户端目录下,lib 中的 flink-dist_2.11-1.12.2.jar。否则你可能会遇到如下错误:

ClusterDeploymentException: The "yarn.provided.lib.dirs" has to also include the lib/, plugin/ and flink-dist jar. In other case, it cannot be used.

或者如果不确定的话,可以把 flink 客户端目录下,lib 目录中的 jar 都放进 hdfs:///linkis/engine-lib/flink2/lib 里。

5.6 消费数据不写 hive 表

flink 可以正常消费 Kafka 中的数据,flink 引擎日志以及 yarn UI 上的 application 中的日志无报错,但是过了默认的 checkpoint 的时间点之后,hive 中还是没有数据产生。

我的解决办法是,在 flink 的配置文件中增加 checkpoint 的配置,这个配置应该是全局的,针对所有的 job 有效,当然,应该也可以针对单个任务来对 checkpoint 进行配置。

配置文件中 checkpoint 的配置参考如下:

state.backend: filesystem
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-savepoints
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE

上述的一切假如你都经历过,然后又看到了这里,那么你应该就初步跑通了 flink 引擎:

Flink checkpoint

flink-checkpoint

至此,hive 中的数据写入也正常啦,整个流程就跑通了,至于更复杂的场景,应该也不在话下啦!