问题描述
在我的应用程序中,我需要确定消息是否已成功发布到AMQP交换中或发生了一些错误.似乎发明了发布商确认来解决此问题,所以我开始尝试使用它们.
In my application I need to determine whether a message is successfully published into AMQP exchange or some error happens. It seems like Publisher Confirms were invented to address this issue so I started experimenting with them.
对于我的Java应用程序,我使用了com.rabbitmq:amqp-client:jar:3.5.4
,并且在缺少交换(我尝试在其中发布)的情况下,我选择了一个非常简单的方案.我希望在这种情况下会调用ConfirmListener.handleNack
.
For my Java application I used com.rabbitmq:amqp-client:jar:3.5.4
and I chose a very simple scenario when the exchange (where I try to publish) is missing. I expected that ConfirmListener.handleNack
is going to be invoked in such case.
这是我的Java代码:
Here's my Java code:
package wheleph.rabbitmq_tutorial.confirmed_publishes;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConfirmedPublisher {
private static final Logger logger = LoggerFactory.getLogger(ConfirmedPublisher.class);
private final static String EXCHANGE_NAME = "confirmed.publishes";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
logger.debug(String.format("Received ack for %d (multiple %b)", deliveryTag, multiple));
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
logger.debug(String.format("Received nack for %d (multiple %b)", deliveryTag, multiple));
}
});
for (int i = 0; i < 100; i++) {
String message = "Hello world" + channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
logger.info(" [x] Sent '" + message + "'");
Thread.sleep(2000);
}
channel.close();
connection.close();
}
}
但是事实并非如此.日志显示没有执行回调:
However it's not the case. Log shows that no callback is executed:
17:49:34,988 [main] ConfirmedPublisher - [x] Sent 'Hello world1'
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirmed.publishes' in vhost '/', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:657)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:640)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at wheleph.rabbitmq_tutorial.confirmed_publishes.ConfirmedPublisher.main(ConfirmedPublisher.java:38)
有趣的是,当我尝试使用NodeJS库时,pubilsher会按预期确认工作. amqp-coffee (0.1.24).
What's interesting is that pubilsher confirms work as expected when I try to use library for NodeJS amqp-coffee (0.1.24).
这是我的NodeJS代码:
Here's my NodeJS code:
var AMQP = require('amqp-coffee');
var connection = new AMQP({host: 'localhost'});
connection.setMaxListeners(0);
console.log('Connection started')
connection.publish('node.confirm.publish', '', 'some message', {deliveryMode: 2, confirm: true}, function(err) {
if (err && err.error && err.error.replyCode === 404) {
console.log('Got 404 error')
} else if (err) {
console.log('Got some error')
} else {
console.log('Message successfully published')
}
})
以下是指示使用适当参数调用回调的输出:
Here's the output that indicates that the callback is invoked with proper argument:
Connection started
Got 404 error
我不正确地使用com.rabbitmq:amqp-client
还是该库中存在一些不一致之处?
Am I using com.rabbitmq:amqp-client
incorrectly or there's some inconsistency in that library?
推荐答案
原来,我的假设不正确,在这种情况下不应调用ConfirmListener.handleNack
.
It turned out that my assumption was not correct and ConfirmListener.handleNack
should not be invoked in this case.
这是针对 amqp-coffee amqp-coffee 库:
Here's a relevant portion of AMQP messages for the scenario described in the question observed for amqp-coffee library:
ch#1 -> {#method<channel.open>(out-of-band=), null, ""}
ch#1 <- {#method<channel.open-ok>(channel-id=), null, ""}
ch#1 -> {#method<confirm.select>(nowait=false), null, ""}
ch#1 <- {#method<confirm.select-ok>(), null, ""}
ch#1 -> {#method<basic.publish>(ticket=0, exchange=node.confirm.publish, routing-key=, mandatory=false, immediate=false), #contentHeader<basic>(content-type=string/utf8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null), "some message"}
ch#1 <- {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'node.confirm.publish' in vhost '/', class-id=60, method-id=40), null, ""}
ch#2 -> {#method<channel.open>(out-of-band=), null, ""}
ch#2 <- {#method<channel.open-ok>(channel-id=), null, ""}
ch#2 -> {#method<confirm.select>(nowait=false), null, ""}
ch#2 <- {#method<confirm.select-ok>(), null, ""}
您可以看到:
- 发布失败后,经纪人会使用包含原因的
channel.close
关闭渠道. -
basic.nack
未发送. - 该库会自动打开另一个通道以供后续操作.
- After unsuccessful publish the channel is closed by broker using
channel.close
that includes the reason. basic.nack
is not sent.- The library automatically opens another channel for subsequent operations.
可以使用ShutdownListener
在Java中实现此行为:
This behaviour can be implemented in Java using ShutdownListener
:
package wheleph.rabbitmq_tutorial.confirmed_publishes;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConfirmedPublisher {
private static final Logger logger = LoggerFactory.getLogger(ConfirmedPublisher.class);
private final static String EXCHANGE_NAME = "confirmed.publishes";
// Beware that proper synchronization of channel is needed because current approach may lead to race conditions
private volatile static Channel channel;
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
final Connection connection = connectionFactory.newConnection();
for (int i = 0; i < 100; i++) {
if (channel == null) {
createChannel(connection);
}
String message = "Hello world" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
logger.info(" [x] Sent '" + message + "'");
Thread.sleep(2000);
}
channel.close();
connection.close();
}
private static void createChannel(final Connection connection) throws IOException {
channel = connection.createChannel();
channel.confirmSelect(); // This in fact is not necessary
channel.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
// Beware that proper synchronization is needed here
logger.debug("Handling channel shutdown...", cause);
if (cause.isInitiatedByApplication()) {
logger.debug("Shutdown is initiated by application. Ignoring it.");
} else {
logger.error("Shutdown is NOT initiated by application. Resetting the channel.");
/* We cannot re-initialize channel here directly because ShutdownListener callbacks run in the connection's thread,
so the call to createChannel causes a deadlock since it blocks waiting for a response (whilst the connection's thread
is stuck executing the listener). */
channel = null;
}
}
});
}
}
需要注意的地方很少
- 在这种情况下,
- 发布者无需确认,因为我们不使用
ConfirmListener
或该方法专用的任何其他功能.但是,发布者确认,如果我们想跟踪哪些消息已成功发送,哪些消息未成功发送,将很有用. - 如果我们启动
ConfirmedPublisher
并在一段时间后创建丢失的交换,则以下所有消息将被成功发布.但是,以前所有失败的消息都会丢失. - 需要其他同步.
- Publisher confirms are not necessary in this case because we don't use
ConfirmListener
or any other functionality specific to that approach. However publisher confirms would be useful if we wanted to track which messages were successfully send and which not. - If we launch
ConfirmedPublisher
and after some time create the missing exchange, all following messages will be successfully published. However all the previous failed messages are lost. - Additional synchronization is needed.
这篇关于缺少交换时不会调用ConfirmListener.handleNack的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!