我已经在Spring Boot中实现了jms,我正在使用@JmsListener来监听主题

  @Component
    public class AMQListner {
        BlockingQueue<MessageBO> queue = new ArrayBlockingQueue<>(1024);
        @JmsListener(destination = "${spring.activemq.topic}")
        public void Consume(TextMessage message) {
            try {
                String json = message.getText();
                MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                queue.add(bo);
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (JsonParseException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


现在,我需要一个侦听器,以监听该阻塞队列,如果它具有value,则进行处理。我们可以在Spring Boot中使用注释实现此目的吗?

最佳答案

首先,正确的方法是创建一个处理程序bean,而不是使接收者类中的消息队列具有成员。

public interface MessageHandler extends Consumer<MessageBO> {
    public default void handle(MessageBO msg) { accept(msg); }
}

@Component
public class AMQListener {
    @Resource("multiplexer")
    MessageHandler handler;

    @JmsListener(destination = "${spring.activemq.topic}")
    public void Consume(TextMessage message) {
        try {
            String json = message.getText();
            MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
            handler.handle(bo);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


然后,您将在处理程序Bean中放入队列

@Component("multiplexer")
public class MessageMultiplexer implements MessageHandler {
    @Autowired
    MessageHandler actualConsumer;

    ExecutorService executor = Executors.newFixedThreadPool(4);
    public void accept(MessageBO msg) {
        executor.submit(msg -> actualConsumer.handle(msg));
    }
}


在这种情况下,执行器几乎是排队的。

注意:您没有这种方式的1024个限制。您可以通过使用ThreadPoolExecutor构造函数并向其传递有限的队列来实现。

关于java - Spring 启动和BlockingQueue监听器,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49273962/

10-15 10:19