我有一个流,它接收大消息(这些消息存储在RDBMS表中),因此在给定时间不能处理太多消息。因此,我要限制使用<int:poller max-messages-per-poll="" />的处理,也要限制某些queuescapacity设置为<int:queue capacity="">的处理。我知道,多个线程/事务将参与此流程,在用例中,这是可以接受的。

查询轮询数据库需要花费一些时间,因此,我不想经常运行它。另外,此流收到的消息往往会在“突发”中进入,这意味着它可能会收到1000条消息,然后在一个小时内没有收到任何消息。

我想做的是使用一个dynamic-poller,它很少进行轮询(因为注意到查询的运行成本很高),除非我看到一堆消息,在这种情况下,我想非常频繁地轮询直到所有消息已处理。例如,如果我有<int:poller max-messages-per-poll="100" />并且我知道轮询器仅读了100条消息,那么很有可能在RDBMS中有更多消息需要处理,我应该在处理完成后立即再次轮询。

我知道Spring没有提供修改trigger使其自然动态的方法,并且已经研究过Spring Integration Ref“ 7.1.5 Change Polling Rate at Runtime
dynamic-poller示例项目中:Dynamic Poller
这是一个开始,但我真的需要poller来根据当前负载更改其频率。
我对此可能并不正确,但是我认为加里可能在他关于“ Implementing High-Availability Architectures with Spring Integration”的演讲中提到了类似的事情。
无论如何,编写一个类来更改poller的频率似乎没什么大不了的。更具挑战性的是如何知道何时发生轮询而没有结果,因为什么都没有发布到输出通道。

我考虑过的一些选择:


<int:wire-tap channel="" />附加到poller的调用<int:service-activator>的通道。服务激活器检查消息数,并在poller上调整periodDynamicPeriodicTrigger
问题是,如果没有收到任何消息,将永远不会被调用,因此一旦我调整了轮询频率,轮询时间就会不确定。
与#1相同,但向DynamicPeriodicTrigger添加逻辑,该逻辑将在下一次触发发生后或特定时间段后将period还原为initialDelay
<int:advice-chain>元素内的<int:poller>元素与MethodInterceptor实现一起使用。
类似于Artem在link中的建议。
虽然这使我可以使用receive方法,但它不能授予我对receive方法结果的访问权限(这将使我可以检索到的消息数量)。请注意,这似乎已由Gary在link上提到的内容所证实。


  请求处理程序建议链是一个特例;我们必须注意只建议内部端点方法,而不建议任何下游处理(在输出通道上)。
  
  向投票员提供建议更为简单,因为我们正在为整个流程提供建议。如“ 7.1.4命名空间支持”小节“ AOP建议链”中所述,您只需通过实现MethodInterceptor接口来创建建议。
  
  参见SourcePollingChannelAdapterFactoryBeanTests.testAdviceChain()一个非常简单的建议...
  
  码:
  adviceChain.add(new MethodInterceptor() {
  public Object invoke(MethodInvocation invocation) throws Throwable {
  adviceApplied.set(true);
  return invocation.proceed();
  }
  });
  这只是用来断言该建议已被正确调用。真正的建议会在invocation.proceed()之前和/或之后添加代码。
  
  实际上,此建议建议所有方法,但只有一种(Callable.call())。

使用查找AfterReturning方法的切入点创建Message<T> receive()建议。
克隆JdbcPollingChannelAdapter并将我的钩子添加到该新类中。
Gary对此link建议的内容可能有用,但“主旨”链接不再有效。


更新:
我最终实现的选项是使用看起来像以下内容的AfterReturningAdvice
原始代码:

<int-jdbc:inbound-channel-adapter id="jdbcInAdapter"
    channel="inputChannel" data-source="myDataSource"
    query="SELECT column1, column2 from tableA"
    max-rows-per-poll="100">
    <int:poller fixed-delay="10000"/>
</int-jdbc:inbound-channel-adapter>


新代码:

<bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger">
    <constructor-arg name="period" value="20000" />
</bean>
<bean id="jdbcPollerMetaData" class="org.springframework.integration.scheduling.PollerMetadata">
    <property name="maxMessagesPerPoll" value="1000"/>
    <property name="trigger" ref="jdbcDynamicTrigger"/>
</bean>
<bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy">
    <property name="newPeriod" value="1"/>
    <property name="adjustmentThreshold" value="100"/>
    <property name="pollerMetadata" ref="jdbcPollerMetaData"/>
</bean>
<aop:config>
    <aop:aspect ref="pollMoreFrequentlyForHighVolumePollingStrategy" >
        <aop:after-returning pointcut="bean(jdbcInAdapterBean) and execution(* *.receive(..))" method="afterPoll" returning="returnValue"/>
    </aop:aspect>
</aop:config>
<bean id="jdbcInAdapterBean" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
    <constructor-arg ref="myDataSource" />
    <constructor-arg value="SELECT column1, column2 from tableA" />
    <property name="maxRowsPerPoll" value="100" />
</bean>
<int:inbound-channel-adapter id="jdbcInAdapter" ref="jdbcInAdapterBean"
    channel="inputChannel"
    auto-startup="false">
    <int:poller ref="jdbcPollerMetaData" />
</int:inbound-channel-adapter>


我对此进行了更多研究,并认为Spring Integration可能会为轮询程序提供一些帮助,以便开发人员可以更好地自定义它们。
有关更多信息,请参见https://jira.spring.io/browse/INT-3633

如果该JIRA尚未实现,并且有人对我实现的代码感兴趣,请对此添加注释,然后将代码在github或gist上提供。

最佳答案

感谢您打开JIRA问题;我们应该在那儿讨论该功能,因为堆栈溢出不适用于扩展对话。

但是,我不确定您上面所说的“ ...是什么意思,但“主旨”链接不再有效...”。它对我来说很好用... https://gist.github.com/garyrussell/5374267,但让我们在JIRA中进行讨论。

关于java - 如何在Spring Integration中最佳实现DynamicPoller,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/28369308/

10-11 20:32