如何在结构化流中获取DataFrame

如何在结构化流中获取DataFrame

本文介绍了如何在结构化流中获取DataFrame?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想从MQTT接收JSON字符串并将其解析为DataFrames df .我该怎么办?

I want to receive JSON strings from MQTT and parse them to DataFrames df. How can I do it?

这是我发送到MQTT队列以便在Spark中处理的Json消息的示例:

This is an example of Json message that I send to MQTT queue in order to process in Spark:

{
"id": 1,
"timestamp": 1532609003,
"distances": [2,5,7,8]
}

这是我的代码:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .getOrCreate()

# Custom Structured Streaming receiver
reader = spark\
             .readStream\
             .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\
             .option("topic","uwb/distances")\
             .option('brokerUrl', 'tcp://127.0.0.1:1883')\
             .load()\
             .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")


df = spark.read.json(reader.select("value").rdd)

# Start running the query that prints the running counts to the console
query = df \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

但是此代码失败:

py4j.protocol.Py4JJavaError: An error occurred while calling o45.javaToPython.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt

我尝试如下添加 start :

df = spark.read.json(reader.select("value").rdd) \
    .writeStream \
    .format('console') \
    .start()

但是出现了同样的错误.我的目标是获得一个DataFrame df ,我可以将其进一步传递给ETL流程.

But got the same error. My goal is to get a DataFrame df that I can further pass through ETL processes.

更新:

标记为答案的主题并没有帮助我解决问题.首先,当我使用PySpark时,它为Scala提供了解决方案.其次,我测试了答案中提出的解决方案,并返回了空列 json :

The thread marked as an answer has not helped me solving the problem. First of all it gives the solution for Scala, while I am using PySpark.Secondly, I tested the solution proposed in the answer and it returned me the empty column json:

reader = spark\
             .readStream\
             .schema(spark.read.json("mqtt_schema.json").schema) \
             .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\
             .option("topic","uwb/distances")\
             .option('brokerUrl', 'tcp://127.0.0.1:1883')\
             .load()\
             .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")

json_schema = spark.read.json("mqtt_schema.json").schema
df = reader.withColumn('json', from_json(col('value'), json_schema))

query = df \
    .writeStream \
    .format('console') \
    .start()

推荐答案

我想这是因为您的df没有流式传输. reader.select("value").writestream

I guess it is because your df is not streaming.How about try justreader.select("value").writestream

这篇关于如何在结构化流中获取DataFrame?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 10:17