本文介绍了JEE7 + WildFly(HornetQ) - 从应用程序暂停队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们使用WildFly + HornetQ作为我们的应用程序服务器和JMS消息队列,并且要求能够从应用程序暂停/恢复队列。这可能吗?

We are using WildFly + HornetQ as our application server and JMS message queue, and have the requirement to be able to pause/resume queues from the application. Is this possible?

推荐答案

这可以使用JMX或使用hornetq核心管理API来完成。

This can be done using JMX or using the hornetq core management api.

出于本示例的目的,使用了wildfly 8.1.0.Final运行standalone-full-ha配置文件。

For the purposes of this example, wildfly 8.1.0.Final was used running the standalone-full-ha profile.

必需的Maven依赖项:

Required Maven Dependencies:

    <dependency>
        <groupId>org.hornetq</groupId>
        <artifactId>hornetq-jms-client</artifactId>
        <version>2.4.1.Final</version>
    </dependency>

    <dependency>
        <groupId>org.wildfly</groupId>
        <artifactId>wildfly-jmx</artifactId>
        <version>8.1.0.Final</version>
    </dependency>

这是一个测试类,演示如何通过JMX使用JmsQueueControl:

Here is a test class demonstrating the use of JmsQueueControl via JMX:

package test.jmx.hornetq;

import org.hornetq.api.jms.management.JMSQueueControl;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class WildflyJmsControl {

    public static void main(String[] args) {
        try {
            //Get a connection to the WildFly 8 MBean server on localhost
            String host = "localhost";
            int port = 9990;  // management-web port
            String urlString = System.getProperty("jmx.service.url","service:jmx:http-remoting-jmx://" + host + ":" + port);
            JMXServiceURL serviceURL = new JMXServiceURL(urlString);
            JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL, null);
            MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();

            String queueName = "testQueue"; // use your queue name here

            String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName;
            ObjectName objectName = ObjectName.getInstance(mbeanObjectName);

            JMSQueueControl jmsQueueControl = (JMSQueueControl) MBeanServerInvocationHandler.newProxyInstance(connection, objectName, JMSQueueControl.class, false);
            assert jmsQueueControl != null;

            long msgCount = jmsQueueControl.countMessages(null);

            System.out.println(mbeanObjectName + " message count: " + msgCount);

            jmsQueueControl.pause();
            System.out.println("queue paused");

            jmsQueueControl.resume();
            System.out.println("queue resumed");

            jmxConnector.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通过JMS访问hornetq管理使用:

To access hornetq management via JMS use:

package test.jms.hornetq;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;

public class HornetqService {

    public void testPauseResumeQueue() {
        // this class needs to run in the same jvm as the wildfly server (i.e. not a remote jvm)
        try {
            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
                    InVMConnectorFactory.class.getName()));

            ClientSession session = locator.createSessionFactory().createSession();

            session.start();

            ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management");

            String queueName = "testQueue"; // use your queue name here

            // get queue message count
            ClientMessage message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "messageCount");

            ClientMessage reply = requester.request(message);
            int count = (Integer) ManagementHelper.getResult(reply);
            System.out.println("There are " + count + " messages in exampleQueue");

            // pause the queue
            message = session.createMessage(false);
            ManagementHelper.putOperationInvocation(message, queueName, "pause");

            requester.request(message);

            // get queue paused
            message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "paused");
            reply = requester.request(message);
            Object result =  ManagementHelper.getResult(reply);
            System.out.println("result: " + result.getClass().getName() + " : " + result.toString());

            // resume queue
            message = session.createMessage(false);
            ManagementHelper.putOperationInvocation(message, queueName, "resume");
            requester.request(message);

            // get queue paused
            message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "paused");
            reply = requester.request(message);
            Object result2 =  ManagementHelper.getResult(reply);
            System.out.println("result2: " + result2.getClass().getName() + " : " + result2.toString());

            requester.close();

            session.close();
        }catch (Exception e){
            System.out.println("Error pausing queue" + e.getMessage());
        }
    }
}

这篇关于JEE7 + WildFly(HornetQ) - 从应用程序暂停队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-15 23:09