1 kafka架构进阶
1.1 Kafka底层数据的同步机制(面试常问)
1、Kafka的Topic被分为多个分区,分区是是按照Segments(文件段)存储文件块。分区日志是存储在磁盘上的日志序列,Kafka可以保证分区里的事件是有序的。其中Leader负责对应分区的读写、Follower负责同步分区的数据,0.11 版本之前Kafka使用highwatermarker机制保证数据的同步,但是基于highwatermarker的同步数据可能会导致数据的不一致或者是乱序。在Kafka数据同步有以下概念。
2、LEO:log end offset 标识的是每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO.
3、HW: high watermarker称为高水位线,所有HW之前的的数据都理解是已经备份的,当所有节点都备 份成功,Leader会更新水位线。
4、ISR:In-sync-replicas,kafka的leader会维护一份处于同步的副本集,如果在replica.lag.time.max.ms
时间内系统没有发送fetch请求,或者已然在发送请求,但是在该限定时间内没有赶上Leader的数据就被剔除ISR列表。在Kafka-0.9.0版本剔除replica.lag.max.messages
消息个数限定,因为这个会导致其他的Broker节点频繁的加入和退出ISR。即如果某个从机总是赶不上leader的leo,该节点在达到限制时间后,会被剔除isr列表
1.1.1 高水位截断的同步方式可能带来数据丢失(Kafka 0.11版本前的问题)
1.1.2 解决高水位截断数据丢失和不一致问题(leaderEpoch)
可以看出0.11版本之前Kafka的副本备份机制的设计存在问题。依赖HW的概念实现数据同步,但是存在数据不一致问题和丢失数据问题,因此Kafka-0.11版本引入了 Leader Epoch解决这个问题,不在使用HW作为数据截断的依据。而是已引入了Leader epoch的概念,任意一个Leader持有一个LeaderEpoch。该LeaderEpoch这是一个由Controller管理的32位数字,存储在Zookeeper的分区状态信息中,并作为LeaderAndIsrRequest的一部分传递给每个新的Leader。Leader接受Producer请求数据上使用LeaderEpoch标记每个Message。然后,该LeaderEpoch编号将通过复制协议传播,并用于替换HW标记,作为消息截断的参考点。
改进消息格式,以便每个消息集都带有一个4字节的Leader Epoch号。在每个日志目录中,会创建一个新的Leader Epoch Sequence文件,在其中存储Leader Epoch的序列和在该Epoch中生成的消息的Start Offset。它也缓存在每个副本中,也缓存在内存中。
follower变成Leader
当Follower成为Leader时,它首先将新的Leader Epoch和副本的LEO添加到Leader Epoch Sequence序列文件的末尾并刷新数据。给Leader产生的每个新消息集都带有新的“Leader Epoch”标记。
Leader变成Follower
如果需要需要从本地的Leader Epoch Sequence加载数据,将数据存储在内存中,给相应的分区的Leader发送epoch 请求,该请求包含最新的EpochID,StartOffset信息.Leader接收到信息以后返回该EpochID所对应的LastOffset信息。该信息可能是最新EpochID的StartOffset或者是当前EpochID的Log End Offset信息.
- 情形1:Fllower的Offset比Leader的小
- 情形2:用户的Leader Epoch的信息startOffset信息比Leader返回的LastOffset要大,Follower回去重置自己的Leader Epoch文件,将Offset修改为Leader的LastOffset信息,并且截断自己的日志信息。
Follower在提取过程中,如果关注者看到的LeaderEpoch消息集大于其最新的LeaderEpoch,则会在其LeaderEpochSequence中添加新的LeaderEpoch和起始偏移量,并将Epoch数据文件刷新到磁盘。同时将Fetch的日志信息刷新到本地日志文件。
1.1.3 LeaderEpoch解决数据丢失
1.1.4 LeaderEpoch解决数据不一致
1.2 kafka监控之Kafka-Eagle
1.2.1 Kafka-Eagle安装
这是一个监视系统,监视您的kafka群集以及可视的使用者线程,偏移量,所有者等。当您安装Kafka Eagle时,用户可以看到当前的使用者组,对于每个组,他们正在消耗的Topic以及该组在每个主题中的偏移量,滞后,日志大小和位置。这对于了解用户从消息队列消耗的速度以及消息队列增加的速度很有用。
[root@CentOSB ~]# tar -zxf kafka-eagle-web-1.4.0-bin.tar.gz -C /usr/
[root@CentOSB ~]# mv /usr/kafka-eagle-web-1.4.0 /usr/kafka-eagle
[root@CentOSB ~]# vi .bashrc
KE_HOME=/usr/kafka-eagle
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export KE_HOME
[root@CentOSB ~]# source .bashrc
[root@CentOSB ~]# cd /usr/kafka-eagle/
[root@CentOSB kafka-eagle]# vi conf/system-config.properties
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181
cluster1.kafka.eagle.offset.storage=kafka
kafka.eagle.metrics.charts=true
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.52.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
[root@CentOSB kafka-eagle]# chmod u+x bin/ke.sh
[root@CentOSB kafka-eagle]# ./bin/ke.sh start
如果需要检测Kafka性能指标需要修改Kafka启动文件
vi kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
1.3 Kafka-Flume集成
组件1:r1-采集组件,测试使用netcat
组件2:k1-输出组件
组件3:c1-缓存组件,测试使用内存做缓冲
- jdk8+环境,准备flume的安装包,安装flume
tar -zxf apache-flume-1.9.0-bin.tar.gz -C /usr/
cd /usr/apache-flume-1.9.0-bin
vi conf/kafka.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = CentOS
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic01
a1.sinks.k1.kafka.bootstrap.servers = CentOSA:9092,CentOSB:9092,CentOSC:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = -1
a1.sinks.k1.kafka.producer.linger.ms = 100
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = CentOS
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic01
a1.sinks.k1.kafka.bootstrap.servers = CentOS:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = -1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.4 Kafka-SpringBoot集成
- 依赖引入(pom.xml)
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 配置(application.properties)
# 连接
spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092
# 重试次数
spring.kafka.producer.retries=5
# 开启应答
spring.kafka.producer.acks=all
# 缓冲区大小
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 事务控制,开启后KafkaTemplate发送消息,要处于事务控制中@Transation
spring.kafka.producer.transaction-id-prefix=transaction-id-
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 开启幂等
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- 配置日志
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%p %d{yyyy-MM-dd HH:mm:ss} - %m%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- 控制台输出日志级别 -->
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
<logger name="org.springframework.kafka" level="INFO" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<!--事务控制-->
<logger name="org.springframework.kafka.transaction" level="debug" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
</configuration>
- 集成后Spring后KafkaTemplate类专门用来发送数据
@Transactional
@Service
public class OrderService implements IOrderService {
@Autowired
private KafkaTemplate kafkaTemplate;
@Override
public void saveOrder(String id,Object message) {
//发送消息给服务器
}
}
- 通过监听实现发送端到消息处理的转发
@KafkaListeners(value = {@KafkaListener(topics = {"topic04"})})
@SendTo(value = {"topic05"})
public String listenner(ConsumerRecord<?, ?> cr) {
return cr.value()+" mashibing edu";
}
- 开启事务后,KafkaTemplate发送消息的方式1
kafkaTemplate.executeInTransaction(new KafkaOperation.OperationCallback<String, String, Object>(){
@Override
public Object doInOperations(KafkaOperation<String,String> kafkaOperations) {
kafkaOperations.send(new ProducerRecord<String,String>("topic02", "002", "this is value"));
return null;
}
});
- 开启事务后,KafkaTemplate发送消息的方式2