Problem description
I have a Cassandra table that for simplicity looks something like:
key: text
jsonData: text
blobData: blob
I can create a basic data frame for this using Spark and the spark-cassandra-connector:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
I'm struggling though to expand the JSON data into its underlying structure. I ultimately want to be able to filter based on the attributes within the json string and return the blob data. Something like jsonData.foo = "bar" and return blobData. Is this currently possible?
Recommended answer
Spark >= 2.4
If needed, the schema can be determined using the schema_of_json function (please note that this assumes that an arbitrary row is a valid representative of the schema).
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
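Once the column is parsed, the nested attributes behave like ordinary struct fields, which is what the filtering goal in the question needs. A minimal sketch, assuming the payload contains the k/v attributes used in the sample data further down (the question's own attribute names will differ):
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
  .select($"key", $"jsonData.k", $"blobData")  // nested fields are addressable with dot notation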
Spark >= 2.1
You can use the from_json function:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
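With jsonData parsed into a struct, the original goal (filter on a JSON attribute and return the blob) reduces to plain column expressions. A minimal sketch, reusing the k field from the schema above in place of the question's hypothetical foo:
df.withColumn("jsonData", from_json($"jsonData", schema))
  .filter($"jsonData.k" === "bar")   // filter on an attribute inside the JSON
  .select($"blobData")               // return only the blob column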
Spark >= 1.6
You can use get_json_object, which takes a column and a path:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
and extracts fields as individual strings, which can be further cast to the expected types.
The path argument is expressed using dot syntax, with a leading $. denoting the document root (since the code above uses string interpolation, $ has to be escaped, hence $$.).
Spark < 1.6
Is this currently possible?
As far as I know it is not directly possible. You can try something similar to this:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
I assume that the blob field cannot be represented in JSON. Otherwise you can omit splitting and joining:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)
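Given that schema, the filter-and-return-blob goal from the question can be written as a sketch along the lines of:
parsed.filter($"jsonData.k" === "bar").select($"blobData").show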
An alternative (cheaper, although more complex) approach is to use a UDF to parse the JSON and output a struct or map column. For example, something like this:
import org.apache.spark.sql.functions.udf
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+
parsed.printSchema
// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)
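The struct column produced by the UDF can be queried the same way as any other nested column, so the question's filter would look roughly like:
parsed.filter($"parsedJSON.k" === "bar").select($"blobData").show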