一、手动下载安装包

wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz

二、解压

tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz 

节点名称  master worker zookeeper
cent-1  master   zookeeper
cent-2  master  worker zookeeper
cent-3    worker zookeeper

三、修改flink/conf/masters,slaves,flink-conf.yaml

vi masters
cent-1:8081
vi slaves
cent-2
cent-3
vi flink-conf.yaml
taskmanager.numberOfTaskSlots:2
jobmanager.rpc.address: cent-1

可选配置:

  • 每个JobManager(jobmanager.heap.mb的可用内存量
  • 每个TaskManager(taskmanager.heap.mb的可用内存量
  • 每台机器的可用CPU数量(taskmanager.numberOfTaskSlots),
  • 集群中的CPU总数(parallelism.default)和
  • 临时目录(taskmanager.tmp.dirs

四、拷贝到其他节点

scp -r flink-1.6.1/ admin@cent-2:`pwd`
scp -r flink-1.6.1/ admin@cent-3:`pwd`

五、配置环境变量,每个节点都要配置

vi /etc/profile
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
六、启动flink
./bin/start-cluster.sh 

登录web查看状态

http://cent-1:8081

七、修改配置文件

修改flink-conf.yaml,HA模式下,jobmanager不需要指定,在master file中配置,由zookeeper选出leader与standby。

#jobmanager.rpc.address: cent-1
high-availability:zookeeper                             #指定高可用模式(必须)
high-availability.zookeeper.quorum:cent-1:2181,cent-2:2181,cent-3:2181  #ZooKeeper仲裁是ZooKeeper服务器的复制组,它提供分布式协调服务(必须)
high-availability.storageDir:hdfs:///flink/ha/       #JobManager元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须)
high-availability.zookeeper.path.root:/flink         #根ZooKeeper节点,在该节点下放置所有集群节点(推荐)
high-availability.cluster-id:/flinkCluster           #自定义集群(推荐)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints

修改conf/zoo.cfg

server.1=cent-1:2888:3888
server.2=cent-2:2888:3888
server.3=cent-3:2888:3888

修改conf/masters

cent-1:8081
cent-2:8081

配置信息要同步到各个节点

八、先启动zookeeper集群各节点(测试环境中也可以用Flink自带的start-zookeeper-quorum.sh),启动dfs ,再启动flink

start-cluster.sh

手动将JobManager / TaskManager实例添加到群集

使用bin/jobmanager.shbin/taskmanager.sh脚本将JobManager和TaskManager实例添加到正在运行的集群中

添加JobManager

bin/jobmanager.sh ((start|start-foreground[host] [webui-port])|stop|stop-all

添加TaskManager

bin/taskmanager.sh start|start-foreground|stop|stop-all

jobmanager.sh start cent-2

九、Yarn Cluster模式

配置环境变量

export  HADOOP_CONF_DIR= /opt/module/hadoop-3.2.0/etc/hadoop

启动

yarn-session.sh -d -s 2 -tm 800 -n 2
-n : TaskManager的数量,相当于executor的数量
-s : 每个JobManager的core的数量,executor-cores。建议将slot的数量设置每台机器的处理器数量
-tm : 每个TaskManager的内存大小,executor-memory
-jm : JobManager的内存大小,driver-memory
1.提交任务
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/

以上命令在参数前加上y前缀,-yn表示TaskManager个数。

在这个模式下,同样可以使用-m yarn-cluster提交一个"运行后即焚"的detached yarn(-yd)作业到yarn cluster

2.停止yarn cluster
yarn application -kill application_1539058959130_0001

3.Yarn模式HA

应用最大尝试次数(yarn-site.xml),您必须配置为尝试应用的最大数量的设置yarn-site.xml,当前YARN版本的默认值为2(表示允许单个JobManager失败)。

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>The maximum number of application master execution attempts</description>
</property>

高可用的Yarn会话

  1. 配置HA模式和zookeeper法定人数conf/flink-conf.yaml

    high-availability: zookeeper
    high-availability.zookeeper.quorum: node21:2181,node22:2181,node23:2181
    high-availability.storageDir: hdfs:///flink/recovery
    high-availability.zookeeper.path.root: /flink
    yarn.application-attempts: 10
  2. 配置ZooKeeper的服务器conf/zoo.cfg(目前它只是可以运行每台机器的单一的ZooKeeper服务器):

    server.1=cent-1:2888:3888
    server.2=cent-2:2888:3888
    server.3=cent-3:2888:3888
  3. 启动ZooKeeper仲裁

    ./ start-zookeeper-quorum.sh
  4. 启动HA群集

    ./ yarn-session.sh -n 2

可选配置:

  • 每个JobManager(jobmanager.heap.mb的可用内存量
  • 每个TaskManager(taskmanager.heap.mb的可用内存量
  • 每台机器的可用CPU数量(taskmanager.numberOfTaskSlots),
  • 集群中的CPU总数(parallelism.default)和
  • 临时目录(taskmanager.tmp.dirs
03-24 21:01