kafka简介
kafka (官网地址:http://kafka.apache.org)是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。
下载地址:http://kafka.apache.org/downloads
- 消息的发布(publish)称作producer,消息的订阅(subscribe)称作consumer,中间的存储阵列称作broker。
- 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发。
- producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理。
- broker端不维护数据的消费状态,提升了性能。
- 直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。
- Kafka使用scala编写,可以运行在JVM上。
Kafka和其他主流分布式消息系统的对比
- 异步处理, 把非关键流程异步化,提高系统的响应时间和健壮性
- 应用解耦,通过消息队列
- 流量削峰
zookeeper简介
ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。由于ZooKeeper的开源特性,后来我们的开发者在分布式锁的基础上,摸索了出了其他的使用方法:配置维护、组服务、分布式消息队列、分布式通知/协调等。
官网:https://zookeeper.apache.org/
下载地址:http://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
zookeeper应用场景:
- 服务注册&服务发现
配置中心
- 分布式锁
Zookeeper是强一致的
多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功
其它博文:http://www.cnblogs.com/sunddenly/p/4033574.html
kafka和zookeeper的单机部署
1、配置环境变量
export ZOOKEEPER_HOME=/root/zookeeper-3.4.11
export JAVA_HOME=/usr/local/java/jre1.8.0_161
export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$ZOOKEEPER_HOME/lib:$CLASSPATH
2、解压zookeeper
tar -xvf zookeeper-3.4.11.tar.gz
3、配置zookeeper
cp /root/zookeeper-3.4.11/conf/zoo_sample.cfg /root/zookeeper-3.4.11/conf/zoo.cfg
vim zoo.cfg
dataDir=/root/zookeeper-3.4.11/zkdata
dataLogDir=/root/zookeeper-3.4.11/zkdatalog
4、启动zookeeper
sh /root/zookeeper-3.4.11/bin/zkServer.sh start
5、解压kafka
tar -xvf kafka_2.11-1.0.0.tgz
6、修改配置kafka
log.dirs=/root/kafka_2.11-1.0.0/logs
7、启动kafka
./kafka-server-start.sh ../config/server.properties &
Zookeeper集群搭建
1、部署环境
操作系统:centos6.8
3台服务器:10.10.83.162、10.10.83.163、10.10.83.229
zookeeper版本:3.4.9
部署要求:Zookeeper集群的工作是超过半数才能对外提供服务,Linux服务器一台、三台、五台、(2*n+1)
2、java安装
yum list java*
yum -y install java-1.8.0-openjdk*
3、Zookeeper下载解压
tar -zxvf zookeeper-3.4.9.tar.gz
4、修改配置文件
1)进入conf目录
cp zoo_sample.cfg zoo.cfg
配置内容
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/software/zookeeper-3.4.9/zkdata
dataLogDir=/home/software/zookeeper-3.4.9/zkdata
# the port at which the clients will connect
clientPort=12181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=10.10.83.162:12888:13888
server.2=10.10.83.163:12888:13888
server.3=10.10.83.229:12888:13888
配置项说明:
- tickTime:心跳时间,为了确保连接存在的,以毫秒为单位,最小超时时间为两个心跳时间
- initLimit:多少个心跳时间内,允许其他server连接并初始化数据,如果ZooKeeper管理的数据较大,则应相应增大这个值
- clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求
- dataDir:内存数据库快照存放地址,如果没有指定事务日志存放地址(dataLogDir),默认也是存放在这个路径下,建议两个地址分开存放到不同的设备上
- dataLogDir:用于单独设置transaction log的目录,transaction log分离可以避免和普通log还有快照的竞争
- syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
- 从3.4.0开始,zookeeper提供了自动清理snapshot和事务日志的功能,通过配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:
- autopurge.purgeInterval 这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。
- autopurge.snapRetainCount 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个
- server.A=B:C:D(A是一个数字,表示这个是第几号服务器,B是这个服务器的ip地址,C第一个端口用来集群成员的信息交换,表示的是这个服务器与集群中的Leader服务器交换信息的端口,D是在leader挂掉时专门用来进行选举leader所用)
- maxClientCnxns:一个客户端能够连接到同一个服务器上的最大连接数,根据IP来区分。如果设置为0,表示没有任何限制。设置该值一方面是为了防止DoS攻击
2)创建myid文件
在每台服务器创建自己的myid文件
# server1 10.10.83.162
echo "1" > /home/software/zookeeper-3.4.9/zkdata/myid
# server2 10.10.83.163
echo "2" > /home/software/zookeeper-3.4.9/zkdata/myid
# server3 10.10.83.229
echo "3" > /home/software/zookeeper-3.4.9/zkdata/myid
myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。
3)日志输出配置
- 修改./bin/zkEnv.sh 文件
if [ "x${ZOO_LOG_DIR}" = "x" ]
then
ZOO_LOG_DIR="/home/software/zookeeper-3.4.9/logs"
fi if [ "x${ZOO_LOG4J_PROP}" = "x" ]
then
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
fi
- 修改./conf/log4j.properties文件
zookeeper.root.logger=INFO, ROLLINGFILE log4j.appender.CONSOLE=org.apache.log4j.DailyRollingFileAppender log4j.appender.ROLLINGFILE.MaxFileSize=100MB
5、启动服务并查看
- 启动服务
#启动服务(3台都需要操作)
./bin/zkServer.sh start
- 检查服务状态
./bin/zkServer.sh status
zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。
Kafka集群搭建
http://mirror.bit.edu.cn/apache/kafka
1、部署环境
centos 6.8
3台服务器:10.10.83.162、10.10.83.163、10.10.83.229(linux一台或多台,大于等于2)
kafka版本:2.11-0.11.0.1
已经搭建好的zookeeper集群
2、安装
官方文档:http://kafka.apache.org/quickstart
tar -xzf kafka_2.11-0.11.0.1.tgz
cd kafka_2.11-0.11.0.1
3、配置
进入到config目录,修改server.properties文件
broker.id=1 # 集群中每台主机唯一标识
listeners=PLAINTEXT://10.10.83.162:9092
advertised.host.name=10.10.83.162
log.dirs=/home/software/kafka_2.11-0.11.0.1/kafkalogs
zookeeper.connect=10.10.83.162:12181,10.10.83.163:12181,10.10.83.229:12181
配置项说明:
- broker.id: 当前机器在集群中的唯一标识,和zookeeper的myid性质一样
- listeners:绑定服务监听的地址和端口
- advertised.listeners:broker对producers和consumers服务的地址和端口,如果没有配置,使用listeners的配置
- num.network.threads:处理网络请求的线程数量,也就是接收消息的线程数,接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘
- num.io.threads:用来处理磁盘IO的线程数量,消息从内存中写入磁盘是时候使用的线程数量
- socket.send.buffer.bytes:发送套接字的缓冲区大小
- socket.receive.buffer.bytes:接受套接字的缓冲区大小
- socket.request.max.bytes:请求套接字的缓冲区大小
- log.dirs:存放日志和消息的目录,可以是用逗号分开的目录
- num.partitions:每个topic默认partitions的数量,数量较大表示消费者可以有更大的并行度
- num.recovery.threads.per.data.dir:用来设置恢复和清理data下数据的线程数量
- log.retention.hours:日志的过期时间,超过后被删除,单位小时
- log.segment.bytes:一个日志文件最大大小,超过会新建一个文件
- log.retention.check.interval.ms:根据过期策略检查过期文件的时间间隔,单位毫秒
- log.flush.interval.messages:每当producer写入消息的条数达到阈值,刷数据到磁盘
- log.flush.interval.ms:每间隔阈值时间,刷数据到磁盘,单位毫秒
- zookeeper.connect:broker需要使用zookeeper保存meta数据
- zookeeper.connection.timeout.ms:zookeeper链接超时时间
- delete.topic.enable:删除topic
- advertised.host.name:外网访问配置
4、启动Kafka集群
1)启动服务
cd ./bin
./kafka-server-start.sh -daemon ../config/server.properties
2)检查服务是否启动
jps -lm
5、测试
1)创建Topic,名称是nginx_log
./kafka-topics.sh --create --zookeeper 10.10.83.162:12181 --replication-factor 1 --partitions 1 --topic nginx_log
- --replication-factor 2 #复制两份
- --partitions 1 #创建1个分区
- --topic #主题为nginx_log
2)创建生产者
./kafka-console-producer.sh --broker-list 10.10.83.162:9092 --topic nginx_log
3)创建消费者
./kafka-console-consumer.sh --bootstrap-server 10.10.83.162:9092 --topic nginx_log --from-beginning
测试(在发布者那里发布消息看看订阅者那里是否能正常收到~)
4)查看topic
./kafka-topics.sh --list --zookeeper 10.10.83.162:12181
5)查看topic状态
./kafka-topics.sh --describe --zookeeper 10.10.83.162:12181 --topic nginx_log
6)删除topic
./kafka-topics.sh --zookeeper 10.10.83.162:12181 --delete --topic nginx_log
备注:设置server.properties文件中delete.topic.enable=true
6、日志说明
默认kafka的日志是保存在./logs目录下的
- server.log #kafka的运行日志
- state-change.log #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
- controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller