【Cloud Native】Flink on k8s: Walkthrough and Hands-On Practice


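I. Overview

At its core, Flink is a streaming dataflow execution engine that supports both stream-processing and batch-processing applications on the same Flink runtime. For distributed computation over data streams it provides data distribution, data communication, and fault tolerance.

Flink website: https://flink.apache.org/
Documentation for different versions: https://nightlies.apache.org/flink/
Official Flink on k8s documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/
See also my earlier article: "Big Data Hadoop: the real-time stream computing engine Flink (Flink environment deployment)"
GitHub: https://github.com/apache/flink/tree/release-1.14.6/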

II. Flink Execution Modes

Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/overview/

Flink on YARN has three execution modes:

  • yarn-session mode (Session Mode)
  • yarn-cluster mode (Per-Job Mode)
  • Application mode (Application Mode)

【Tip】Per-Job mode is supported only by YARN and has been deprecated since Flink 1.15. It is slated to be dropped in FLINK-26000.

III. Flink on k8s in Practice

1) Download Flink

Download page: https://flink.apache.org/downloads.html

```bash
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
```
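Before unpacking the archive, it can be worth comparing it against the SHA-512 digest that Apache publishes alongside each release artifact. The following is only a minimal sketch: it assumes `sha512sum` (GNU coreutils) is installed, the helper name `verify_sha512` is hypothetical, and the expected digest has to be copied by hand from the official download page.

```shell
# verify_sha512 FILE EXPECTED_HEX
# Returns 0 when the SHA-512 digest of FILE equals EXPECTED_HEX,
# 1 on a mismatch, and 2 when FILE does not exist.
verify_sha512() {
  local file=$1 expected=$2 actual
  [ -f "$file" ] || return 2
  actual=$(sha512sum "$file" | awk '{print $1}')
  # Normalise both digests to lowercase so uppercase input still matches.
  actual=$(printf '%s' "$actual" | tr 'A-F' 'a-f')
  expected=$(printf '%s' "$expected" | tr 'A-F' 'a-f')
  [ "$actual" = "$expected" ]
}

# Example (digest placeholder must be replaced with the published value):
#   verify_sha512 flink-1.14.6-bin-scala_2.12.tgz "<published digest>" \
#     && tar -xzf flink-1.14.6-bin-scala_2.12.tgz
```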

2) Build the base image

```bash
docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
```
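The pull, tag, and push sequence above repeats the same image name three times. As a small sketch, the target reference can be derived from the source image and the registry prefix. The helper name `private_image_ref` is hypothetical, and `myharbor.com/bigdata` is simply the registry prefix used in this article.

```shell
# private_image_ref SOURCE_IMAGE REGISTRY_PREFIX
# Prints the image reference to use in the private registry: the source's
# namespace (for example "apache/") is dropped and name:tag is kept.
private_image_ref() {
  local source=$1 prefix=$2
  # ${source##*/} strips everything up to and including the last "/".
  # ${prefix%/} removes one trailing "/" from the prefix, if present.
  printf '%s/%s\n' "${prefix%/}" "${source##*/}"
}

# Example:
#   src=apache/flink:1.14.6-scala_2.12
#   dst=$(private_image_ref "$src" myharbor.com/bigdata)
#   docker pull "$src" && docker tag "$src" "$dst" && docker push "$dst"
```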

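3) Session mode

A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster, and each job is submitted to the cluster after the cluster has been deployed. A Flink Session cluster deployment in Kubernetes has at least three components:

  • A deployment that runs the JobManager
  • A deployment for a pool of TaskManagers
  • A service exposing the JobManager's REST and UI ports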

1. Native Kubernetes mode

Configuration reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

【1】Build the image (Dockerfile)

```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Note: an export inside RUN does not persist into the final image;
# use ENV instead if the locale is needed at runtime.
RUN export LANG=zh_CN.UTF-8
```

Build the image:

```bash
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
```

【2】Create the namespace and service account

```bash
# Create the namespace
kubectl create ns flink

# Create the service account
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
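The three imperative commands above can also be expressed declaratively, which makes them easier to keep in version control. The sketch below renders an equivalent manifest from a shell function; the function name `render_flink_rbac` is hypothetical, and the binding name follows the `flink-role-binding-<namespace>` pattern used in this article.

```shell
# render_flink_rbac NAMESPACE SERVICE_ACCOUNT
# Prints a manifest with the Namespace, the ServiceAccount, and a
# ClusterRoleBinding that grants the built-in "edit" ClusterRole.
render_flink_rbac() {
  local ns=$1 sa=$2
  cat <<EOF
apiVersion: v1
kind: Namespace
metadata:
  name: ${ns}
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: ${sa}
  namespace: ${ns}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink-role-binding-${ns}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: ${sa}
    namespace: ${ns}
EOF
}

# Example:
#   render_flink_rbac flink flink-service-account | kubectl apply -f -
```

Unlike `kubectl create`, piping the output to `kubectl apply` can be repeated safely when the objects already exist.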

【3】Create the Flink cluster

```bash
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
```

【4】Submit a job

```bash
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    ./examples/streaming/TopSpeedWindowing.jar

# Optional resource parameters. Options must come before the JAR path
# (for example ./examples/streaming/WordCount.jar); check the exact keys
# and value formats against the configuration reference linked above.
#   -Dkubernetes.taskmanager.cpu=2000m \
#   -Dexternal-resource.limits.kubernetes.cpu=4000m \
#   -Dexternal-resource.limits.kubernetes.memory=10Gi \
#   -Dexternal-resource.requests.kubernetes.cpu=2000m \
#   -Dexternal-resource.requests.kubernetes.memory=8Gi \
```

【Tip】Mind the JDK version; JDK 8 is known to work at the moment.
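When the list of `-D` options grows, it can be kept in a plain key=value file and converted on the fly. This is only a sketch: the function name `to_dynamic_props` and the file name `session.properties` are hypothetical, and values containing spaces are not handled.

```shell
# to_dynamic_props: read KEY=VALUE lines on stdin and print one
# -DKEY=VALUE argument per line. Blank lines and "#" comments are skipped.
to_dynamic_props() {
  local line
  while IFS= read -r line || [ -n "$line" ]; do
    case $line in
      ''|'#'*) continue ;;
      *=*) printf '%s\n' "-D${line}" ;;
      *) echo "ignoring malformed line: ${line}" >&2 ;;
    esac
  done
}

# Example (word splitting of $props is intentional):
#   props=$(to_dynamic_props < session.properties)
#   ./bin/flink run --target kubernetes-session $props \
#       ./examples/streaming/TopSpeedWindowing.jar
```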


【5】Check the result

```bash
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink
```

【6】Delete the Flink cluster

```bash
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
```

2. Standalone mode

【1】Build the image

The default user is flink; here I change it to admin. Replace the user as your organization requires. The script can be obtained from the pod started above.

Startup script docker-entrypoint.sh:

```bash
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
```

Write the Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application JARs
RUN mkdir $FLINK_HOME/usrlib/

RUN mkdir home

COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache

# Set the working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# Entrypoint script: not run at build time, only when a container starts
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```

Build the image:

```bash
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```
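The `set_config_option` function in docker-entrypoint.sh either overrides an existing entry in flink-conf.yaml or appends a new one. The standalone sketch below exercises that logic against a temporary file so the behaviour can be checked without building the image. It is a variant, not the script itself: it writes through a temporary file instead of `sed -i` to stay portable, and the sample keys and values are only illustrative.

```shell
CONF_FILE=$(mktemp)
# Second line differs only where the dots are; it must stay untouched.
printf 'blob.server.port: 6000\nblobXserverXport: keep-me\n' > "$CONF_FILE"

set_config_option() {
  local option=$1 value=$2 escaped_option
  # Escape periods so "." matches a literal dot, not any character.
  escaped_option=$(printf '%s' "$option" | sed -e 's/\./\\./g')
  if grep -q -E "^${escaped_option}:.*" "$CONF_FILE"; then
    # Existing entry: rewrite the whole line.
    sed -e "s/^${escaped_option}:.*/${option}: ${value}/" "$CONF_FILE" \
      > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "$CONF_FILE"
  else
    # No entry yet: append one.
    printf '%s: %s\n' "$option" "$value" >> "$CONF_FILE"
  fi
}

set_config_option blob.server.port 6124            # overrides the existing entry
set_config_option taskmanager.numberOfTaskSlots 2  # appended as a new entry
cat "$CONF_FILE"
```

Without the escaping step, the pattern `blob.server.port` would also match the `blobXserverXport` line, which is why the periods are escaped before the key is used as a regular expression.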

【2】Create the namespace and service account

```bash
# Create the namespace
kubectl create ns flink

# Create the service account
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

【3】Write the YAML files

  • flink-configuration-configmap.yaml

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```

  • jobmanager-service.yaml: optional service, needed only in non-HA mode.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```

  • jobmanager-rest-service.yaml: optional service that exposes the JobManager REST port as a public Kubernetes node port.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
```

  • taskmanager-query-state-service.yaml: optional service that exposes the TaskManager port for queryable state as a public Kubernetes node port.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
```

The configuration files above are shared between session mode and application mode.

  • jobmanager-session-deployment-non-ha.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # UID of the admin user created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```

  • taskmanager-session-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # UID of the admin user created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
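The two NodePort services above pin `nodePort` to 30081 and 30025. Kubernetes only accepts values inside the cluster's NodePort range, which defaults to 30000-32767 unless the API server is configured otherwise. A quick pre-flight check can be sketched as follows; the helper name `valid_nodeport` is hypothetical, and the default bounds are an assumption that should be adjusted if your cluster uses a custom range.

```shell
# valid_nodeport PORT [MIN] [MAX]
# Returns 0 when PORT is a number inside the NodePort range.
valid_nodeport() {
  local port=$1 min=${2:-30000} max=${3:-32767}
  case $port in
    ''|*[!0-9]*) return 1 ;;   # reject empty or non-numeric input
  esac
  [ "$port" -ge "$min" ] && [ "$port" -le "$max" ]
}

# Example:
#   valid_nodeport 30081 || echo "nodePort 30081 is outside the allowed range" >&2
```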

【4】Create the Flink cluster

```bash
kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
```

Reverse-engineer the Dockerfile from an image:

```bash
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
```

Check the result:

```bash
kubectl get pods,svc -n flink -owide
```

Web UI: http://192.168.182.110:30081/#/overview

【5】Submit a job

```bash
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
```

```bash
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
```

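【6】Delete the Flink cluster

```bash
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
```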
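Creating and deleting the session cluster operate on the same set of manifests, so both steps can be driven from one list, which avoids a file name drifting between the two. The following is a sketch under stated assumptions: it uses the six file names from this section, and the `KUBECTL` variable and the function names are hypothetical. Setting `KUBECTL="echo kubectl"` turns it into a dry run that only prints the commands.

```shell
KUBECTL=${KUBECTL:-kubectl}   # set KUBECTL="echo kubectl" for a dry run
NAMESPACE=flink
# Creation order: configuration first, then services, then deployments.
MANIFESTS="flink-configuration-configmap.yaml
jobmanager-service.yaml
jobmanager-rest-service.yaml
taskmanager-query-state-service.yaml
jobmanager-session-deployment-non-ha.yaml
taskmanager-session-deployment.yaml"

create_cluster() {
  local f
  while IFS= read -r f; do
    # $KUBECTL is left unquoted on purpose so "echo kubectl" splits in two.
    $KUBECTL create -f "$f" -n "$NAMESPACE" || return 1
  done <<EOF
$MANIFESTS
EOF
}

delete_cluster() {
  local f reversed
  # Reverse the list so deployments are removed before services and config.
  reversed=$(printf '%s\n' "$MANIFESTS" | sed '1!G;h;$!d')
  while IFS= read -r f; do
    $KUBECTL delete -f "$f" -n "$NAMESPACE" --ignore-not-found
  done <<EOF
$reversed
EOF
}
```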

【7】Access the Flink web UI

The port is the nodePort defined in jobmanager-rest-service.yaml.

http://192.168.182.110:30081/#/overview
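Right after the deployments are created, the web UI may not answer yet. A small polling loop against the JobManager REST endpoint `/overview` can be used to wait for it. This is a sketch, assuming `curl` is installed; the function name `wait_for_flink_rest` and the default attempt and delay values are hypothetical choices.

```shell
# wait_for_flink_rest BASE_URL [ATTEMPTS] [DELAY_SECONDS]
# Polls BASE_URL/overview until it answers successfully or attempts run out.
wait_for_flink_rest() {
  local base_url=$1 attempts=${2:-30} delay=${3:-2} i=1
  while [ "$i" -le "$attempts" ]; do
    # -f makes curl fail on HTTP errors; -s silences progress output.
    if curl -fs -o /dev/null "${base_url}/overview"; then
      echo "Flink REST endpoint is up after ${i} attempt(s)"
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "Flink REST endpoint not reachable at ${base_url}" >&2
  return 1
}

# Example:
#   wait_for_flink_rest http://192.168.182.110:30081 && echo "ready to submit jobs"
```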

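4) Application mode (recommended)

A basic Flink Application cluster deployment in Kubernetes has three components:

  • An application that runs the JobManager
  • A deployment for a pool of TaskManagers
  • A service exposing the JobManager's REST and UI ports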

1. Native Kubernetes mode (commonly used)

【1】Build the image (Dockerfile)

```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Note: an export inside RUN does not persist into the final image;
# use ENV instead if the locale is needed at runtime.
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
```

Build the image:

```bash
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

# Remove the image
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
```

【2】Create the namespace and service account

```bash
# Create the namespace
kubectl create ns flink

# Create the service account
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

【3】Create the Flink cluster and submit the job

```bash
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
```

【Note】local is the only scheme supported in application mode. It refers to the local environment of the pod or container, not the host machine.
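Because only the `local://` scheme is accepted here, a wrapper script can reject other URIs before calling `flink run-application`. The helper below is a sketch with a hypothetical name, `container_jar_path`; it also prints the path as seen inside the container, which is the location the JAR must have been copied to in the Dockerfile.

```shell
# container_jar_path URI
# Prints the in-container path for a local:// URI and fails for any
# other scheme or for a URI without an absolute path.
container_jar_path() {
  local uri=$1
  case $uri in
    local:///*) printf '%s\n' "${uri#local://}" ;;
    *) echo "unsupported JAR URI (expected local:///...): ${uri}" >&2
       return 1 ;;
  esac
}

# Example:
#   container_jar_path local:///opt/flink/usrlib/TopSpeedWindowing.jar
```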

Check the result:

```bash
kubectl get pods,svc -n flink
```

```bash
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
```

【4】Delete the Flink cluster

```bash
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
```

2. Standalone mode

【1】Build the image (Dockerfile)

Startup script docker-entrypoint.sh:

This script is identical to the docker-entrypoint.sh listed for the session-mode Standalone image in section 3) above (it runs Flink as the admin user), so it is not repeated here.

Write the Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application JARs
RUN mkdir $FLINK_HOME/usrlib/

RUN mkdir home

COPY docker-entrypoint.sh /opt/apache/

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache
# The script was copied to /opt/apache/, so chmod must target that path
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Set the working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# Entrypoint script: not run at build time, only when a container starts
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```

```bash
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```

【2】Create the namespace and service account

```bash
# Create the namespace
kubectl create ns flink

# Create the service account
kubectl create serviceaccount flink-service-account -n flink

# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

【3】Write the YAML files

flink-configuration-configmap.yaml, jobmanager-service.yaml (optional, needed only in non-HA mode), jobmanager-rest-service.yaml (optional, exposes the JobManager REST port as a node port), and taskmanager-query-state-service.yaml (optional, exposes the TaskManager queryable-state port as a node port) are the shared files already listed in the session-mode Standalone section above and are reused unchanged.

jobmanager-application-non-ha.yaml (non-HA):

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # UID of the admin user created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
```

【Tip】Note the hostPath volume mounted at usrlib here (/mnt/nfsdata/flink/application/job-artifacts in this example). It is best to use a shared directory so that every node sees the same job JARs.

taskmanager-job-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # UID of the admin user created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
```
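Both manifests mount the host directory /mnt/nfsdata/flink/application/job-artifacts at usrlib, so the job JAR has to be present in that directory before the Job is created. A staging step might look like the sketch below. The helper name `stage_job_jar` is hypothetical, and the example call assumes the batch WordCount example JAR from the Flink distribution is the one that contains the class named in the Job's arguments.

```shell
# stage_job_jar JAR ARTIFACTS_DIR
# Copies JAR into ARTIFACTS_DIR and makes it world-readable.
stage_job_jar() {
  local jar=$1 dir=$2
  [ -f "$jar" ] || { echo "no such JAR: ${jar}" >&2; return 1; }
  mkdir -p "$dir" || return 1
  cp "$jar" "$dir/" || return 1
  # Readable by everyone so the non-root container user (UID 9999 in
  # this article's image) can load it.
  chmod 644 "$dir/$(basename "$jar")"
}

# Example:
#   stage_job_jar ./examples/batch/WordCount.jar \
#       /mnt/nfsdata/flink/application/job-artifacts
```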

【4】Create the Flink cluster and submit the job

```bash
kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
```

Check the result:

```bash
kubectl get pods,svc -n flink
```

【5】Delete the Flink cluster

```bash
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink

kubectl delete ns flink --force
```

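【6】Check the result

```bash
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
```

That wraps up this walkthrough of Flink on k8s. It only demonstrates the official examples; real-world enterprise cases will follow. If you have any questions, feel free to leave me a message. I will keep sharing tutorials on 【Cloud Native + Big Data】, so please stay tuned~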