一、消息发送
1.1 数据生产流程
数据生产流程图解:
- Producer创建时,会创建⼀个Sender线程并设置为守护线程
- ⽣产消息时,内部其实是异步流程;⽣产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
- 批次发送的条件为:缓冲区数据⼤⼩达到
batch.size
或者linger.ms
达到上限,哪个先达到就算哪个 - 批次发送后,发往指定分区,然后落盘到 broker;如果⽣产者配置了
retrires
参数⼤于0并且失败原因允许重试,那么客户端内部会对该消息进⾏重试 - 落盘到broker成功,返回⽣产元数据给⽣产者
- 元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回
1.2 必要的参数配置
先来看看我们一般在程序中是怎么配置的:
最常用的配置项:
1.3 拦截器
1.3.1 拦截器介绍
Producer 的拦截器(Interceptor)和 Consumer 的 Interceptor 主要⽤于实现Client端的定制化控制逻辑。
对于Producer⽽⾔,Interceptor使得⽤户在消息发送前以及Producer回调逻辑前有机会对消息做⼀些定制化需求,⽐如修改消息等。同时,Producer允许⽤户指定多个Interceptor按序作⽤于同⼀条消息从⽽形成⼀个拦截链(Interceptor Chain)。Intercetpor 的实现接⼝是org.apache.kafka.clients.producer.ProducerInterceptor
,其定义的⽅法包括:
onSend(ProducerRecord)
:该⽅法封装进KafkaProducer.send
⽅法中,即运⾏在⽤户主线程中。Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算。onAcknowledgement(RecordMetadata, Exception)
:该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement
运⾏在Producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢Producer的消息发送效率。close
:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。
如前所述,Interceptor可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调⽤它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。这在使⽤过程中要特别留意。
1.3.2 自定义拦截器
自定义拦截器步骤:
- 实现ProducerInterceptor接⼝
- 在KafkaProducer的设置中设置⾃定义的拦截器
自定义拦截器 1:
public class InterceptorOne<Key, Value> implements ProducerInterceptor<Key, Value> {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
@Override
public ProducerRecord<Key, Value> onSend(ProducerRecord<Key, Value> record) {
System.out.println("拦截器1---go");
// 此处根据业务需要对相关的数据作修改
String topic = record.topic();
Integer partition = record.partition();
Long timestamp = record.timestamp();
Key key = record.key();
Value value = record.value();
Headers headers = record.headers();
// 添加消息头
headers.add("interceptor", "interceptorOne".getBytes());
ProducerRecord<Key, Value> newRecord = new ProducerRecord<Key, Value>(topic,
partition, timestamp, key, value, headers);
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("拦截器1---back");
if (exception != null) {
// 如果发⽣异常,记录⽇志中
LOGGER.error(exception.getMessage());
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
照着 拦截器 1 再加两个拦截器。
生产者
public class MyProducer1 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 设置连接Kafka的初始连接⽤到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
configs.put("bootstrap.servers", "192.168.0.102:9092");
// 设置key的序列化器
configs.put("key.serializer", IntegerSerializer.class);
// 设置⾃定义的序列化类
configs.put("value.serializer", UserSerializer.class);
// 设置自定义分区器
configs.put("partitioner.class", "com.mfc.config.MyPartitioner");
// 设置拦截器
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.mfc.interceptor.InterceptorOne,"
+ "com.mfc.interceptor.InterceptorTwo,"
+ "com.mfc.interceptor.InterceptorThree");
KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs);
User user = new User();
user.setUserId(1001);
user.setUsername("阿彪");
// ⽤于封装Producer的消息
ProducerRecord<Integer, User> record = new ProducerRecord<>(
"topic_1", // 主题名称
0, // 分区编号
user.getUserId(), // 数字作为key
user // user 对象作为value
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
System.out.println("消息发送成功:" + metadata.topic() + "\t"
+ metadata.partition() + "\t"
+ metadata.offset());
} else {
System.out.println("消息发送异常");
}
}
});
// 关闭⽣产者
producer.close();
}
}
1.4 序列化器
1.4.1 Kafka 自带序列化器
Kafka使⽤org.apache.kafka.common.serialization.Serializer
接⼝⽤于定义序列化器,将泛型指定类型的数据转换为字节数组。
package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
/**
将对象转换为byte数组的接⼝
该接⼝的实现类需要提供⽆参构造器
@param <T> 从哪个类型转换
*/
public interface Serializer<T> extends Closeable {
/*
类的配置信息
@param configs key/value pairs
@param isKey key的序列化还是value的序列化
*/
void configure(Map<String, ?> var1, boolean var2);
/*
将对象转换为字节数组
@param topic 主题名称
@param data 需要转换的对象
@return 序列化的字节数组
*/
byte[] serialize(String var1, T var2);
/*
关闭序列化器
该⽅法需要提供幂等性,因为可能调⽤多次。
*/
void close();
}
系统提供了该接⼝的⼦接⼝以及实现类:
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.ShortSerializer
1.4.2 自定义序列化器
数据的序列化⼀般⽣产中使⽤ avro
。
⾃定义序列化器需要实现 org.apache.kafka.common.serialization.Serializer<T>
接⼝,并实现其中的serialize
⽅法。
实体类
public class User {
private Integer userId;
private String username;
// set、get方法省略
}
自定义序列化器
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
// do Nothing
}
@Override
public byte[] serialize(String topic, User user) {
try {
// 如果数据是null,则返回null
if (user == null) return null;
Integer userId = user.getUserId();
String username = user.getUsername();
int length = 0;
byte[] bytes = null;
if (null != username) {
bytes = username.getBytes("utf-8");
length = bytes.length;
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
buffer.putInt(userId);
buffer.putInt(length);
buffer.put(bytes);
return buffer.array();
} catch (UnsupportedEncodingException e) {
throw new SerializationException("序列化数据异常");
}
}
@Override
public void close() {
// do Nothing
}
}
生产者:
public class MyProducer1 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 设置连接Kafka的初始连接⽤到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
configs.put("bootstrap.servers", "192.168.0.102:9092");
// 设置key的序列化器
configs.put("key.serializer", IntegerSerializer.class);
// 设置⾃定义的序列化类
configs.put("value.serializer", UserSerializer.class);
KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs);
User user = new User();
user.setUserId(1001);
user.setUsername("阿彪");
// ⽤于封装Producer的消息
ProducerRecord<Integer, User> record = new ProducerRecord<>(
"topic_1", // 主题名称
0, // 分区编号
user.getUserId(), // 数字作为key
user // user 对象作为value
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
System.out.println("消息发送成功:" + metadata.topic() + "\t"
+ metadata.partition() + "\t"
+ metadata.offset());
} else {
System.out.println("消息发送异常");
}
}
});
// 关闭⽣产者
producer.close();
}
}
1.5 分区器
1.5.1 Kafka 自带分区器
默认(DefaultPartitioner
)分区计算:
- 如果record提供了分区号,则使⽤record提供的分区号
- 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
- 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。
- 会⾸先在可⽤的分区中分配分区号
- 如果没有可⽤的分区,则在该主题所有分区中分配分区号。
看一下kafka的生产者(KafkaProducer
)源码:
再看Kafka自带的默认分区器(DefaultPartitioner
):
默认的分区器实现了 Partitioner
接口,先看一下接口:
public interface Partitioner extends Configurable, Closeable {
/**
* 为指定的消息记录计算分区值
*
* @param topic 主题名称
* @param key 根据该key的值进⾏分区计算,如果没有则为null
* @param keyBytes key的序列化字节数组,根据该数组进⾏分区计算。如果没有key,则为null
* @param value 根据value值进⾏分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进⾏分区计算。如果没有,则为null
* @param cluster 当前集群的元数据
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* 关闭分区器的时候调⽤该⽅法
*/
public void close();
}
1.5.2 自定义分区器
如果要⾃定义分区器,则需要
- ⾸先开发Partitioner接⼝的实现类
- 在KafkaProducer中进⾏设置:
configs.put("partitioner.class", "xxx.xx.Xxx.class")
实现Partitioner接⼝⾃定义分区器:
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
然后在⽣产者中配置:
二、消息发送原理
原理图解:
由上图可以看出:KafkaProducer
有两个基本线程:
- 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器
RecoderAccumulator
中;- 消息收集器
RecoderAccumulator
为每个分区都维护了⼀个Deque<ProducerBatch>
类型的双端队列。 ProducerBatch
可以理解为是ProducerRecord
的集合,批量发送有利于提升吞吐量,降低⽹络影响;- 由于⽣产者客户端使⽤
java.io.ByteBuffer
在发送消息之前进⾏消息保存,并维护了⼀个BufferPool
实现ByteBuffer
的复⽤;该缓存池只针对特定⼤⼩(batch.size
指定)的ByteBuffer
进⾏管理,对于消息过⼤的缓存,不能做到重复利⽤。 - 每次追加⼀条
ProducerRecord
消息,会寻找/新建对应的双端队列,从其尾部获取⼀个ProducerBatch
,判断当前消息的⼤⼩是否可以写⼊该批次中。若可以写⼊则写⼊;若不可以写⼊,则新建⼀个ProducerBatch
,判断该消息⼤⼩是否超过客户端参数配置batch.size
的值,不超过,则以batch.size
建⽴新的ProducerBatch
,这样⽅便进⾏缓存重复利⽤;若超过,则以计算的消息⼤⼩建⽴对应的ProducerBatch
,缺点就是该内存不能被复⽤了。
- 消息收集器
Sender
线程:- 该线程从消息收集器获取缓存的消息,将其处理为
<Node, List<ProducerBatch>
的形式, Node 表示集群的broker节点。 - 进⼀步将
<Node, List<ProducerBatch>
转化为<Node, Request>
形式,此时才可以向服务端发送数据。 - 在发送之前,
Sender
线程将消息以Map<NodeId, Deque<Request>>
的形式保存到InFlightRequests
中进⾏缓存,可以通过其获取leastLoadedNode
,即当前Node中负载压⼒最⼩的⼀个,以实现消息的尽快发出。
- 该线程从消息收集器获取缓存的消息,将其处理为