kafka生产者发送流程:

kafka是通过异步的方式进行的消息发送流程,为什么是异步的?

主线程->构建ProducerRecord对象,这个对象声明了主题Topic、分区Partition、键 Key以及 值 Value,主题和值是必须要声明的,分区和键可以不用指定。
主线程->调用send发送进行消息发送。(因为消息要在网络上传输,所以必须进行序列化)
主线程->序列化器:将key和value序列化成字节数组。
主线程->分区器,根据参数进行分区选择,参数里有分区则指定分区,无分区有key值则通过key计算出分区,既没有分区也没有key值,则通过轮询方式进行指定分区。
主线程->,将消息放入一个RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程 直接的缓冲区)中暂存.
异步线程->Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从 RecordAccumulator中取出消息并批量发送出去
Broker成功接收到消息,表示发送成功,返回消息的元数据(主题、分区、偏移量)。发送失败可以选择重试或跑出异常。

Kafka基础——生产者-LMLPHP

发送类型:

1、发送即忘记:通过Producer对象调用send方法,发送完成后对响应结果不做任何处理。

2、同步发送:通过Producer对象调用send方法返回一个Future对象,然后调用Future对象的get方法等待kafka的响应,如果kafka正常响应,返回一个RecordMetadata对象,该对象为消息的元数据对象(主题、分区、偏移量);如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理 (try/catch包围)。

3、异步发送:

 producer.send(record, new Callback() {
 public void  onCompletion(RecordMetadata metadata, Exception exception) {    
 	if (exception == null) {
	System.out.println(metadata.partition() + ":" + metadata.offset());    
	}  
} })
11-19 04:14