2.1 Download and install Kafka
kafka_2.12-2.3.0.tgz
tar -zxvf kafka_2.12-2.3.0.tgz
2.2 Configure the Kafka cluster
Edit the following parameters in config/server.properties:
[hadoop@hadoop01 kafka_2.12-2.3.0]$ cd config
[hadoop@hadoop01 config]$ gedit server.properties
Parameter 1: add host.name=hadoop01
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
host.name=hadoop01
Parameter 2: change log.dirs
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/hadoop/kafka_2.12-2.3.0/logs
Parameter 3: set the ZooKeeper node addresses and ports
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
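The three edits above can also be applied without opening an editor. The following is a minimal sketch, assuming a POSIX shell with awk available. set_prop is a hypothetical helper, not part of Kafka, and it only handles plain key=value lines whose values contain no backslashes.

```shell
# set_prop FILE KEY VALUE
# Replace every existing "KEY=..." line in FILE, or append "KEY=VALUE"
# if the key is not present. Comments and other lines are left untouched.
set_prop() {
    file=$1
    key=$2
    value=$3
    tmp="$file.tmp.$$"
    awk -v k="$key" -v v="$value" '
        index($0, k "=") == 1 { print k "=" v; found = 1; next }
        { print }
        END { if (!found) print k "=" v }
    ' "$file" > "$tmp" && mv "$tmp" "$file"
}

# Example, run from the kafka_2.12-2.3.0 directory:
#   set_prop config/server.properties host.name hadoop01
#   set_prop config/server.properties log.dirs /home/hadoop/kafka_2.12-2.3.0/logs
#   set_prop config/server.properties zookeeper.connect hadoop01:2181,hadoop02:2181,hadoop03:2181
```

Writing to a temporary file and renaming it avoids relying on sed -i, whose syntax differs between GNU and BSD systems.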
2.3 Copy to the other nodes
scp -r ~/kafka_2.12-2.3.0 hadoop02:~/
scp -r ~/kafka_2.12-2.3.0 hadoop03:~/
2.4 On each node, change broker.id and host.name in server.properties; leave the other parameters unchanged
On hadoop02:
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
host.name=hadoop02
On hadoop03:
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
host.name=hadoop03
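The settings above follow a pattern: hadoop01 gets broker.id=0, hadoop02 gets 1, and hadoop03 gets 2. A small sketch of that mapping follows, assuming hostnames end in a node number as they do here. broker_id_for is a hypothetical helper introduced only for illustration.

```shell
# broker_id_for HOSTNAME
# Derive the broker.id used in this guide from a hostname such as hadoop02:
# take the node number, drop leading zeros, and subtract one.
broker_id_for() {
    n=$(printf '%s\n' "$1" | sed -e 's/^[^0-9]*//' -e 's/^0*//')
    case $n in
        ''|*[!0-9]*)
            echo "cannot derive broker.id from '$1'" >&2
            return 1
            ;;
    esac
    echo $((n - 1))
}

# Example, run on each node:
#   echo "broker.id=$(broker_id_for "$(hostname)")"
```

Any scheme works as long as every broker ends up with a distinct integer; this one simply reproduces the values chosen above.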
2.5 Start Kafka (ZooKeeper must be started first)
2.5.1 Start ZooKeeper before starting Kafka
[hadoop@hadoop01 ~]$ cd bin
[hadoop@hadoop01 bin]$ sh zookeeperstart.sh   # one-step ZooKeeper startup script; for details see https://www.cnblogs.com/CQ-LQJ/p/11605603.html
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
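The status output above reports one leader and two followers, which is what a healthy three-node ensemble should show. The check can be automated with a short sketch like the one below. It assumes the status text (for example from zkServer.sh status on each node, or from the script above) is piped in on standard input. check_quorum is a hypothetical helper.

```shell
# check_quorum
# Read ZooKeeper status output on stdin, count the "Mode:" lines, print a
# summary, and succeed only if exactly one node reports itself as leader.
check_quorum() {
    awk '
        /^Mode: leader/   { leaders++ }
        /^Mode: follower/ { followers++ }
        END {
            printf "leaders=%d followers=%d\n", leaders, followers
            exit (leaders == 1 ? 0 : 1)
        }
    '
}

# Example:
#   sh zookeeperstart.sh | check_quorum || echo "ZooKeeper ensemble is not ready"
```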
2.5.2 Once ZooKeeper is running, start Kafka
[hadoop@hadoop01 bin]$ cd ~/kafka_2.12-2.3.0
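[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-server-start.sh config/server.properties   # this command works, but it runs in the foreground, so the terminal window must stay open
2.5.2.1.1 Error on the first start (fix: upgrade the JDK to 8u131 or 8u151; either works):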
[2019-09-30 00:28:56,482] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-30 00:28:56,875] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.VerifyError: Uninitialized object exists on backward branch 209
Exception Details:
Location:
scala/collection/immutable/HashMap$HashTrieMap.split()Lscala/collection/immutable/Seq; @249: goto
Reason:
Error exists in the bytecode
Bytecode:
0000000: 2ab6 0064 04a0 001e b200 c1b2 00c6 04bd
0000010: 0002 5903 2a53 c000 c8b6 00cc b600 d0c0
0000020: 00d2 b02a b600 38b8 0042 3c1b 04a4 0156
0000030: 1b05 6c3d 2a1b 056c 2ab6 0038 b700 d43e
0000040: 2ab6 0038 021d 787e 3604 2ab6 0038 0210
0000050: 201d 647c 7e36 05bb 0019 59b2 00c6 2ab6
0000060: 003a c000 c8b6 00d8 b700 db1c b600 df3a
0000070: 0619 06c6 001a 1906 b600 e3c0 008b 3a07
0000080: 1906 b600 e6c0 008b 3a08 a700 0dbb 00e8
0000090: 5919 06b7 00eb bf19 073a 0919 083a 0abb
00000a0: 0002 5915 0419 09bb 0019 59b2 00c6 1909
00000b0: c000 c8b6 00d8 b700 db03 b800 f13a 0e3a
00000c0: 0d03 190d b900 f501 0019 0e3a 1136 1036
00000d0: 0f15 0f15 109f 0027 150f 0460 1510 190d
00000e0: 150f b900 f802 00c0 0005 3a17 1911 1917
00000f0: b800 fc3a 1136 1036 0fa7 ffd8 1911 b801
0000100: 00b7 0069 3a0b bb00 0259 1505 190a bb00
0000110: 1959 b200 c619 0ac0 00c8 b600 d8b7 00db
0000120: 03b8 00f1 3a13 3a12 0319 12b9 00f5 0100
0000130: 1913 3a16 3615 3614 1514 1515 9f00 2715
0000140: 1404 6015 1519 1215 14b9 00f8 0200 c000
0000150: 053a 1819 1619 18b8 0103 3a16 3615 3614
0000160: a7ff d819 16b8 0100 b700 693a 0cbb 0105
0000170: 5919 0bbb 0105 5919 0cb2 010a b701 0db7
0000180: 010d b02a b600 3a03 32b6 010f b0
Stackmap Table:
same_frame(@35)
full_frame(@141,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118]},{})
append_frame(@151,Object[#139],Object[#139])
full_frame(@209,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
full_frame(@252,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
full_frame(@312,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
full_frame(@355,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
full_frame(@387,{Object[#2],Integer},{})
at scala.collection.immutable.HashMap$.scala$collection$immutable$HashMap$$makeHashTrieMap(HashMap.scala:185)
at scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:220)
at scala.collection.immutable.HashMap.updated(HashMap.scala:62)
at scala.collection.immutable.Map$Map4.updated(Map.scala:227)
at scala.collection.immutable.Map$Map4.$plus(Map.scala:228)
at scala.collection.immutable.Map$Map4.$plus(Map.scala:200)
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
at scala.collection.TraversableOnce.$anonfun$toMap$1(TraversableOnce.scala:320)
at scala.collection.TraversableOnce$$Lambda$10/838411509.apply(Unknown Source)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableOnce.toMap(TraversableOnce.scala:319)
at scala.collection.TraversableOnce.toMap$(TraversableOnce.scala:317)
at scala.collection.AbstractTraversable.toMap(Traversable.scala:108)
at kafka.api.ApiVersion$.<init>(ApiVersion.scala:98)
at kafka.api.ApiVersion$.<clinit>(ApiVersion.scala)
at kafka.server.Defaults$.<init>(KafkaConfig.scala:146)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:854)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.metrics.KafkaMetricsConfig.<init>(KafkaMetricsConfig.scala:32)
at kafka.metrics.KafkaMetricsReporter$.startReporters(KafkaMetricsReporter.scala:62)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:27)
at kafka.Kafka$.main(Kafka.scala:68)
at kafka.Kafka.main(Kafka.scala)
JDK version at the time of the error:
[hadoop@hadoop01 kafka_2.12-2.3.0]$ java -version
java version "1.8.0_11"
Java(TM) SE Runtime Environment (build 1.8.0_11-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.11-b03, mixed mode)
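The failing JVM above is 8u11, and the fix reported here is 8u131 or 8u151. A pre-flight check along those lines is sketched below. It assumes java -version prints a 1.8.0_NN style string as shown above. jdk8_update and jdk_ok_for_kafka are hypothetical helpers, and the threshold of 131 is only the lowest update this guide reports as working, not a documented boundary.

```shell
# jdk8_update VERSION
# Print the update number of a Java 8 version string, e.g. 1.8.0_11 -> 11.
jdk8_update() {
    case $1 in
        1.8.0_*)
            u=${1#1.8.0_}   # drop the "1.8.0_" prefix
            u=${u%%-*}      # drop a suffix such as "-b12" if present
            echo "$u"
            ;;
        *)
            return 1        # not a Java 8 style version string
            ;;
    esac
}

# jdk_ok_for_kafka VERSION
# Succeed if VERSION is Java 8 with update >= 131 (assumed threshold).
# Only understands Java 8 strings; anything else is reported as not OK.
jdk_ok_for_kafka() {
    u=$(jdk8_update "$1") || return 1
    [ "$u" -ge 131 ]
}

# Example:
#   ver=$(java -version 2>&1 | awk -F'"' '/version/ { print $2; exit }')
#   jdk_ok_for_kafka "$ver" || echo "JDK $ver may hit the VerifyError above"
```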
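2.5.2.1.2 Kafka fails to start or hangs
Possible cause: the virtual machine does not have enough memory.
Fix: lower the heap size in the start script from 1G to 200m.
1. Open the script: vim bin/kafka-server-start.sh
2. Find: export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
3. Change it to: export KAFKA_HEAP_OPTS="-Xmx200m -Xms200m"
4. Restart Kafka: bin/kafka-server-start.sh config/server.properties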
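As an alternative to editing the script, the heap can be set from the environment. The stock kafka-server-start.sh applies its 1G default only when KAFKA_HEAP_OPTS is empty. The sketch below mirrors that logic so the effect can be checked; effective_heap_opts and start_kafka_small_heap are hypothetical helpers, and the 200m value is the one used in the steps above.

```shell
# effective_heap_opts
# Mirror the default handling in bin/kafka-server-start.sh: use the 1G
# default only when KAFKA_HEAP_OPTS is empty, otherwise keep the caller's value.
effective_heap_opts() {
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        echo "-Xmx1G -Xms1G"
    else
        echo "$KAFKA_HEAP_OPTS"
    fi
}

# start_kafka_small_heap
# Start the broker with a 200m heap without modifying the start script.
# Run from the kafka_2.12-2.3.0 directory.
start_kafka_small_heap() {
    KAFKA_HEAP_OPTS="-Xmx200m -Xms200m" \
        bin/kafka-server-start.sh -daemon config/server.properties
}
```

Leaving the script unmodified means the setting survives a re-extraction of the Kafka archive and can differ per node.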
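2.5.2.2 How to start Kafka:
Add -daemon to start Kafka as a background daemon; the trailing & is optional, because -daemon already runs the broker in the background. (Without -daemon, bin/kafka-server-start.sh config/server.properties prints to the console and the window cannot be closed.)
[hadoop@hadoop01 bin]$ cd ~/kafka_2.12-2.3.0
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-server-start.sh -daemon config/server.properties &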
2.5.2.3 Use jps to check that Kafka started successfully
[hadoop@hadoop01 kafka_2.12-2.3.0]$ jps
8736 NodeManager
8593 ResourceManager
8083 DataNode
7942 NameNode
8330 SecondaryNameNode
12700 Jps
11981 Kafka
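The line to look for in the jps output is the one whose second column is Kafka. A scripted version of that check is sketched below, assuming jps output in the two-column "PID Name" form shown above. kafka_running is a hypothetical helper.

```shell
# kafka_running
# Read jps output on stdin and succeed if a process named "Kafka" is listed.
kafka_running() {
    awk '$2 == "Kafka" { found = 1 } END { exit (found ? 0 : 1) }'
}

# Example:
#   jps | kafka_running && echo "Kafka broker is up" || echo "Kafka broker is not running"
```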
Note: order matters when starting and stopping. Start ZooKeeper before Kafka, and stop Kafka before ZooKeeper.
2.5.2.4 Start Kafka on the other nodes in the same way
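Instead of logging in to each node, the same start command can be sent over ssh. This is a rough sketch, assuming passwordless ssh between the nodes (as the scp step earlier implies) and that java is on the PATH of non-interactive ssh sessions. start_brokers and the DRY_RUN switch are hypothetical.

```shell
# start_brokers NODE...
# Start the Kafka broker on each node over ssh.
# Set DRY_RUN=1 to print the ssh commands instead of running them.
start_brokers() {
    cmd="cd ~/kafka_2.12-2.3.0 && bin/kafka-server-start.sh -daemon config/server.properties"
    for node in "$@"; do
        if [ "${DRY_RUN:-0}" = "1" ]; then
            echo "ssh $node '$cmd'"
        else
            ssh "$node" "$cmd"
        fi
    done
}

# Example:
#   DRY_RUN=1; start_brokers hadoop01 hadoop02 hadoop03   # preview only
#   DRY_RUN=0; start_brokers hadoop01 hadoop02 hadoop03   # actually start
```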
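Note: in a Hadoop cluster, a single running ZooKeeper ensemble, whether it is the one bundled with some component or a standalone installation, can serve every component that needs ZooKeeper. There is no need to start each component's bundled ZooKeeper separately.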
2.6 Test Kafka
2.6.1 Create a topic (a topic is comparable to a file system directory that holds message content)
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --topic testr --replication-factor 3 --partitions 3
Created topic testr.
2.6.2 List topics and view topic details
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --list
testr
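The listing above shows only topic names. For per-partition details (leader, replicas, in-sync replicas), kafka-topics.sh also accepts --describe. The sketch below builds the host:port list once and reuses it; join_hosts and describe_topic are hypothetical helpers, and the host names and ports are the ones used throughout this guide.

```shell
# join_hosts PORT HOST...
# Build a comma-separated host:port list, e.g. for --zookeeper or --broker-list.
join_hosts() {
    port=$1
    shift
    out=""
    for h in "$@"; do
        out="${out:+$out,}$h:$port"
    done
    echo "$out"
}

# describe_topic TOPIC
# Show partition, leader, and replica details for one topic.
# Run from the kafka_2.12-2.3.0 directory.
describe_topic() {
    bin/kafka-topics.sh \
        --zookeeper "$(join_hosts 2181 hadoop01 hadoop02 hadoop03)" \
        --describe --topic "$1"
}

# Example:
#   describe_topic testr
```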
2.6.3 Start a Kafka console producer (on any of hadoop01, hadoop02, or hadoop03) to simulate a producer by typing messages into the topic from the command line:
[hadoop@hadoop02 kafka_2.12-2.3.0]$ bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr
>THI^H^H\^H^H^H this is message from terminal
>hello
>my^H^H hahha
>
>
>11111
>
>11111111111111111111
>
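Note: the producer terminal above stays open and waits for input; every line typed is sent to the topic as a message. To receive those messages, open another terminal and start a consumer.
2.6.4 Start a Kafka console consumer (on any of hadoop01, hadoop02, or hadoop03)
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic testr --from-beginning   # for Kafka versions before 0.9; the old ZooKeeper-based consumer was removed in Kafka 2.0, so this form does not apply to the 2.3.0 install used here
[hadoop@hadoop01 kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr --from-beginning   # for Kafka 0.9 and later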
hello
hahha
11111
this is message from terminal
11111111111111111111
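Note: producers and consumers are connected through Kafka as middleware, a pattern that is useful in many data processing systems. For example, in a real-time big data application, Kafka can hold the data coming from the sources while each consumer reads it at its own rate, so Kafka acts as a buffer.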
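In the consumer output above, the messages do not arrive in the order they were typed. That is expected: testr has three partitions, and Kafka preserves order only within a single partition. The round trip can also be run without typing, as in the sketch below. gen_messages and smoke_test are hypothetical helpers, and the message text is made up for illustration.

```shell
# gen_messages COUNT
# Print COUNT test messages, one per line: msg-1, msg-2, ...
gen_messages() {
    i=1
    while [ "$i" -le "$1" ]; do
        echo "msg-$i"
        i=$((i + 1))
    done
}

# smoke_test
# Send five messages to testr, then read five messages back and exit.
# Because of --from-beginning, the five messages read may be older ones
# if the topic already holds data. Run from the kafka_2.12-2.3.0 directory.
smoke_test() {
    brokers=hadoop01:9092,hadoop02:9092,hadoop03:9092
    gen_messages 5 |
        bin/kafka-console-producer.sh --broker-list "$brokers" --topic testr
    bin/kafka-console-consumer.sh --bootstrap-server "$brokers" \
        --topic testr --from-beginning --max-messages 5
}
```

Using --max-messages makes the consumer exit on its own, which is convenient in scripts where an open-ended consumer would block.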