Problem Description
I'm trying to connect Spark Structured Streaming 2.4.5 with Kafka, but every time I try, this Data Source Provider error appears. Here are my Scala code and my sbt build:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
object streaming_app_demo {
  def main(args: Array[String]): Unit = {
    println("Spark Structured Streaming with Kafka Demo Application Started ...")

    val KAFKA_TOPIC_NAME_CONS = "test"
    val KAFKA_OUTPUT_TOPIC_NAME_CONS = "test"
    val KAFKA_BOOTSTRAP_SERVERS_CONS = "localhost:9092"

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Spark Structured Streaming with Kafka Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .load()

    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test2")
      .start()
  }
}
The error is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at streaming_app_demo$.main(teste.scala:29)
at streaming_app_demo.main(teste.scala)
And my build.sbt is:
name := "scala_212"
version := "0.1"
scalaVersion := "2.12.11"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" % "provided"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0"
Thanks!
Recommended Answer
For Spark Structured Streaming + Kafka, the spark-sql-kafka-0-10 library is required.
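If you run the application from an IDE or with sbt run rather than through spark-submit, the "provided" scope in the question's build keeps the connector off the runtime classpath. A minimal sketch of the relevant dependency lines, assuming Spark 2.4.5 and Scala 2.12 as in the question:

// build.sbt (sketch): keep the Kafka connector on the runtime classpath for local runs
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" // no "provided" scope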
You are getting this org.apache.spark.sql.AnalysisException: Failed to find data source: kafka exception because the spark-sql-kafka library is not available in your classpath, so Spark is unable to find org.apache.spark.sql.sources.DataSourceRegister inside the META-INF/services folder.
DataSourceRegister path:
/org/apache/spark/spark-sql-kafka-0-10_2.11/2.2.0/spark-sql-kafka-0-10_2.11-2.2.0.jar!/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
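Alternatively, when deploying with spark-submit (as the exception message's reference to the "Structured Streaming + Kafka Integration Guide" suggests), the connector can be pulled in at launch time with --packages. A sketch, assuming Spark 2.4.5, Scala 2.12, and the jar name that sbt package would produce for the build in the question:

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 \
  --class streaming_app_demo \
  target/scala-2.12/scala_212_2.12-0.1.jar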
Update
If you are using SBT with sbt-assembly, try adding the code block below. This will include the org.apache.spark.sql.sources.DataSourceRegister file in your final jar.
// META-INF discarding
assemblyMergeStrategy in assembly := {
case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
case PathList("META-INF",xs @ _*) => MergeStrategy.discard
case "application.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
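This merge strategy only takes effect when the sbt-assembly plugin is enabled. A minimal sketch of project/plugins.sbt (the plugin version here is just an example, not from the original answer):

// project/plugins.sbt (sketch): enable sbt-assembly so the merge strategy above applies
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Building with sbt assembly then produces a fat jar whose META-INF/services entries are merged rather than discarded, so the kafka data source can register correctly.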