一、简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息.Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。

二、系统环境

1、操作系统:64位 CentOS 7

2、jdk版本:1.8.0

3、zookeeper版本:zookeeper-3.4.10.tar.gz

4、三台服务器:192.168.1.91; 192.168.1.92; 192.168.1.93;

三、下载安装Kafka

进入kafka官方网站下载kafka 地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka-1.1.0-src.tgz

下载完kafka上传到CentOS服务上进行解压

tar zxvf kafka_2.-0.8.2.2.tgz
mv kafka_2.-0.8.2.2 kafka
cd kafka

四、 配置Kafka

4.1 进入Kafka配置目录配置

cd /usr/local/kafka/config/
vi server.properties

4.2 修改配置

Kafka的配置信息就是在server.properties里面配置的

找到下面两行代码并进行修改

broker.id=1

zookeeper.connect=192.168.1.91:2181,192.168.1.92:2181,192.168.1.93:2181

listeners = PLAINTEXT://192.168.1.92:9091

4.3 Kafka集群配置

拷贝Kafka到另外两台服务器然后修改配置

IP:192.168.1.92
broker.id=
zookeeper.connect=192.168.1.91:,192.168.1.92:,192.168.1.93:
listeners = PLAINTEXT://192.168.1.92:9092
IP:192.168.1.93
broker.id=
zookeeper.connect=192.168.1.91:,192.168.1.92:,192.168.1.93:
listeners = PLAINTEXT://192.168.1.92:9093

五、功能验证

5.1 开启三台服务的zookeeper

bin/zookeeper-server-start.sh -daemon config/zookeeper.propertie

5.2 开启三台服务器的Kafka服务

bin/kafka-server-start.sh config/server.properties

5.3 创建topic

使用kafka-topics.sh 创建单分区单副本的topic test:

bin/kafka-topics.sh --create --zookeeper 192.168.1.92: --replication-factor  --partitions  --topic test

查看创建的topic test:

bin/kafka-topics.sh --list --zookeeper 192.168.1.92:

5.4 产生消息命令

使用kafka-console-producer.sh  命令向topic test 发送消息

bin/kafka-console-producer.sh --broker-list 192.168.1.92: --topic test

5.5 消费消息

使用kafka-console-consumer.s 命令接受来自topic test的消息
bin/kafka-console-consumer.sh --zookeeper 192.168.1.92: --topic test --from-beginning
05-11 16:14