使用elasticsearch-hadoop库,我想直接从ElasticSearch读取数据到Spark数据集。但是,该API返回RDD [(String,Map [String,Any])],其中元组的第一个元素是文档名称,第二个( map )是数据本身。
我想将其转换为Dataset [T],其中T是某种案例类,以使返回的数据更易于使用。我会考虑使用其他一些库(找不到任何库)或简洁的代码解决方案。
最佳答案
我写了一个函数stringMapRddToDataset
来做到这一点。我觉得应该有一个整体上更好的方法来执行此操作……还担心此解决方案的效率,但是我尚未在大量数据上进行过测试。
private def mapToSparkRow(map: collection.Map[String, Any], orderedFields: List[StructField]): Row = {
val orderedValues = orderedFields.map { field =>
val columnValue = map.getOrElse(field.name, null)
field.dataType match {
case nestedField: StructType =>
mapToSparkRow(columnValue.asInstanceOf[Map[String, Any]], nestedField.toList)
case notNested => columnValue
}
}
Row(orderedValues: _*)
}
def stringMapRddToDataset[T: Encoder](rdd: RDD[collection.Map[String, Any]])(
implicit spark: SparkSession): Dataset[T] = {
val encoder = implicitly[Encoder[T]]
val rddOfRows: RDD[Row] = rdd.map(mapToSparkRow(_, encoder.schema.toList))
val df = spark.createDataFrame(rddOfRows, encoder.schema)
df.as[T]
}
关于apache-spark - 将数据从ElasticSearch读取到Spark数据集中,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/62951744/