环境依赖

  • JAVA_HOME = /usr/jdk64/jdk1.8.0_112

  • HADOOP_CONF_DIR = /etc/hadoop/conf # 包括 HDFS_CONF_DIR and YARN_CONF_DIR

注意: 安装官方文档安装的 JDP 3.2 集群,自带如上相关依赖,本演示是 Flink on Yarn 模式。

目前 JDP 仅支持 Redhat 系列 7.x 的操作系统,故我们使用 yum 安装 Flink。

1
yum install flink_3_2_0_0_108 -y
  1. 在 JDP 集群中安装 Flink on Yarn 集群,仅需配置 env.java.home
1
2
grep -rn "env.java.home" /usr/jdp/current/flink/conf/flink-conf.yaml
19:env.java.home: /usr/jdk64/jdk1.8.0_112

JDP 集群中安装 Flink on Yarn 可选配置,因为 JDP HADOOP_CONF_DIR 默认值和 Flink 配置默认值一致,故不需要配置。也可显示的在 config.sh 中 export HADOOP_CONF_DIR=xxx。

1
2
grep -rn "HADOOP_CONF_DIR" config.sh | head -1
19:export HADOOP_CONF_DIR=/etc/hadoop/conf
  1. 添加 Flink 用户,并授予 hdfs 组权限,方便 Flink 读写 HDFS。
1
useradd flink -g hdfs
  1. 创建 Flink 需要的log、run等目录。
1
2
mkdir /var/log/flink
mkdir /var/run/flink
  1. 为新创建的 log、run 目录授予 Flink 读写权限
1
2
chown flink:root -R /var/log/flink
chown flink:root -R /var/run/flink
  1. 初始化 flink 用户在 hdfs 的 home 目录
1
2
3
su - flink

hadoop fs -mkdir /user/flink
  1. 需要切换到 flink,执行启动一个 Flink on Yarn 集群

1.1 Start a long-running Flink cluster on YARN

1
2
3
4
5
6
7
8
su - flink

cd /usr/jdp/current/flink

./bin/yarn-session.sh -n 2 -tm 2024 -s 3 # 集群资源太小,配置不合理容易无法分配到资源,导致任务hang住。Specify the -s flag for the number of processing slots per Task Manager. We recommend to set the number of slots to the number of processors per machine.

./bin/yarn-session.sh -n 2 -tm 2024 -s 3 -d # -d 表示后台运行。- The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1564483050501_0002

注意:启动时的提示Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.,如果未设置 HADOOP_CONF_DIR,默认值 /etc/hadoop/conf;而 JDP 集群的 HADOOP_CONF_DIR 刚好在此处。

1.2 Run a Flink job on YARN

1
./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
  1. 测试 flink on yarn 集群

3.1 Examples 经典案例:WordCount

1
2
3
./bin/flink run ./examples/batch/WordCount.jar

flink list

3.2 快速启动 flink sql clinet

进入 SQL Client

1
./bin/sql-client.sh embedded

Running SQL Queries:

1
2
3
Flink SQL> SELECT 'Hello World';

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

退出 SQL Client

1
Flink SQL> quit;

提交 Flink WordCount 程序,因为 YARN 容器资源 limit 导致失败,需要调整启动集群的资源配比。

org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested resource type=[vcores] < 0 or greater than maximum allowed allocation. Requested resource=<memory:1024, vCores:4>, maximum allowed allocation=<memory:6656, vCores:3>, please note that maximum allowed allocation is calculated by scheduler based on maximum resource of registered NodeManagers, which might be less than configured maximum allocation=<memory:6656, vCores:3>

  1. 停止 Flink 集群,Flink 集群 -d 运行模式,需要通过 yarn 命令才能停止,具体如下:
1
2
3
yarn application --list

yarn application -kill application_1556800373836_0018

总结

简单演示如果在 JDP 平台快速部署 Flink 集群,JDP 发型包中默认已经自带 Flink rpm 包。yum 直接安装好之后配置一个env.java.home,就可以直接启动 flink on yarn 模式了,flink 本身模式比较多,Yarn Setup 支持 Start a long-running Flink cluster on YARNRun a Flink job on YARN。今天我们演示的主要是前者,已经在 YARN 启动好一个 long-running 的 Flink 集群,在提交 Flink job 到此集群。主要遇到的问题是资源分配问题,导致任务无法申请到资源或请求资源超限而失败。Flink on Yarn 的 Job Manager 和 Task Manager 均运行在 YARN Container 中,资源请求受限于 YARN 对单个 Contrainer 的限制。由于我是个人环境,资源有限,安装过程我曾故意调整 YARN Container 资源 Limit 上限。如果是生产环境一般不会遇到此类问题,除非乱配置 Flink 提交的资源参数。

接下来,我会简单介绍一下,Flink sql-training 项目,帮助快速入门 Flink 的具体使用。

参考: