1.kafka配置文件

解压kafka的安装包后,在conf目录下server.properties

#broker 的全局唯一编号,在kafka集群中不能重复,为整型数字
broker.id=0
#开启删除topic功能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka运行数据保存目录 kafka data是以.log后缀的
log.dirs=/data/kafka/logs
#topic 在当前 broker 上的分区数量,下面配置一个分区
num.partitions=1
#用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1
#segment log.dirs中文件保留的最长时间,超时将被删除,默认为7天也就是168小时
log.retention.hours=168
#kafka需要zookeeper进行管理,配置连接 Zookeeper 集群地址
zookeeper.connect=192.168.152.163:2181,192.168.152.162:2181,192.168.152.161:2181

启动kafka

kafka-server-start.sh -daemon config/server.properties
//如果没有加-daemon的话,将会阻塞方式运行

关闭kafka

kafka-server-stop.sh stop

2.kafka工作流程

kafka是基于topic的可分区的分布式的消息中间件。消息生产者通过将消息发送至broker的某一个分区,消费者通过broker的分区获取消息进行消费,一个broker可有多个分区,一个topic可有多个分区,topic下的每一个分区存在副本机制,也就是存在一个leader和多个follower,生产者将消息写入leader分区,follower同步leader中新增的消息,每个分区都有index文件和log文件。消费者消费分区中的消息通过偏移值offset进行记录。

kafka存储消息到文件中采用分片及索引机制,每个topic在一个broker上的数据放在一个文件夹下,该文件夹名称为topic名称加分区号,如minerprofit-1,(minerprofit为topic名称,1为分区号),每一个分区上的数据又分为多个segment,每一个segment有index文件和log文件,segment中index和log文件的命名是在该segment第一个offset值。

0000000000000000000000.index
0000000000000000000000.log
0000000000000000017866.index
0000000000000000017866.log
0000000000000000029866.index
0000000000000000029866.log

log文件记录具体的消息数据,index记录的是消息的偏移值与消息数据的索引值,依据offset值找到具体的消息的过程是:例如offset为5,首先在index文件中,采用二分法查找offset为5对应的索引值,然后获取该索引值,在log文件中依据该索引值获取消息。

kafka零拷贝技术: linux有用户态和核心态,正常的访问文件linux需要核心态和用户态进行转换,kafka内部机制能够做到不经过用户态,具体机制可以了解了解。

kafka顺序写磁盘。

分区的优劣:

优势:

高并发: 采取分区机制,将不同数据放置在不同的分区,提高并发能力,类似ConcurrentHashMap的分段锁机制,当需要访问不同分区的数据,能够将锁粒度降低。

方便扩展: 当有多个broker时候,多个topic可方便扩展。

缺点:

分区的副本机制可能导致生产者发送的消息不能够正常同步到所有follower,当leader挂掉后,造成消息丢失。

3.生产者

生产者是通过封装一个ProductRecord对象,需要指定具体的topic,分区,key,value,headers。

  • topic:指定具体要发送消息到哪个topic
  • partition: 指定topic具体的分区
  • timestamp: 发送消息的时间如果为null则为系统当前时间
  • key: 消息的key值,在决定具体分区的时候有用
  • value: 消息体信息
  • headers: 消息头信息
/**
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
     *                  the timestamp using System.currentTimeMillis().
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers the headers that will be included in the record
     */
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);

消息发送到具体分区的选择:

  • 如果partition指定了,就发送到具体分区
  • 如果partition没有指定,key值存在,就将key的hashcode与该topic的可用分区进行取余。
  • 如果partition和key都没有指定,则会生成一个随机数,将该随机数的hashcode值与可用分区数进行取余,下一次决定分区的时候将上述的随机数进行加1,重新取余。

生产者消息确认(ack)机制:

kafka分区存在副本机制,一个leader和多个follower,生产者将消息发送到leader,follower同leader同步数据,存在一种情况,当生产者将消息发送至leader后,follower还未同步到数据,leader就宕机了,然后follower就选择新的leader,新的leader进行数据同步后,就不清楚了生产者发送的消息时候成功了,所以在此过程中就涉及到消息确认问题。

副本同步数据有半数确认机制和全部确认机制,kafka使用的是全数确认机制。

kafka设计了三种acks消息确认机制:

  • acks=0,当leader一收到消息,还没有写入磁盘,就给生产者发送ack消息,此就可能导致发送了acks,还未写入磁盘,leader就挂了。
  • acks=1,当leader收到消息,并且将消息记录到磁盘中后,再发送ack确认,但该消息还未同步到所有follower,leader挂掉后,就会导致消息丢失
  • acks=all, 当leader收到消息后,并且写入到磁盘后,所有follower也同步消息后,才进行发送ack确认。此也有缺点,如果follower比较多的话就会导致发送消息慢,kafka提供了isr机制。该机制也可能导致数据重复问题,当leader挂掉后,follower部分同步了数据,然后follower之间选出leader,同步了还未给生产者发送ack确认信息的数据,然后生产者会重新发送该消息,所以就会重复,该重复可在业务中做判断,如消费者得到消息后首先在库中查找是否已经存在该消息,如果存在就抛出异常,即使不查库,也可以使用唯一索引。

kafka消息重复或者消息丢失都可能在此发生。

public static final String ACKS_CONFIG = "acks";
    private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
                                           + " durability of records that are sent. The following settings are allowed: "
                                           + " <ul>"
                                           + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
                                           + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
                                           + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
                                           + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
                                           + " always be set to -1."
                                           + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
                                           + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
                                           + " acknowledging the record but before the followers have replicated it then the record will be lost."
                                           + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
                                           + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
                                           + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";

ISR机制:

当acks使用all的时候,所有follower同步数据后再发送acks就会很慢,kafka设计维护一个isr列表,isr是所有follower中的一部分,当isr内的follower同步完了数据,leader就会立刻发送ack确认,isr通过确定follower是否在一定时间内与其是否完成数据同步后而确认是否将该follower加入到isr列表中。该时间通过replica.lag.time.max.ms参数设定。

log文件中的HW,LEO

  • HW : high water,记录消费者消费offset的最大值,在leader和follower之间,消费者能够看到的最大offset就是HW。
  • LEO: log end offset,记录每一个log文件中最大的offset值。

4.消费者

发布订阅模式中消费者消费消息有两种方式,一种是消息中间推送消息到消费者,另一种方式是消费者通过消息中间件拉去消息。推的方式会导致消费者处理消息的速度赶不上推送的速度,拉的方式就会导致消费者在没有消息的时候会不断的轮询,导致空运行,kafka采用拉的方式,可以设置时间来间隔性的访问消息中间件。

消费者组:多个消费者可组成一个消费者组,一个消费者组订阅一个topic,消费者组中的一个消费者消费了消息就不会再有另外一个消费者消费消息,当有多个消费者后就涉及分区分消费者策略。

分区分消费者策略:

  • RoundRobin:RoundRobin方式是以消费者组为主,如果消费者组CG1有三个消费者C1,C2和C3,C1,C2和C3分别订阅了一个topic有三个分区p1,p2和p3,kafka会进行随机组合,组合成c1p1,c1p2,c1p3,c2p1,c2p2,c2p3,c3p1,c3p2和c3p3,然后依据hashcode值将消息分别发送到不同的消费者中进行消费。
  • Range:range方式是面向topic的,如果一个topic被一个消费者组订阅,该消费者组有3个消费者,然后会将topic三个分区的消息分别给三个消费者。

消费者offset值维护:

kafka老版本将offset维护放置zookeeper中,0.9版本后将维护放置在kafka __consumer_offset topic 中,

exclude.internal.topics=false #开启内部topic

5.zookeeper在kafka中的作用

  • broker中的controller的选举
  • broker的上下线
  • topic 分区副本分配及follower之间选举leader
03-05 22:29