Problem description
I want to write data from Twitter into Kafka. For educational purposes, I try to do this using Structured Streaming. I have created a Twitter-Source, based on the socket-Source, which works well.
I set up the source as follows:
val tweets = spark
.readStream
.format("twitter")
.option("query", terms)
.load()
.as[SparkTweet]
This gives me a nice DataSet for analytical queries. Great!
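For context, the `.as[SparkTweet]` cast above assumes a case class matching the schema the custom Twitter source emits. The question does not show it, so this is only a hypothetical sketch; the actual field names and types depend on your source implementation:

```scala
// Hypothetical shape of the SparkTweet case class used in the question;
// the real fields depend on what the custom Twitter source produces.
case class SparkTweet(
  id: Long,        // tweet id
  text: String,    // tweet body
  user: String,    // author's screen name
  createdAt: Long  // creation time as epoch millis
)
```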
Next I want to persist each tweet in the slightly sparkified schema into Kafka:
val kafkaOutStream = tweets
.toJSON.as("value")
.writeStream
.queryName("stream_to_kafka")
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 second"))
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic","tweets")
.start
That's easy! Except, it doesn't work. In QueryExecution.scala the call passes into assertSupported and eventually gets thrown out, because:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
I didn't expect toJSON to be a pure batch op, but without it, using say select($"text" as "value") instead, the code will work.
Now, I'm slightly flabbergasted and would love for someone to explain why toJSON isn't streaming-compatible (is it a bug? a missing feature?), and to tell me whether there's a Structured Streaming way of getting a serialized representation of my object into Kafka.
Answer
It is a bit verbose, but the to_json function should do the trick:
import org.apache.spark.sql.functions.{to_json, struct, col}
tweets.select(to_json(struct(tweets.columns map col: _*)).alias("value"))
.writeStream
...
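Spelled out in full, the write side might look like the sketch below. It reuses the broker, topic, and trigger settings from the question; the checkpoint path is an example value I've added, since the Kafka sink requires a checkpointLocation option to be set:

```scala
import org.apache.spark.sql.functions.{to_json, struct, col}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Pack every column of the row into a struct, serialize it to a JSON
// string column named "value" (the column the Kafka sink reads), and start.
val kafkaOutStream = tweets
  .select(to_json(struct(tweets.columns map col: _*)).alias("value"))
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "tweets")
  // Required by the Kafka sink; the path here is an example placeholder.
  .option("checkpointLocation", "/tmp/stream_to_kafka_checkpoint")
  .start()
```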
The problem with toJSON seems to be this conversion to an RDD:
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
...
and (as pointed out by maasg in the comments) seems to be already resolved in the development version.