问题描述
我在项目中使用RabbitMQ.
I am using a RabbitMQ in my project.
我的使用者中有RabbitMQ客户端部分的代码,该连接需要tls1.1才能与真正的MQ连接.
I have in my consumer the code of the client part of rabbitMQ and the connection need a tls1.1 to connect with the real MQ.
我想在我的JUnit测试中测试此代码,并模拟将消息传递给我的使用者.
I want to test this code in my JUnit test and to mock the message delivery to my consumer.
我在Google中看到了几个使用不同工具的示例,它们是如何使用骆驼Rabbit或activeMQ的,但是此工具可用于amqp 1.0,而rabbitMQ仅可用于amqp 0.9.
I see in google several examples with different tools how camel rabbit or activeMQ but this tools works with amqp 1.0 and rabbitMQ only works in amqp 0.9 .
有人遇到了这个问题吗?
Someone had this problem?
谢谢!
更新
这是用于测试从队列接收json的代码.
This is the code to testing to receive a json from the queue.
package com.foo.foo.queue;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import javax.net.ssl.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;
import com.foo.foo.Constants.Constants;
import com.foo.foo.core.ConfigurationContainer;
import com.foo.foo.policyfinders.PolicyFinder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class BrokerThreadHLConsumer extends Thread {
private static BrokerThreadHLConsumer instance;
private static final Logger log = LogManager.getLogger(BrokerThreadHLConsumer.class);
private Channel channel;
private String queueName;
private PolicyFinder PolicyFinder;
private Connection connection;
private QueueingConsumer consumer;
private boolean loop;
private BrokerThreadHLConsumer() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
char[] keyPassphrase = "clientrabbit".toCharArray();
KeyStore keyStoreCacerts;
ConfigurationContainer configurationContainer = ConfigurationContainer.getInstance();
String exchangeName = configurationContainer.getProperty(Constants.EXCHANGE_NAME);
String rabbitHost = configurationContainer.getProperty(Constants.RABBITMQ_SERVER_HOST_VALUE);
try {
/* Public key cacerts to connect to message queue*/
keyStoreCacerts = KeyStore.getInstance("PKCS12");
URL resourcePublicKey = this.getClass().getClassLoader().getResource("certs/client.keycert.p12");
File filePublicKey = new File(resourcePublicKey.toURI());
keyStoreCacerts.load(new FileInputStream(filePublicKey), keyPassphrase);
KeyManagerFactory keyManager;
keyManager = KeyManagerFactory.getInstance("SunX509");
keyManager.init(keyStoreCacerts, keyPassphrase);
char[] trustPassphrase = "changeit".toCharArray();
KeyStore tks;
tks = KeyStore.getInstance("JCEKS");
URL resourceCacerts = this.getClass().getClassLoader().getResource("certs/cacerts");
File fileCacerts = new File(resourceCacerts.toURI());
tks.load(new FileInputStream(fileCacerts), trustPassphrase);
TrustManagerFactory tmf;
tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(tks);
SSLContext c = SSLContext.getInstance("TLSv1.1");
c.init(keyManager.getKeyManagers(), tmf.getTrustManagers(), null);
factory.setUri(rabbitHost);
factory.useSslProtocol(c);
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "fanout");
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
} catch (KeyManagementException e1) {
e1.printStackTrace();
} catch (Exception e) {
log.error("Couldn't instantiate a channel with the broker installed in " + rabbitHost);
log.error(e.getStackTrace());
e.printStackTrace();
}
}
public static BrokerThreadHLConsumer getInstance() throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
if (instance == null)
instance = new BrokerThreadHLConsumer();
return instance;
}
public void run() {
if (PolicyFinder != null) {
try {
consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
log.info("Consumer broker started and waiting for messages");
loop = true;
while (loop) {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
JSONObject obj = new JSONObject(message);
log.info("Message received from broker " + obj);
if (StringUtils.isNotEmpty(message) && !PolicyFinder.managePolicySet(obj)) {
log.error("PolicySet error: error upgrading the policySet");
}
} catch (Exception e) {
log.error("Receiving message error");
log.error(e);
}
}
} catch (IOException e) {
log.error("Consumer couldn't start");
log.error(e.getStackTrace());
}
} else {
log.error("Consumer couldn't start cause of PolicyFinder is null");
}
}
public void close() {
loop = false;
try {
consumer.getChannel().basicCancel(consumer.getConsumerTag());
} catch (IOException e) {
e.printStackTrace();
}
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void setLuxPolicyFinder(PolicyFinder PolicyFinder) {
this.PolicyFinder = PolicyFinder;
}
}
推荐答案
据我所知,有两个要尝试测试的问题:
As I understand it, there are two things trying to be tested in the question:
- 用于连接RabbitMQ的TLS配置
- basicPublish/basicConsume(称为 delivery )与其他应用程序交互的行为
- TLS configuration to connect to RabbitMQ
- basicPublish / basicConsume (what's called delivery) behavior regarding interactions with the rest of the application
对于第一个,由于正在测试TLS本身,只有连接到配置了正确的信任库的RabbitMQ的真实实例,才能证明配置可以正常工作
For the first one, as TLS itself is being tested, only connecting to a real instance of RabbitMQ with correct truststore configured will prove that configuration is working
但是,对于第二个示例,对于展示该应用程序功能的测试(使用诸如Cucumber之类的工具以提高可读性),您可以尝试使用我正在开发的库: rabbitmq-mock (这就是为什么我要挖一个旧帖子)
For the second one however, for tests demonstrating features of the app (with tools like Cucumber for readability), you may try a library i'm working on: rabbitmq-mock (and that's why I'm digging up an old post)
只需将其作为依赖项包括即可:
Just include it as dependency:
<dependency>
<groupId>com.github.fridujo</groupId>
<artifactId>rabbitmq-mock</artifactId>
<version>1.0.14</version>
<scope>test</scope>
</dependency>
并在单元测试中将new ConnectionFactory()
替换为new MockConnectionFactory()
.
And replace new ConnectionFactory()
by new MockConnectionFactory()
in your unit test.
这篇关于我需要在单元测试中模拟RabbitMQ的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!