我想聚合来自3个不同端点(@ServiceActivator)的响应,并将聚合的响应持久保存到数据库。

我正在关注异常

org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: c.b.bean.jpa.PersonEntity.listsOfEmails, could not initialize proxy - no Session


如何使消息流事务感知?还是我缺少东西?

以下是代码段,

组态

@Configuration
@EnableIntegration
@ComponentScan(basePackages={"integration.endpoint", "integration.sync"})
@IntegrationComponentScan(basePackages={"integration.gateway"})
public class InfrastructureConfiguration {

    @Bean
    @Description("Entry to the messaging system through the gateway.")
    public MessageChannel requestChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends transformed message to outbound channel.")
    public MessageChannel invocationChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends handler message to aggregator channel.")
    public MessageChannel aggregatorChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends handler message to response channel.")
    public MessageChannel responseChannel(){
        return pubSubChannel();
    }

    private PublishSubscribeChannel pubSubChannel() {
        PublishSubscribeChannel pubSub = new PublishSubscribeChannel(executor());
        pubSub.setApplySequence(true);
        return pubSub;
    }

    private Executor executor() {
        return Executors.newFixedThreadPool(10);
    }
}


启动网关

@MessagingGateway(name="entryGateway", defaultRequestChannel="requestChannel")
public interface IntegrationService {
    String initiateSync(AnObject obj);
}


消息生成器:它通过获取实体来转换消息,并将其设置为消息的属性,并将消息发送到通道。稍后,此实体由@ServiceActivator(3个端点)中的@Autowired服务使用。该实体因其关联而延迟初始化。

@Component
public class MessageBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBuilder.class);

    @Autowired
    private ODao dao;

    @Transformer(inputChannel="requestChannel", outputChannel="invocationChannel")
    public OMessage buildMessage(Message<AnObject> msg){
        LOGGER.info("Transforming messages for ID [{}]", msg.getPayload().getId());
        OMessage om = new OMessage(msg.getPayload());
        om.buildMessage(dao);
        return om;
    }
}


端点1

@Component
public class Handler1 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler1.class);

    @Autowired
    private service1 Service1;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler1 is called");
            rm = service1.getResponse(om);
        }else{
            LOGGER.info("Handler1 is not called");
        }
        return rm;
    }
}


端点2

@Component
public class Handler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler2.class);

    @Autowired
    private service2 Service2;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler2 is called");
            rm = service2.getResponse(om);
        }else{
            LOGGER.info("Handler2 is not called");
        }
        return rm;
    }
}


端点3

@Component
public class Handler3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler3.class);

    @Autowired
    private service3 Service3;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler3 is called");
            rm = service3.getResponse(om);
        }else{
            LOGGER.info("Handler3 is not called");
        }
        return rm;
    }
}


聚合器

@Component
public class MessageAggregator {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageAggregator.class);

    @Aggregator(inputChannel="aggregatorChannel", outputChannel="responseChannel")
    public Response aggregate(List<ResponseMessage> resMsg){
        LOGGER.info("Aggregating Responses");
        Response res = new Response();
        res.getResponse().addAll(resMsg);
        return res;
    }

    @ReleaseStrategy
    public boolean releaseChecker(List<Message<ResponseMessage>> resMsg) {
        return resMsg.size() ==3;
    }

    @CorrelationStrategy
    public ResponseMessage corelateBy(ResponseMessage resMsg) {
        LOGGER.info("CorrelationStrategy: message payload details {}", resMsg);
        return resMsg;
    }
}

最佳答案

您可能会获取对dao层内的延迟加载域的引用。因此,当稍后使用它时,它将在没有代理的情况下正确实例化。
例如,可能如下所示:

public List<PersonEntity> fetchPersonsWithMails() {
    return sessionFactory.getCurrentSession()
        .createCriteria(PersonEntity.class)
        .setFetchMode("listsOfEmails", FetchMode.JOIN)
        .list();
}

10-06 11:34