环境

操作系统:centos72
软件版本:flink-1.11.1
JDK版本:1.8
 

步骤

官网
https://flink.apache.org/
下载安装
curl -O https://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
或
curl -O https://archive.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz

# 解压
tar -zxf  flink-1.11.1-bin-scala_2.11.tgz

# 移动文件
mv flink-1.11.1 /opt/
更改配置文件
# 进入目录
cd /opt/flink-1.11.1/conf/

# 打开文件
vim flink-conf.yaml

修改配置项

# 开放以下配置
rest.port: 8081
rest.address: 0.0.0.0
# 使用默认jdk1.8,也可以用以下方式指定jdk1.8目录
env.java.home: /opt/jdk1.8.0_51
# 由于用到ssh免密访问,可指定ssh端口(默认22)
env.ssh.opts: -p 9322

## 其它核心参数配置(可默认不变):
## jobmanager.heap.mb:Jobmanager可用内存
## taskmanager.heap.mb:taskmanager可用内存
## taskmanager.numberOfTaskSlots:每个机器可用cpu核心数量(如果设置的并行度大于TaskSlots,有可能会无法分配有可用cpu资源,则需要更改此参数)
## parallelism.default:集群中的默认并行度(如果执行job没有指定并行度,则用默认值)
## taskmanager.tmp.dirs:节点临时目录(默认在/tmp目录)

启动|停止

# 启动
bin/start-cluster.sh
# 停止
bin/stop-cluster.sh
访问
http://localhost:8081

运行测试示例

# 统计关键词个数
./bin/flink run examples/streaming/WordCount.jar
# --input支持输入文件的内空
./bin/flink run examples/streaming/WordCount.jar --output ${your_source_file} --input ${your_source_file}

# 查询所有运行的job清单
./bin/flink list
# 停止指定ID对应运行的JOB
./bin/flink cancel xxxx269

Flink集群部署

Flink集群常用以下方式部署

Standalone模式:

Flink On Yarn 模式(YARN集群又分两种模式):

注意

  • YARN模式要求Flink需要用于Hadoop,并且有版本支持要求,Hadoop环境需要保证版本在 2.2 以上,并且集群中安装有HDFS服务。
  • Standalone模式缺少系统层面的管理、缺少资源隔离、缺少进程级异常恢复。
  • 在实际生产环境中,使用Flink on Yarn模式者居多。

考虑学习与演示环境的规模、设备要求,本集群安装示例以Standalone模式为主。

Standalone模式集群

  • 主机
192.168.110.35
192.168.110.36
  • 互相免密登录

1.生成秘钥

ssh-keygen -t rsa
2.查看秘钥目录
cd ~/.ssh 或 cd /root/.ssh
authorized_keys:存放远程免密登录的公钥,主要通过这个文件记录多台机器的公钥
id_rsa : 生成的私钥文件
id_rsa.pub : 生成的公钥文件
know_hosts : 已知的主机公钥清单
如果希望ssh公钥生效需满足至少下面两个条件:
    1) .ssh目录的权限必须是700
    2) .ssh/authorized_keys文件权限必须是600
3.通过ssh-copy-id的方式
ssh-copy-id -i root/.ssh/id_rsa.pub [email protected]
4.免密登录
# -p是指定端口,不指定默认22
ssh -p '9322' '[email protected]'
# 本机加ssh免密登录
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
ssh -p '9322' '[email protected]'
5.退出免密
exit
  • 配置flink集群(standalone模式)
1.编辑192.168.110.35、192.168.110.36主机下的flink配置文件
vim flink-conf.yaml
# 指定master主机IP或hostName,注意A、B主机均填写相同值
jobmanager.rpc.address: 192.168.110.35
2.以192.168.110.35主机做为master
2.1.修改conf下的master文件,加入如下:
vim master
# 指定192.168.110.35为master
192.168.110.35
2.2.修改conf下的workers文件
vim workers
# 指定192.168.110.36为slaves,也可以同时指定192.168.110.35即为master又为slaves
192.168.110.36
3.以192.168.110.36主机做为slaves
3.1.修改conf下的master文件
vim master
# 指定192.168.110.35为master
192.168.110.35
3.2.修改conf下的workers文件
vim workers
# 指定192.168.110.36为slaves
192.168.110.36
4.启停集群
# 只需启动master主机即可,会自动发送指令启动slaves
./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host centos72.
Starting taskexecutor daemon on host centos72.
Starting taskexecutor daemon on host centos72.36.
# 停止集群
./bin/stop-cluster.sh 
5.扩展实例到集群节点
# 添加或停止 JobManager
./bin/jobmanager.sh (start cluster)|stop|stop-all
# 添加或停止 TaskManager
./bin/taskmanager.sh start|stop|stop-all
6.访问
http://localhost:8081

7.其它

standalone集群job容错说明:
1.jobmanager挂掉的话,正在执行的任务会失败,所以jobmanager应该做HA。
2.taskmanager挂掉的话,如果有多余的taskmanager节点,flink会自动把任务调度到其他节点上执行。
3.standalone模式可能会分配TaskSlots不均衡。

8.学习资源

https://www.cnblogs.com/zz-ksw/p/11988132.html
https://blog.csdn.net/xu470438000/article/details/79533350
05-05 11:14