Question

Axon message received but event handler not called.

I am trying to implement event sourcing on both sides, using two different queues. My first queue is test and the second one is testdemo.

I have two separate applications running on the same server:

  1. User Management
  2. Wallet Management

I have implemented event sourcing from User Management to Wallet Management, and it is working fine.

Now I am trying to implement the reverse direction, from Wallet Management to User Management: the Wallet Management application publishes the event (producer) and the User Management application consumes it (consumer). The event is received, but the event handler is not called.

Following is my application code. Please help me figure out what I am missing.

My Axon configuration class

package com.peaas.ngapblueprintdemo.config;

import org.axonframework.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.Channel;

@Configuration
public class AxonConfiguration {

    private final static Logger logger = LoggerFactory.getLogger(AxonConfiguration.class);

    @Value("${axon.amqp.exchange}")
    private String exchange;

    @Bean
    public Exchange exchange() {
        logger.info(exchange + " AMQP Exchange Registering ");
        return ExchangeBuilder.fanoutExchange(exchange).build();
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable(exchange).build();
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
    }

    @Autowired
    public void configure(AmqpAdmin amqpAdmin) {
        amqpAdmin.declareExchange(exchange());
        amqpAdmin.declareQueue(queue());
        amqpAdmin.declareBinding(binding());
    }

    @Bean
    public SpringAMQPMessageSource testdemo(Serializer serializer) {
        System.out.println("--- On Message Call ---");
        return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {

            @RabbitListener(queues = "testdemo")
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println(message.getMessageProperties());
                System.out.println("channel == "+channel);
                super.onMessage(message, channel);
            }
        };
    }
}

WalletCreatedEvent class

package com.peaas.ngapblueprintdemo.events;

public class WalletCreatedEvent {
    private Long id;
    private String walletId;
    private Double amount;
    private Long userId;

    public WalletCreatedEvent(Long id, String walletId, Double amount, Long userId) {
        super();
        System.out.println("--- call ---");
        this.id = id;
        this.walletId = walletId;
        this.amount = amount;
        this.userId = userId;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getWalletId() {
        return walletId;
    }

    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }

    @Override
    public String toString() {
        return "WalletCreatedEvent [id=" + id + ", walletId=" + walletId + ", amount=" + amount + ", userId=" + userId
                + "]";
    }

}

EventHandler class

package com.peaas.ngapblueprintdemo.eventHandlers;

import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;

import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;

@Component
public class UserEventHandler {

    @EventHandler
    public void onCreateWalletEvent(WalletCreatedEvent event) {
        System.out.println("--- Wallet Created Successfully ---");
        System.out.println(event);
    }
}

Following are the properties in my application.yml file:

axon:
    amqp:
        exchange: test
    eventhandling:
        processors:
            amqpEvents:
                source: testdemo

Following is my log data, which shows that the event is received:

MessageProperties [headers={axon-message-id=fa60968c-6905-46b5-8afe-6da853a4c51a, axon-message-aggregate-seq=0, axon-metadata-correlationId=589ef284-176f-49b8-aae0-0ad1588fa735, axon-message-aggregate-type=WalletAggregate, axon-message-revision=null, axon-message-timestamp=2018-08-06T11:09:26.345Z, axon-message-type=com.peaas.ngapblueprintdemo.events.WalletCreatedEvent, axon-metadata-traceId=589ef284-176f-49b8-aae0-0ad1588fa735, axon-message-aggregate-id=9524f7df-44fb-477f-83b8-d176583a126e}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=testdemo, receivedRoutingKey=com.peaas.ngapblueprintdemo.events, deliveryTag=1, consumerTag=amq.ctag-fGm3jQcP_JIoTGf4ZMhAIg, consumerQueue=testdemo]
channel == Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@3dcd657d Shared Rabbit Connection: SimpleConnection@19b12fd2 [delegate=amqp://[email protected]:5672/, localPort= 52963]

Recommended answer

You have most of the right configuration in place, but you are forgetting to tie your SpringAMQPMessageSource to an Event Processor under which your event handling component is placed.

See the reference guide for a correct example of how to achieve this.

Here is a snippet from that reference guide that configures the message source for an event processor:

@Autowired
public void configure(EventHandlingConfiguration ehConfig, SpringAMQPMessageSource myMessageSource) {
    ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}

Edit

I think I see which part you were missing. You did correctly wire the queue as a subscribable message source to an Event Processor. This follows from your application.yml, which ties the testdemo message source to the amqpEvents Event Processor. Sorry for my earlier assumption on that part.

The reason you don't receive your events in the UserEventHandler is that the event handler isn't tied to the amqpEvents Event Processor. To solve that, add the @ProcessingGroup("amqpEvents") annotation to the UserEventHandler component.
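Applied to the handler class from the question, the change might look like the sketch below. It assumes an Axon version in which the annotation is available as org.axonframework.config.ProcessingGroup, and it needs Axon and Spring on the classpath; everything else is copied from the question's UserEventHandler.

```java
package com.peaas.ngapblueprintdemo.eventHandlers;

// Assumption: the annotation lives in org.axonframework.config in the Axon version used here.
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;

import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;

// Places this handler in the "amqpEvents" processing group, the processor that
// application.yml connects to the "testdemo" message source.
@Component
@ProcessingGroup("amqpEvents")
public class UserEventHandler {

    @EventHandler
    public void onCreateWalletEvent(WalletCreatedEvent event) {
        System.out.println("--- Wallet Created Successfully ---");
        System.out.println(event);
    }
}
```

Without the annotation, Axon by default assigns a handler to a processing group named after the handler's package, so the amqpEvents processor would have no handlers subscribed to it even though the message source receives the event.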

08-19 20:53