This post looks at how to include the Kafka record timestamp when converting a Spark Streaming RDD to a DataFrame; hopefully it is a useful reference for anyone facing the same problem.
Problem Description
I am doing Spark Streaming over Kafka and want to convert the RDDs I get from Kafka into a DataFrame. I am using the following approach:

val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(4))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "dofff2.dl.uk.feefr.com:8002",
  "security.protocol" -> "SASL_PLAINTEXT",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "1",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("csv")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
val strmk = stream.map(record => record.value)
val rdd1 = strmk.map(line => line.split(','))
  .map(s => (s(0), s(1), s(2), s(3), s(4), s(5), s(6), s(7)))
rdd1.foreachRDD { (rdd, time) =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  val requestsDataFrame = rdd.map(w => Record(w._1, w._2, w._3, w._4, w._5, w._6, w._7, w._8)).toDF()
  requestsDataFrame.createOrReplaceTempView("requests")
  val word_df = sqlContext.sql("select * from requests")
  println(s"========= $time =========")
  word_df.show()
}
But I also want to include the timestamp from Kafka in the DataFrame. Can anyone tell me how to do that?
Recommended Answer
A Kafka record carries more than just its value.

See https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html — each row read from the Kafka source has the columns key, value, topic, partition, offset, timestamp and timestampType.

Note that for Kafka there is both a streaming and a batch approach.
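For the batch flavour, the same source options work with spark.read instead of readStream; a minimal sketch, assuming a local broker and the same topic AAA as in the streaming example below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").appName("kafka-batch").getOrCreate()

// Bounded read: startingOffsets/endingOffsets delimit the batch.
val batchDf = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

batchDf.show(false)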
Streaming example:
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()
import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")

sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .option("startingOffsets", "earliest")
  .load()
  // cast the timestamp to STRING as well if that is preferred:
  // .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS STRING)")
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Append())
  .start()
  .awaitTermination()
My sample output looks like this:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+-----------------------+
|key |value|timestamp |
+----+-----+-----------------------+
|null|RRR |2019-02-07 04:37:34.983|
|null|HHH |2019-02-07 04:37:36.802|
|null|JJJ |2019-02-07 04:37:39.1 |
+----+-----+-----------------------+
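Since the question's payload is comma-separated, the value column can also be split into its eight fields while keeping the Kafka timestamp. A sketch reusing the sparkSession defined above; the topic name csv and the column names c0..c7 are assumptions:

import org.apache.spark.sql.functions.{col, split}

val csvWithTs = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "csv")
  .load()
  // split() takes a regex pattern; "," separates the eight CSV fields
  .select(col("timestamp"), split(col("value").cast("string"), ",").as("fields"))
  .select(col("timestamp") +: (0 until 8).map(i => col("fields").getItem(i).as(s"c$i")): _*)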
If you want to stay with the DStream code from the question, you only need to expand the map:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
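Applied to the question's code, that could look roughly like the following. This is an untested sketch; the kafka_ts and c0..c7 column names are made up here:

import java.sql.Timestamp

// Keep the Kafka record timestamp (epoch millis) next to the CSV payload.
val withTs = stream.map(record => (record.timestamp(), record.value))
val rdd1 = withTs.map { case (ts, line) =>
  val s = line.split(',')
  (new Timestamp(ts), s(0), s(1), s(2), s(3), s(4), s(5), s(6), s(7))
}
rdd1.foreachRDD { (rdd, time) =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  // The 9-tuple becomes a DataFrame with the timestamp as its first column.
  val requestsDataFrame =
    rdd.toDF("kafka_ts", "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7")
  requestsDataFrame.createOrReplaceTempView("requests")
  println(s"========= $time =========")
  sqlContext.sql("select * from requests").show()
}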