我已经在Spring Boot中实现了jms,我正在使用@JmsListener来监听主题
@Component
public class AMQListner {
BlockingQueue<MessageBO> queue = new ArrayBlockingQueue<>(1024);
@JmsListener(destination = "${spring.activemq.topic}")
public void Consume(TextMessage message) {
try {
String json = message.getText();
MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
queue.add(bo);
} catch (JMSException e) {
e.printStackTrace();
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
现在,我需要一个侦听器,以监听该阻塞队列,如果它具有value,则进行处理。我们可以在Spring Boot中使用注释实现此目的吗?
最佳答案
首先,正确的方法是创建一个处理程序bean,而不是使接收者类中的消息队列具有成员。
public interface MessageHandler extends Consumer<MessageBO> {
public default void handle(MessageBO msg) { accept(msg); }
}
@Component
public class AMQListener {
@Resource("multiplexer")
MessageHandler handler;
@JmsListener(destination = "${spring.activemq.topic}")
public void Consume(TextMessage message) {
try {
String json = message.getText();
MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
handler.handle(bo);
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后,您将在处理程序Bean中放入队列
@Component("multiplexer")
public class MessageMultiplexer implements MessageHandler {
@Autowired
MessageHandler actualConsumer;
ExecutorService executor = Executors.newFixedThreadPool(4);
public void accept(MessageBO msg) {
executor.submit(msg -> actualConsumer.handle(msg));
}
}
在这种情况下,执行器几乎是排队的。
注意:您没有这种方式的1024个限制。您可以通过使用
ThreadPoolExecutor
构造函数并向其传递有限的队列来实现。关于java - Spring 启动和BlockingQueue监听器,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49273962/