本文介绍了使用MQTTutils的Spark流媒体通过身份验证从activemq订阅主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

看来MQTTUtils仅提供三种方法,def createStream(jssc:JavaStreamingContext,brokerUrl:字符串,主题:字符串,storageLevel:存储级别):JavaDStream [String]

It seems MQTTUtils Only provide three methods,def createStream(jssc: JavaStreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel): JavaDStream[String]

创建一个输入流,以接收MQTT发布者推送的消息.def createStream(jssc:JavaStreamingContext,brokerUrl:字符串,主题:字符串):JavaDStream [String]

Create an input stream that receives messages pushed by a MQTT publisher.def createStream(jssc: JavaStreamingContext, brokerUrl: String, topic: String): JavaDStream[String]

创建一个输入流,以接收MQTT发布者推送的消息.def createStream(ssc:StreamingContext,brokerUrl:字符串,主题:字符串,storageLevel:StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2):DStream [String]

Create an input stream that receives messages pushed by a MQTT publisher.def createStream(ssc: StreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[String]

创建一个输入流,以接收MQTT发布者推送的消息.

Create an input stream that receives messages pushed by a MQTT publisher.

但是如果代理启用了身份验证,如何提供用户名和密码?

But How can I provide username and password if the broker enabled authentication?

推荐答案

您可以尝试使用此处提供的自定义spark-streaming-mqtt-connector库- https://github.com/sathipal/spark-streaming-mqtt-with-security_2.10-1.3.0

You can trying using the customized spark-streaming-mqtt-connector library available here - https://github.com/sathipal/spark-streaming-mqtt-with-security_2.10-1.3.0.

此库在原始库的顶部添加了以下内容,

This library adds the following on top of the original library,

  • 添加了TLS v1.2安全性,以确保始终保持通信安全.
  • 存储的主题以及RDD中的有效负载.

因此,请使用以下方法创建流,

So, use the following method to create the stream,

val lines = MQTTUtils.createStream(ssc, // Spark Streaming Context
            "ssl://URL",                // Broker URL
            "<topic>",                 // MQTT topic
            "MQTT client-ID",          // Unique ID of the application
            "Username", 
            "passowrd")

有些重载的构造函数使您也可以传递RDD存储级别.希望这会有所帮助.

There are overloaded constructors that allows you to pass the RDD storage level as well. Hope this helps.

这篇关于使用MQTTutils的Spark流媒体通过身份验证从activemq订阅主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 17:34