本文介绍了我需要在单元测试中模拟RabbitMQ的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在项目中使用RabbitMQ.

I am using a RabbitMQ in my project.

我的使用者中有RabbitMQ客户端部分的代码,该连接需要tls1.1才能与真正的MQ连接.

I have in my consumer the code of the client part of rabbitMQ and the connection need a tls1.1 to connect with the real MQ.

我想在我的JUnit测试中测试此代码,并模拟将消息传递给我的使用者.

I want to test this code in my JUnit test and to mock the message delivery to my consumer.

我在Google中看到了几个使用不同工具的示例,它们是如何使用骆驼Rabbit或activeMQ的,但是此工具可用于amqp 1.0,而rabbitMQ仅可用于amqp 0.9.

I see in google several examples with different tools how camel rabbit or activeMQ but this tools works with amqp 1.0 and rabbitMQ only works in amqp 0.9 .

有人遇到了这个问题吗?

Someone had this problem?

谢谢!

更新

这是用于测试从队列接收json的代码.

This is the code to testing to receive a json from the queue.

package com.foo.foo.queue;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import javax.net.ssl.*;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.foo.foo.Constants.Constants;
import com.foo.foo.core.ConfigurationContainer;
import com.foo.foo.policyfinders.PolicyFinder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class BrokerThreadHLConsumer extends Thread {

private static BrokerThreadHLConsumer instance;

private static final Logger log = LogManager.getLogger(BrokerThreadHLConsumer.class);

private Channel channel;
private String queueName;
private PolicyFinder PolicyFinder;
private Connection connection;
private QueueingConsumer consumer;

private boolean loop;

private BrokerThreadHLConsumer() throws IOException {
    ConnectionFactory factory = new ConnectionFactory();
    char[] keyPassphrase = "clientrabbit".toCharArray();
    KeyStore keyStoreCacerts;
    ConfigurationContainer configurationContainer = ConfigurationContainer.getInstance();
    String exchangeName = configurationContainer.getProperty(Constants.EXCHANGE_NAME);
    String rabbitHost = configurationContainer.getProperty(Constants.RABBITMQ_SERVER_HOST_VALUE);
    try {
        /* Public key cacerts to connect to message queue*/
        keyStoreCacerts = KeyStore.getInstance("PKCS12");
        URL resourcePublicKey = this.getClass().getClassLoader().getResource("certs/client.keycert.p12");
        File filePublicKey = new File(resourcePublicKey.toURI());
        keyStoreCacerts.load(new FileInputStream(filePublicKey), keyPassphrase);
        KeyManagerFactory keyManager;

        keyManager = KeyManagerFactory.getInstance("SunX509");
        keyManager.init(keyStoreCacerts, keyPassphrase);

        char[] trustPassphrase = "changeit".toCharArray();
        KeyStore tks;

        tks = KeyStore.getInstance("JCEKS");

        URL resourceCacerts = this.getClass().getClassLoader().getResource("certs/cacerts");
        File fileCacerts = new File(resourceCacerts.toURI());

        tks.load(new FileInputStream(fileCacerts), trustPassphrase);

        TrustManagerFactory tmf;
        tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(tks);

        SSLContext c = SSLContext.getInstance("TLSv1.1");
        c.init(keyManager.getKeyManagers(), tmf.getTrustManagers(), null);

        factory.setUri(rabbitHost);
        factory.useSslProtocol(c);
        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, "fanout");
        queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchangeName, "");

    } catch (NoSuchAlgorithmException e) {
        e.printStackTrace();
    } catch (CertificateException e) {
        e.printStackTrace();
    } catch (KeyStoreException e) {
        e.printStackTrace();
    } catch (UnrecoverableKeyException e) {
        e.printStackTrace();
    } catch (KeyManagementException e1) {
        e1.printStackTrace();
    } catch (Exception e) {
        log.error("Couldn't instantiate a channel with the broker installed in " + rabbitHost);
        log.error(e.getStackTrace());
        e.printStackTrace();
    }
}

public static BrokerThreadHLConsumer getInstance() throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
    if (instance == null)
        instance = new BrokerThreadHLConsumer();
    return instance;
}

public void run() {
    if (PolicyFinder != null) {
        try {
            consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
            log.info("Consumer broker started and waiting for messages");
            loop = true;
            while (loop) {
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    JSONObject obj = new JSONObject(message);
                    log.info("Message received from broker " + obj);
                    if (StringUtils.isNotEmpty(message) && !PolicyFinder.managePolicySet(obj)) {
                        log.error("PolicySet error: error upgrading the policySet");
                    }
                } catch (Exception e) {
                    log.error("Receiving message error");
                    log.error(e);
                }
            }
        } catch (IOException e) {
            log.error("Consumer couldn't start");
            log.error(e.getStackTrace());
        }
    } else {
        log.error("Consumer couldn't start cause of PolicyFinder is null");
    }
}

public void close() {
    loop = false;
    try {
        consumer.getChannel().basicCancel(consumer.getConsumerTag());
    } catch (IOException e) {
        e.printStackTrace();
    }
    try {
        channel.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    try {
        connection.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public void setLuxPolicyFinder(PolicyFinder PolicyFinder) {
    this.PolicyFinder = PolicyFinder;
}
}

推荐答案

据我所知,有两个要尝试测试的问题:

As I understand it, there are two things trying to be tested in the question:

  • 用于连接RabbitMQ的TLS配置
  • basicPublish/basicConsume(称为 delivery )与其他应用程序交互的行为
  • TLS configuration to connect to RabbitMQ
  • basicPublish / basicConsume (what's called delivery) behavior regarding interactions with the rest of the application

对于第一个,由于正在测试TLS本身,只有连接到配置了正确的信任库的RabbitMQ的真实实例,才能证明配置可以正常工作

For the first one, as TLS itself is being tested, only connecting to a real instance of RabbitMQ with correct truststore configured will prove that configuration is working

但是,对于第二个示例,对于展示该应用程序功能的测试(使用诸如Cucumber之类的工具以提高可读性),您可以尝试使用我正在开发的库: rabbitmq-mock (这就是为什么我要挖一个旧帖子)

For the second one however, for tests demonstrating features of the app (with tools like Cucumber for readability), you may try a library i'm working on: rabbitmq-mock (and that's why I'm digging up an old post)

只需将其作为依赖项包括即可:

Just include it as dependency:

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.14</version>
    <scope>test</scope>
</dependency>

并在单元测试中将new ConnectionFactory()替换为new MockConnectionFactory().

And replace new ConnectionFactory() by new MockConnectionFactory() in your unit test.

项目中提供了示例:"> https://github.com/fridujo/rabbitmq-mock/blob/master/src/test/java/com/github/fridujo/rabbitmq/mock/IntegrationTest.java

这篇关于我需要在单元测试中模拟RabbitMQ的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-15 14:33