我正在使用Spring Cloud Stream 2.1.0.RELEASE版本将消息(在本例中为Kafka)发送到基于收到的输入动态定义的通道。问题在于,只有其他所有消息都以正确的频道结尾,另一半消息以默认的频道结尾。
我以this样本作为起点。
我将要发送到的通道放入特定的消息标头中,然后使用HeaderValueRouter检查相同的标头值以查看输出到哪个通道。
我正在配置我的应用程序,如下所示:
@EnableBinding(CloudStreamConfig.DynamicSource.class)
public class CloudStreamConfig {
@Autowired
private BinderAwareChannelResolver resolver;
public static final String CHANNEL_HEADER = "channelHeader";
public static final String OUTPUT_CHANNEL = "outputChannel";
private final String defaultChannel = "defaultChannel";
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter(CHANNEL_HEADER);
router.setDefaultOutputChannelName(defaultChannel);
router.setChannelResolver(resolver);
return router;
}
public interface DynamicSource {
@Output(OUTPUT_CHANNEL)
MessageChannel output();
}
}
在我的控制器中,我接受一个对象以及一个参数,该参数定义了将其发送到哪个通道,然后将其发送到MessageChannel。相关代码如下:
@Autowired
@Qualifier(CloudStreamConfig.OUTPUT_CHANNEL)
public MessageChannel localChannel;
...
@GetMapping(path = "/error/{channel}")
@ResponseStatus(HttpStatus.OK)
public void error(@PathVariable String channel) {
// build my object
Message message = MessageBuilder.createMessage(myObject,
new MessageHeaders(Collections.singletonMap(CloudStreamConfig.CHANNEL_HEADER, channel)));
localChannel.send(message);
}
如果我向
/error/someChannel
发送10条消息,我希望在someChannel
中看到10条消息。但是,我在someChannel
中看到一半的消息,而在defaultChannel
中看到另一半的消息。我在消息中放置了一个调试计数器变量,它将第一条消息发送到正确的通道,然后每隔一条消息发送到正确的通道,而其他所有消息都发送到默认通道。是什么原因造成的,我该如何解决?我在滥用我的
DynamicSource
课吗?我以为它会与任何同名的自动连接的MessageChannel
绑在一起(而且看起来确实如此),但我想知道是否缺少某些东西。还是与BinderAwareChannelResolver
有意外交互? (老实说,我不知道这是做什么的,我仅将其包括在内是因为样本确实如此) 最佳答案
输出通道上有两个订户-通道绑定(在活页夹中)和路由器。
对于DirectChannel
,默认的调度算法是循环调度,因此您将消息交替发送到路由器,然后直接发送到活页夹。
您需要为服务激活器使用不同的DirectChannel
@Bean
,以便所有消息都发送到那里,并在路由后发送到绑定器。
请参阅该示例中的sourceChannel
。