我正在使用Spark的结构化流(2.2.1),使用Kafka每60秒从传感器接收一次数据。我很难解决如何打包此Kafka Data以便能够正确处理的问题。
当数据随Kafka一起提供时,我需要能够进行一些计算。
我的问题是将来自Kafka的JSON数据解压缩到我可以使用的数据集中
数据
简化的数据如下所示:
{
id: 1,
timestamp: "timestamp"
pump: {
current: 1.0,
flow: 20.0
torque: 5.0
},
reactors: [
{
id: 1,
status: 200,
},
{
id: 2,
status: 300,
}
],
settings: {
pumpTimer: 20.0,
reactorStatusTimer: 200.0
}
}
为了能够与Spark一起使用,我为其中的每一个创建了一些case类结构:
// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)
并使用以下方式生成架构:
val rawDataSchema = Encoders.product[RawData].schema
原始数据到Spark模式
首先,我将Kafka的“值”字段放入我的一般模式中:
val rawDataSet = df.select($"value" cast "string" as "json")
.select(from_json($"json", rawDataSchema))
.select("data.*").as[RawData]
使用此rawDataSet,我可以将每个单独的对象打包到数据集中。
val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
.select("pumpData.*").as[Pump]
val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
.select("settingsData.*").as[Settings]
这为我提供了每个JSON对象漂亮又干净的数据集。
处理数据
这是我的问题,例如,如果要比较或计算“设置”和“泵”的两个数据集之间的某些值,则JOIN无法使用结构化流工作。
val joinedData = pump.join(settings)
错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;
我的处理方法是否全部错误?还是对处理此问题的其他方法有任何建议?
谢谢
最佳答案
我将使用现在可以使用的解决方案回答我自己的问题
不用为JSON中的每个对象创建案例类,我可以将它们作为一个带有嵌套对象的案例类连接在一起,如下所示:
case class RawData(
id: String,
timestamp: String,
pump: Pump,
reactors: Array[Reactor],
settings: Settings
)
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)
为了使它成为可用的数据集,我可以简单地调用
val rawDataset = df.select($"value" cast "string" as "json")
.select(from_json($"json", Encoders.product[RawData].schema) as 'data)
.select("data.*").as[RawData]
.withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row in the dataset per reactor.
处理完JSON并将其放入我的定义模式后,我可以选择每个特定的传感器,如下所示:
val tester = rawDataset.select($"pump.current", $”settings.pumpTimer”)
谢谢user6910411为我指出正确的方向
关于apache-spark - 结构化流式处理并将嵌套数据拆分为多个数据集,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49597331/