本文介绍了如何解决DataSet.toJSON与结构化流不兼容的问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想将Twitter的数据写入Kafka.出于教育目的,我尝试使用结构化流来做到这一点.我已经基于socket-Source创建了一个Twitter-Source,效果很好.

I want to write data from Twitter into Kafka. For educational purposes, I try to do this using Structured Streaming. I have created a Twitter-Source, based on the socket-Source, which works well.

我将源设置如下:

val tweets = spark
  .readStream
  .format("twitter")
  .option("query", terms)
  .load()
  .as[SparkTweet]

这为我提供了一个很好的用于分析查询的数据集.太好了!

This gives me a nice DataSet for analytical queries. Great!

接下来,我想将稍有火花的模式中的每个推文保留到Kafka中:

Next I want to persist each tweet in the slightly sparkified schema into Kafka:

val kafkaOutStream = tweets
  .toJSON.as("value")
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic","tweets")
  .start

那很容易!除此以外,它不起作用.在QueryExecution.scala中,呼叫进入assertSupported并最终被抛出,因为

That's easy! Except, it doesn't work. In QueryExecution.scala the call passes into assertSupported and eventually gets thrown out, because

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    Queries with streaming sources must be executed with writeStream.start();;

我没想到toJSON是一个纯批处理操作,但是没有它,而是使用说select($"text" as "value")的代码可以工作.

I didn't expect toJSON to be a pure batch-op, but without it, and using say select($"text" as "value") instead, the code will work.

现在,我有点不知所措,希望有人解释为什么toJSON不应该与流兼容(这是一个错误吗?缺少一项功能吗?),并告诉是否有一种结构化的流式获取方法.将我的对象序列化表示为Kafka.

Now, I'm slightly flabbergasted and would love for someone to explain why toJSON shouldn't be streaming-compatible (is it a bug? a missing feature?), and tell whether there's a Structured Streaming-way of getting a serialized representation of my object into Kafka.

推荐答案

虽然有点冗长,但是to_json函数应该可以解决问题:

It is a bit verbose but to_json function should do the the trick:

import org.apache.spark.sql.functions.{to_json, struct, col}

tweets.select(to_json(struct(df.columns map col: _*)).alias("value"))
  .writeStream
  ...

toJSON的问题似乎是此转换为RDD :

val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
  ...

和(如)似乎已在开发版本中解决.

and (as pointed out by maasg in the comments) seems to be already resolved in the development version.

这篇关于如何解决DataSet.toJSON与结构化流不兼容的问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-27 18:10