我想请教社区:监听多个主题(topic)的最佳方式是什么?其中每个主题包含的消息属于不同的类(class)。
过去几天,我一直在试用 Spring Kafka。到目前为止,我的思路是:
我的思路是否正确?有没有更好的方法?
谢谢!
最佳答案
这是一个非常简单的例子。
// -----------------------------------------------
// Sender
// -----------------------------------------------
/**
 * Producer-side configuration for the example.
 *
 * <p>Declares one {@code ProducerFactory} / {@code KafkaTemplate} pair per payload
 * class ({@code Class1} and {@code Class2}); both pairs are built from the same
 * property map returned by {@link #producerConfigs()}. Also registers the
 * {@code Sender1} and {@code Sender2} beans.
 */
@Configuration
public class SenderConfig {

    /**
     * Builds the producer property map used by both producer factories.
     *
     * <p>Keys are serialized with {@code StringSerializer} and values with
     * {@code JsonSerializer}. The {@code ......} line is the elision present in the
     * original example; presumably it stands for the remaining producer settings
     * that the example leaves out.
     *
     * @return a new mutable map of producer properties
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> config = new HashMap<>();
        ......
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }

    // ---- beans for sending Class1 -------------------------------------------

    /**
     * @return a producer factory for {@code Class1} values built from
     *         {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, Class1> producerFactory1() {
        Map<String, Object> config = producerConfigs();
        return new DefaultKafkaProducerFactory<>(config);
    }

    /**
     * @return a template for {@code Class1} values backed by
     *         {@link #producerFactory1()}
     */
    @Bean
    public KafkaTemplate<String, Class1> kafkaTemplate1() {
        ProducerFactory<String, Class1> factory = producerFactory1();
        return new KafkaTemplate<>(factory);
    }

    /** @return the sender bean for {@code Class1} messages */
    @Bean
    public Sender1 sender1() {
        return new Sender1();
    }

    // ---- beans for sending Class2 -------------------------------------------

    /**
     * @return a producer factory for {@code Class2} values built from
     *         {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, Class2> producerFactory2() {
        Map<String, Object> config = producerConfigs();
        return new DefaultKafkaProducerFactory<>(config);
    }

    /**
     * @return a template for {@code Class2} values backed by
     *         {@link #producerFactory2()}
     */
    @Bean
    public KafkaTemplate<String, Class2> kafkaTemplate2() {
        ProducerFactory<String, Class2> factory = producerFactory2();
        return new KafkaTemplate<>(factory);
    }

    /** @return the sender bean for {@code Class2} messages */
    @Bean
    public Sender2 sender2() {
        return new Sender2();
    }
}
/**
 * Sends {@code Class1} payloads through the injected {@code kafkaTemplate1}.
 */
public class Sender1 {

    /** Template for {@code Class1} values; populated via {@code @Autowired} field injection. */
    @Autowired
    private KafkaTemplate<String, Class1> kafkaTemplate1;

    /**
     * Hands the payload to the template for the given topic.
     *
     * @param topic name of the topic to send to
     * @param c1    payload to send
     */
    public void send(final String topic, final Class1 c1) {
        this.kafkaTemplate1.send(topic, c1);
    }
}
/**
 * Sends {@code Class2} payloads through the injected {@code kafkaTemplate2}.
 */
public class Sender2 {

    /** Template for {@code Class2} values; populated via {@code @Autowired} field injection. */
    @Autowired
    private KafkaTemplate<String, Class2> kafkaTemplate2;

    /**
     * Hands the payload to the template for the given topic.
     *
     * @param topic name of the topic to send to
     * @param c2    payload to send
     */
    public void send(final String topic, final Class2 c2) {
        this.kafkaTemplate2.send(topic, c2);
    }
}
// -----------------------------------------------
// Receiver
// -----------------------------------------------
/**
 * Consumer-side configuration for the example.
 *
 * <p>Declares one {@code ConsumerFactory} / listener-container-factory pair per
 * payload class ({@code Class1} and {@code Class2}). Each consumer factory is given
 * the shared property map from {@link #consumerConfigs()} plus explicit key and
 * value deserializer instances. Also registers the {@code Receiver1} and
 * {@code Receiver2} beans.
 */
@Configuration
@EnableKafka
public class ReceiverConfig {

    /**
     * Builds the consumer property map used by both consumer factories.
     *
     * <p>Sets {@code StringDeserializer} for keys and {@code JsonDeserializer} for
     * values. The {@code ......} line is the elision present in the original
     * example; presumably it stands for the remaining consumer settings that the
     * example leaves out.
     *
     * @return a new mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        ......
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return config;
    }

    // ---- beans for receiving Class1 -----------------------------------------

    /**
     * @return a consumer factory that is handed a JSON value deserializer
     *         constructed for {@code Class1}
     */
    @Bean
    public ConsumerFactory<String, Class1> consumerFactory1() {
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Class1> valueDeserializer = new JsonDeserializer<>(Class1.class);
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), keyDeserializer, valueDeserializer);
    }

    /**
     * @return a listener container factory wired to {@link #consumerFactory1()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Class1> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory1());
        return containerFactory;
    }

    /** @return the listener bean for {@code Class1} messages */
    @Bean
    public Receiver1 receiver1() {
        return new Receiver1();
    }

    // ---- beans for receiving Class2 -----------------------------------------

    /**
     * @return a consumer factory that is handed a JSON value deserializer
     *         constructed for {@code Class2}
     */
    @Bean
    public ConsumerFactory<String, Class2> consumerFactory2() {
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Class2> valueDeserializer = new JsonDeserializer<>(Class2.class);
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), keyDeserializer, valueDeserializer);
    }

    /**
     * @return a listener container factory wired to {@link #consumerFactory2()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class2> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Class2> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory2());
        return containerFactory;
    }

    /** @return the listener bean for {@code Class2} messages */
    @Bean
    public Receiver2 receiver2() {
        return new Receiver2();
    }
}
/**
 * Listener for {@code Class1} messages on {@code topic1}.
 */
public class Receiver1 {

    /**
     * Invoked for records on {@code topic1}, using the container factory bean named
     * {@code kafkaListenerContainerFactory1}. Only logs a fixed message; the payload
     * itself is not inspected.
     *
     * <p>NOTE(review): {@code LOGGER} is not declared in this snippet — presumably it
     * is defined in the omitted part of the class; confirm.
     *
     * @param c1 the received payload
     */
    @KafkaListener(
            id = "listener1",
            topics = "topic1",
            containerFactory = "kafkaListenerContainerFactory1")
    public void receive(final Class1 c1) {
        LOGGER.info("Received c1");
    }
}
/**
 * Listener for {@code Class2} messages on {@code topic2}.
 */
public class Receiver2 {

    /**
     * Invoked for records on {@code topic2}, using the container factory bean named
     * {@code kafkaListenerContainerFactory2}. Only logs a fixed message; the payload
     * itself is not inspected.
     *
     * <p>NOTE(review): {@code LOGGER} is not declared in this snippet — presumably it
     * is defined in the omitted part of the class; confirm.
     *
     * @param c2 the received payload
     */
    @KafkaListener(
            id = "listener2",
            topics = "topic2",
            containerFactory = "kafkaListenerContainerFactory2")
    public void receive(final Class2 c2) {
        LOGGER.info("Received c2");
    }
}