kafka生产者发送流程:
kafka是通过异步的方式进行的消息发送流程,为什么是异步的?
主线程->构建ProducerRecord对象,这个对象声明了主题Topic、分区Partition、键 Key以及 值 Value,主题和值是必须要声明的,分区和键可以不用指定。
主线程->调用send发送进行消息发送。(因为消息要在网络上传输,所以必须进行序列化)
主线程->序列化器:将key和value序列化成字节数组。
主线程->分区器,根据参数进行分区选择,参数里有分区则指定分区,无分区有key值则通过key计算出分区,既没有分区也没有key值,则通过轮询方式进行指定分区。
主线程->,将消息放入一个RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程 直接的缓冲区)中暂存.
异步线程->Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从 RecordAccumulator中取出消息并批量发送出去
Broker成功接收到消息,表示发送成功,返回消息的元数据(主题、分区、偏移量)。发送失败可以选择重试或跑出异常。
发送类型:
1、发送即忘记:通过Producer对象调用send方法,发送完成后对响应结果不做任何处理。
2、同步发送:通过Producer对象调用send方法返回一个Future对象,然后调用Future对象的get方法等待kafka的响应,如果kafka正常响应,返回一个RecordMetadata对象,该对象为消息的元数据对象(主题、分区、偏移量);如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理 (try/catch包围)。
3、异步发送:
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.partition() + ":" + metadata.offset());
}
} })