一、手动下载安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz
二、解压
tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
节点名称 | master | worker | zookeeper |
cent-1 | master | zookeeper | |
cent-2 | master | worker | zookeeper |
cent-3 | worker | zookeeper |
三、修改flink/conf/masters,slaves,flink-conf.yaml
vi masters
cent-1:8081
vi slaves
cent-2
cent-3
vi flink-conf.yaml
taskmanager.numberOfTaskSlots:2
jobmanager.rpc.address: cent-1
可选配置:
- 每个JobManager(
jobmanager.heap.mb
)的可用内存量, - 每个TaskManager(
taskmanager.heap.mb
)的可用内存量, - 每台机器的可用CPU数量(
taskmanager.numberOfTaskSlots
), - 集群中的CPU总数(
parallelism.default
)和 - 临时目录(
taskmanager.tmp.dirs
)
四、拷贝到其他节点
scp -r flink-1.6.1/ admin@cent-2:`pwd`
scp -r flink-1.6.1/ admin@cent-3:`pwd`
五、配置环境变量,每个节点都要配置
vi /etc/profile
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
六、启动flink
./bin/start-cluster.sh
登录web查看状态
http://cent-1:8081
七、修改配置文件
修改flink-conf.yaml,HA模式下,jobmanager不需要指定,在master file中配置,由zookeeper选出leader与standby。
#jobmanager.rpc.address: cent-1
high-availability:zookeeper #指定高可用模式(必须)
high-availability.zookeeper.quorum:cent-1:2181,cent-2:2181,cent-3:2181 #ZooKeeper仲裁是ZooKeeper服务器的复制组,它提供分布式协调服务(必须)
high-availability.storageDir:hdfs:///flink/ha/ #JobManager元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须)
high-availability.zookeeper.path.root:/flink #根ZooKeeper节点,在该节点下放置所有集群节点(推荐)
high-availability.cluster-id:/flinkCluster #自定义集群(推荐)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
修改conf/zoo.cfg
server.1=cent-1:2888:3888
server.2=cent-2:2888:3888
server.3=cent-3:2888:3888
修改conf/masters
cent-1:8081
cent-2:8081
配置信息要同步到各个节点
八、先启动zookeeper集群各节点(测试环境中也可以用Flink自带的start-zookeeper-quorum.sh),启动dfs ,再启动flink
start-cluster.sh
手动将JobManager / TaskManager实例添加到群集
使用bin/jobmanager.sh
和bin/taskmanager.sh
脚本将JobManager和TaskManager实例添加到正在运行的集群中。
添加JobManager
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
添加TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
jobmanager.sh start cent-2
九、Yarn Cluster模式
配置环境变量
export HADOOP_CONF_DIR= /opt/module/hadoop-3.2.0/etc/hadoop
启动
yarn-session.sh -d -s 2 -tm 800 -n 2
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/
以上命令在参数前加上y前缀,-yn表示TaskManager个数。
在这个模式下,同样可以使用-m yarn-cluster提交一个"运行后即焚"的detached yarn(-yd)作业到yarn cluster
2.停止yarn cluster
yarn application -kill application_1539058959130_0001
3.Yarn模式HA
应用最大尝试次数(yarn-site.xml),您必须配置为尝试应用的最大数量的设置yarn-site.xml,当前YARN版本的默认值为2(表示允许单个JobManager失败)。
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>The maximum number of application master execution attempts</description>
</property>
高可用的Yarn会话
-
配置HA模式和zookeeper法定人数在
conf/flink-conf.yaml
:high-availability: zookeeper high-availability.zookeeper.quorum: node21:2181,node22:2181,node23:2181 high-availability.storageDir: hdfs:///flink/recovery high-availability.zookeeper.path.root: /flink yarn.application-attempts: 10
-
配置ZooKeeper的服务器中
conf/zoo.cfg
(目前它只是可以运行每台机器的单一的ZooKeeper服务器):server.1=cent-1:2888:3888 server.2=cent-2:2888:3888 server.3=cent-3:2888:3888
-
启动ZooKeeper仲裁:
./ start-zookeeper-quorum.sh
-
启动HA群集:
./ yarn-session.sh -n 2
可选配置:
- 每个JobManager(
jobmanager.heap.mb
)的可用内存量, - 每个TaskManager(
taskmanager.heap.mb
)的可用内存量, - 每台机器的可用CPU数量(
taskmanager.numberOfTaskSlots
), - 集群中的CPU总数(
parallelism.default
)和 - 临时目录(
taskmanager.tmp.dirs
)