I need some help on my first attempt at parsing JSON coming from Kafka into Spark Structured Streaming.
I am struggling to convert the incoming JSON into a flat dataframe for further processing.
My input JSON is:
[
  { "siteId": "30:47:47:BE:16:8F", "siteData":
    [
      { "dataseries": "trend-255", "values":
        [
          {"ts": 1502715600, "value": 35.74 },
          {"ts": 1502715660, "value": 35.65 },
          {"ts": 1502715720, "value": 35.58 },
          {"ts": 1502715780, "value": 35.55 }
        ]
      },
      { "dataseries": "trend-256", "values":
        [
          {"ts": 1502715840, "value": 18.45 },
          {"ts": 1502715900, "value": 18.35 },
          {"ts": 1502715960, "value": 18.32 }
        ]
      }
    ]
  },
  { "siteId": "30:47:47:BE:16:FF", "siteData":
    [
      { "dataseries": "trend-255", "values":
        [
          {"ts": 1502715600, "value": 35.74 },
          {"ts": 1502715660, "value": 35.65 },
          {"ts": 1502715720, "value": 35.58 },
          {"ts": 1502715780, "value": 35.55 }
        ]
      },
      { "dataseries": "trend-256", "values":
        [
          {"ts": 1502715840, "value": 18.45 },
          {"ts": 1502715900, "value": 18.35 },
          {"ts": 1502715960, "value": 18.32 }
        ]
      }
    ]
  }
]
The Spark schema is:
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

data1_spark_schema = ArrayType(
    StructType([
        StructField("siteId", StringType(), False),
        StructField("siteData", ArrayType(StructType([
            StructField("dataseries", StringType(), False),
            StructField("values", ArrayType(StructType([
                StructField("ts", IntegerType(), False),
                StructField("value", StringType(), False)
            ]), False), False)
        ]), False), False)
    ]), False
)
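To sanity-check the schema before wiring up Kafka, here is a minimal batch-mode sketch. It assumes data1_spark_schema from above is in scope; sample_json is a hypothetical, abbreviated version of the payload, and the appName is arbitrary:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("schema_check").getOrCreate()

# Hypothetical abbreviated test payload (one site, one dataseries, one value)
sample_json = '[{"siteId": "30:47:47:BE:16:8F", "siteData": [{"dataseries": "trend-255", "values": [{"ts": 1502715600, "value": "35.74"}]}]}]'

df = spark.createDataFrame([(sample_json,)], ["bms_data1"])
parsed = df.select(from_json(col("bms_data1"), data1_spark_schema).alias("bms_data1"))
parsed.printSchema()          # should mirror the nested structure above
parsed.show(truncate=False)   # a null bms_data1 means the payload did not match the schema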
My very simple code is:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from config.general import kafka_instance
from config.general import topic
from schemas.schema import data1_spark_schema

spark = SparkSession \
    .builder \
    .appName("Structured_BMS_Feed") \
    .getOrCreate()

stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_instance) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("max.poll.records", 100) \
    .option("failOnDataLoss", False) \
    .load()

stream_records = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as bms_data1") \
    .select(from_json("bms_data1", data1_spark_schema).alias("bms_data1"))

sites = stream_records.select(explode("bms_data1").alias("site")) \
    .select("site.*")

sites.printSchema()

stream_debug = sites.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("numRows", 20) \
    .option("truncate", False) \
    .start()

stream_debug.awaitTermination()
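One caveat worth noting here, as a hedged aside rather than part of the original job: from_json yields null for any record that does not match the schema, so an empty micro-batch on the console can simply mean malformed payloads. A one-line guard, assuming you want to keep only parseable records:

# Optional guard (an assumption, not in the original job): drop records from_json could not parse
valid_records = stream_records.filter(col("bms_data1").isNotNull())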
When I run this code, the schema prints like this:
root
|-- siteId: string (nullable = false)
|-- siteData: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- dataseries: string (nullable = false)
| | |-- values: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- ts: integer (nullable = false)
| | | | |-- value: string (nullable = false)
Is it possible to get this data into a flat dataframe, with all fields as columns instead of nested JSON? So for every ts and value it should give me one row together with its parent dataseries and siteId.
Answering my own question. I managed to flatten it using the following lines:
sites_flat = stream_records.select(explode("bms_data1").alias("site")) \
    .select("site.siteId", explode("site.siteData").alias("siteData")) \
    .select("siteId", "siteData.dataseries", explode("siteData.values").alias("values")) \
    .select("siteId", "dataseries", "values.*")