问题描述
我如何查询复杂的类型,如地图/阵列的RDD?
例如,当我在写这个测试code:
How Can I query an RDD with complex types such as maps/arrays?for example, when I was writing this test code:
case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
我虽然语法会是这样的:
I though the syntax would be something like:
sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
或
sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
但我得到
不能在类型的MapType访问嵌套的字段(StringType,StringType,真实)
和
org.apache.spark.sql.catalyst.errors.package $ TreeNodeException:未解决的属性。
分别。
推荐答案
这取决于列的类型。让我们先从一些虚拟数据:
It depends on a type of the column. Lets start with some dummy data:
import org.apache.spark.sql.functions.{udf, lit}
import scala.util.Try
case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
an_array: Array[Int], a_map: Map[String, String],
a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
val df = sc.parallelize(Seq(
Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
Array(
ArrayElement("foo", 1, Array(1.0, 2.0)),
ArrayElement("bar", 2, Array(3.0, 4.0)))),
Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF
df.registerTempTable("df")
df.printSchema
// root
// |-- an_array: array (nullable = true)
// | |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// | |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- foo: string (nullable = true)
// | | |-- bar: integer (nullable = false)
// | | |-- vals: array (nullable = true)
// | | | |-- element: double (containsNull = false)
-
数组列:
array columns:
-
Column.getItem
法
df.select($"an_array".getItem(1)).show // +-----------+ // |an_array[1]| // +-----------+ // | 2| // | 5| // +-----------+
-
-
蜂巢括号语法:
Hive brackets syntax:
sqlContext.sql("SELECT an_array[1] FROM df").show // +---+ // |_c0| // +---+ // | 2| // | 5| // +---+
-
这是UDF
an UDF
val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption) df.select(get_ith($"an_array", lit(1))).show // +---------------+ // |UDF(an_array,1)| // +---------------+ // | 2| // | 5| // +---------------+
-
使用
Column.getField
方法:
图列
df.select($"a_map".getField("foo")).show
// +----------+
// |a_map[foo]|
// +----------+
// | bar|
// | null|
// +----------+
使用蜂巢括号语法:
using Hive brackets syntax:
sqlContext.sql("SELECT a_map['foz'] FROM df").show
// +----+
// | _c0|
// +----+
// |null|
// | baz|
// +----+
使用带有点语法完整路径:
using a full path with dot syntax:
df.select($"a_map.foo").show
// +----+
// | foo|
// +----+
// | bar|
// |null|
// +----+
使用UDF
using an UDF
val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))
df.select(get_field($"a_map", lit("foo"))).show
// +--------------+
// |UDF(a_map,foo)|
// +--------------+
// | bar|
// | null|
// +--------------+
使用带有点语法完整路径结构列:
struct columns using full path with dot syntax:
-
通过数据帧API
with DataFrame API
df.select($"a_struct.x").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
与原始SQL
with raw SQL
sqlContext.sql("SELECT a_struct.x FROM df").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
在结构
的数组字段可以使用点语法,名称和标准列
方法访问
fields inside array of structs
can be accessed using dot-syntax, names and standard Column
methods:
df.select($"an_array_of_structs.foo").show
// +----------+
// | foo|
// +----------+
// |[foo, bar]|
// |[foz, baz]|
// +----------+
sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
// +---+
// |_c0|
// +---+
// |foo|
// |foz|
// +---+
df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show
// +------------------------------+
// |an_array_of_structs.vals[1][1]|
// +------------------------------+
// | 4.0|
// | 8.0|
// +------------------------------+
用户定义类型(UDT)字段可以使用UDF的访问。请参见详情属性。
备注
- 取决于星火版本的一些方法可以只用
HiveContext
。 UDF的应该工作的独立版本,采用标准的SQLContext
和HiveContext
。 -
一般来说嵌套的值是二等公民。并非所有的典型操作支持嵌套的领域。根据上下文它可以更好地扁平化的架构和/或爆炸集合
- depending on a Spark version some of these methods can be available only with
HiveContext
. UDFs should work independent of version with both standardSQLContext
andHiveContext
. generally speaking nested values are a second class citizens. Not all typical operations are supported on nested fields. Depending on a context it could be better to flatten the schema and / or explode collections
df.select(explode($"an_array_of_structs")).show
// +--------------------+
// | col|
// +--------------------+
// |[foo,1,WrappedArr...|
// |[bar,2,WrappedArr...|
// |[foz,3,WrappedArr...|
// |[baz,4,WrappedArr...|
// +--------------------+
这篇关于星火查询SQL数据框与复杂类型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!