How to send/receive messages from Azure Service Bus with Qpid JMS (qpid-jms-client-0.11.1.jar)?

Question

I am currently researching how to connect to Azure Service Bus using Qpid JMS (qpid-jms-client-0.11.1.jar).

I have created a demo Java application, SimpleSenderReceiver, which connects to an already configured Azure Service Bus using the following guide (#link1). This code seems to work with a "very" old version of the Qpid JMS client (version 0.32). I am now trying to get it to work with the latest stable version of Qpid JMS (qpid-jms-client-0.11.1.jar), and so far I have not been successful. Going through the Qpid JMS 0.11.1 documentation (#link2), you can see that the connectionfactory property in the properties file is specified differently from version 0.32.

  • How can I set up a correct AMQP connection string in the properties file?
  • How can I set up the Qpid JMS - Azure Service Bus demo to work with the latest stable Qpid version?

I keep running into the following problem:

731 [AmqpProvider:(1):[amqps://example-bus.servicebus.windows.net?transport.connectTimeout=60000]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
javax.jms.JMSException: Idle timeout value specified in connection OPEN ('30000 ms') is not supported. Minimum idle timeout is '60000' ms. TrackingId:238849ced1em4cd3a093261372f4fc1e_G21, SystemTracker:gateway6, Timestamp:10/27/2016 8:16:23 AM [condition = amqp:internal-error]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:150)
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.remotelyClosed(AmqpAbstractResource.java:147)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:251)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:771)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I have the following properties file, servicebus.properties:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]

connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSsuiLI%3D&transport.connectTimeout=6000

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]

queue.myQueueLookup = queue1

I have the following class, SimpleSenderReceiver.java:

package com.demo.AzureTest;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;

public class SimpleSenderReceiver implements MessageListener {

    private static boolean runReceiver = false;
    private Connection connection;
    private Session sendSession;
    private Session receiveSession;
    private MessageProducer sender;
    private MessageConsumer receiver;
    private static Random randomGenerator = new Random();

    public SimpleSenderReceiver() throws Exception {
        // Configure JNDI environment
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, 
                   "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put(Context.PROVIDER_URL, "C://PATH//servicebus.properties");
        Context context = new InitialContext(env);

        // Look up ConnectionFactory and Queue
        ConnectionFactory cf = (ConnectionFactory) context.lookup("myFactoryLookup");
        System.out.println("lookup: " + context.lookup("myFactoryLookup"));
        System.out.println("cf:"+cf);
        Destination queue = (Destination) context.lookup("myQueueLookup");

        System.out.println("queue:" + queue);

        // Create Connection
        connection = cf.createConnection();
        System.out.println("connection :"+connection);

        // Create sender-side Session and MessageProducer
        sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        System.out.println("Session open.");

        sender = sendSession.createProducer(queue);
        System.out.println(sender.getDestination());
        System.out.println("sender:"+sender);

        if (runReceiver) {
            // Create receiver-side Session, MessageConsumer,and MessageListener
            receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            receiver = receiveSession.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
        }
    }

    public static void main(String[] args) {
        try {

            if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
                runReceiver = false;
            }

            SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
            System.out.println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
            BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));

            while (true) {
                String s = commandLine.readLine();
                if (s.equalsIgnoreCase("exit")) {
                    simpleSenderReceiver.close();
                    System.exit(0);
                } else {
                    simpleSenderReceiver.sendMessage();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendMessage() throws JMSException {
        TextMessage message = sendSession.createTextMessage();
        message.setText("Hello from SIS Test AMQP message from Java JMSaaa");
        long randomMessageID = randomGenerator.nextLong() >>>1;
        message.setStringProperty("TenantId", "klant");
        message.setStringProperty("EventType", "bericht");
        message.setStringProperty("EventTypeVersion", "1.0");
        message.setStringProperty("MessageType", "DocumentMessage");
        message.setStringProperty("OperationType", "Create");
        message.setStringProperty("SourceSystem", "sis_sender");
        message.setStringProperty("EnterpriseKey", "sis_sender-klant-bericht");
        message.setJMSMessageID("ID:" + randomMessageID);
        sender.send(message);
        System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());
        System.out.println("Sent message with Text = " + message.getText());
    }

    public void close() throws JMSException {
        connection.close();
    }

    public void onMessage(Message message) {
        try {
            System.out.println("Received message with JMSMessageID = " + message.getJMSMessageID());
            TextMessage txtmessage = (TextMessage) message;
            System.out.println("Received message with Text = " + txtmessage.getText());
            message.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}  

Maven dependencies:

    <dependencies>
        <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-jms-client</artifactId>
          <version>0.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.2</version>
        </dependency>
    </dependencies>

--- Update ---

I have since gotten a little further but am still a bit stuck. Here is the updated connectionfactory property:

connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?amqp.idleTimeout=150000&jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSkuiLI%3D

I am now getting the following stack trace:

842 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
1014 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:543efe98-3ecc-485e-9f7f-3046c40db0cb:1 connected to remote Broker: amqps://example-open-bus-bus.servicebus.windows.net
1301 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] WARN org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder - Open of resource:(JmsProducerInfo { ID:546efe78-3ecc-485d-9f6f-3065c40db1ce:1:1:1, destination = klant }) failed: Attempted to perform an unauthorized operation. TrackingId:2950b1ed7a0d4e0a97b0k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
Caught exception, exiting.
javax.jms.JMSSecurityException: Attempted to perform an unauthorized operation. TrackingId:2890b0ed9a0d4e0a97b1k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:129)
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Recommended answer

The newer client enables AMQP heartbeating/idle-timeout by default, while the older client did not. The client sets a default 60-second timeout, which in turn means it requests a 30-second (30000 ms) idle-timeout value in its AMQP Open frame when connecting to the server, in accordance with the behaviour defined by the specification (where peers advertise half their actual timeout to help avoid spurious timeouts).

ServiceBus is refusing the 30000 ms Open frame value and indicating that it needs a value of at least 60000 ms (or presumably also 0, which means it is disabled). To achieve this you will need to configure the client to have its timeout set to at least 120000 ms, which will result in the minimum 60000 ms Open frame idle-timeout value that ServiceBus is mandating (or again, perhaps disable the client's timeout handling by setting it to 0).

You can do this with the amqp.idleTimeout URI option, as described at http://qpid.apache.org/releases/qpid-jms-0.11.1/docs/index.html#amqp-configuration-options
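
The timeout arithmetic described above can be sketched in plain Java. This is only an illustration, not Qpid code: the class and method names are hypothetical, the halving rule and the 60000 ms minimum are taken from the answer and the error message, treating 0 as "disabled and acceptable" is the answer's own presumption, and the host name is the placeholder from the question.

```java
class IdleTimeoutCheck {
    // Minimum Open-frame idle timeout quoted in the Service Bus error message.
    static final long SERVICE_BUS_MIN_MS = 60000L;

    // Per the answer, the client advertises half of its configured timeout.
    static long advertisedIdleTimeout(long configuredMs) {
        return configuredMs / 2;
    }

    // Assumption: 0 means "disabled" and is accepted (the answer only presumes this).
    static boolean acceptable(long configuredMs) {
        return configuredMs == 0
                || advertisedIdleTimeout(configuredMs) >= SERVICE_BUS_MIN_MS;
    }

    // Appends the amqp.idleTimeout option to a connection URI.
    static String withIdleTimeout(String baseUri, long configuredMs) {
        String separator = baseUri.contains("?") ? "&" : "?";
        return baseUri + separator + "amqp.idleTimeout=" + configuredMs;
    }

    public static void main(String[] args) {
        System.out.println(advertisedIdleTimeout(60000)); // client default: 30000
        System.out.println(acceptable(60000));            // false
        System.out.println(acceptable(120000));           // true
        System.out.println(withIdleTimeout(
                "amqps://example-open-bus.servicebus.windows.net", 120000));
    }
}
```

With the client default of 60000 ms the advertised value is 30000 ms, which matches the value rejected in the first stack trace; 120000 ms is the smallest configured value that advertises the required 60000 ms.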

I see you figured that out at the same time I was typing my answer.

The new exception is from ServiceBus, saying you aren't authorized to do something you are trying to do. It should be easy enough to catch the exception at its source and determine which operation that is.
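
One way to narrow this down, sketched here with a hypothetical helper rather than anything from the Qpid or JMS APIs, is to wrap each setup call in a named step so that the rethrown exception says which operation failed. The StepTracer class, the step names, and the simulated failure are all assumptions made for illustration.

```java
import java.util.concurrent.Callable;

class StepTracer {
    // Runs one named setup step; on failure, rethrows with the step name
    // prepended and the original exception preserved as the cause.
    static <T> T step(String name, Callable<T> action) throws Exception {
        try {
            return action.call();
        } catch (Exception e) {
            throw new Exception("Step '" + name + "' failed: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        try {
            // Stand-ins for calls such as createSession or createProducer.
            String session = step("createSession", () -> "session-1");
            System.out.println("ok: " + session);
            step("createProducer", () -> {
                throw new IllegalStateException("unauthorized (simulated)");
            });
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the question's constructor this would look something like sender = step("createProducer", () -> sendSession.createProducer(queue)); with the same treatment for the session and consumer calls. The policy's actual rights still have to be checked on the Service Bus side.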

Your URI seems fine (though I assume your username isn't actually 'somePolicy', and that the doubled connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = at the start is a copy-and-paste error). I haven't used the client with ServiceBus personally, but I've seen questions from various folks who have, so I'm not aware of any particular issue that outright stops them working together.
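
For reference, the password in the question's URI is percent-encoded (%2F, %2B, %3D). A minimal sketch of producing such a URI with only the JDK follows. The class and method names are hypothetical, the key is a made-up value, and URLEncoder is a form encoder (it turns spaces into '+'), so it is assumed here that the values are policy names and Base64 keys without spaces.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ServiceBusUriSketch {
    // Percent-encodes a query value; assumes it contains no spaces.
    static String encode(String value) throws UnsupportedEncodingException {
        return URLEncoder.encode(value, "UTF-8");
    }

    // Builds a connection URI in the same shape as the one in the question.
    static String buildUri(String host, String policy, String key, long idleTimeoutMs)
            throws UnsupportedEncodingException {
        return "amqps://" + host
                + "?amqp.idleTimeout=" + idleTimeoutMs
                + "&jms.username=" + encode(policy)
                + "&jms.password=" + encode(key);
    }

    public static void main(String[] args) throws Exception {
        // "abc/def+ghi=" is a made-up key containing the characters that need encoding.
        System.out.println(buildUri(
                "example-open-bus.servicebus.windows.net",
                "somePolicy", "abc/def+ghi=", 120000));
    }
}
```

The resulting string is what would go on the right-hand side of connectionfactory.myFactoryLookup in servicebus.properties.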

