Problem Description
I am writing Spark Scala code to write output to BQ. The following code builds the output table, which has two columns (id and keywords):
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._  // assuming a SparkSession named spark

val df1 = Seq("tamil", "telugu", "hindi").toDF("language")

val df2 = Seq(
  (101, Seq("tamildiary", "tamilkeyboard", "telugumovie")),
  (102, Seq("tamilmovie")),
  (103, Seq("hindirhymes", "hindimovie"))
).toDF("id", "keywords")

// Per-language regex pattern, e.g. "^tamil.*"
val pattern = concat(lit("^"), df1("language"), lit(".*"))

// Converts the collected array of (language, count) structs into a map
val arrayToMap = udf { (arr: Seq[Row]) =>
  arr.map { case Row(k: String, v: Int) => (k, v) }.toMap
}

val final_df = df2
  .withColumn("keyword", explode($"keywords")).as("df2")
  // keep only keywords that start with a language prefix
  .join(df1.as("df1"), regexp_replace($"df2.keyword", pattern, lit("")) =!= $"df2.keyword")
  .groupBy("id", "language").agg(size(collect_list($"language")).as("count"))
  .groupBy("id").agg(arrayToMap(collect_list(struct($"language", $"count"))).as("keywords"))
The output of final_df is:
+---+--------------------+
| id| app_language|
+---+--------------------+
|101|Map(tamil -> 2, t...|
|103| Map(hindi -> 2)|
|102| Map(tamil -> 1)|
+---+--------------------+
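To make the expected map values concrete, the same counting logic can be sketched in plain Scala (no Spark, just collections, using the sample data above): for each id, count how many keywords start with each language prefix.

```scala
// Plain-Scala sketch of the aggregation above (no Spark): per id, count
// keywords that start with each language prefix; drop zero counts.
val languages = Seq("tamil", "telugu", "hindi")
val rows = Seq(
  101 -> Seq("tamildiary", "tamilkeyboard", "telugumovie"),
  102 -> Seq("tamilmovie"),
  103 -> Seq("hindirhymes", "hindimovie")
)

val counts: Map[Int, Map[String, Int]] = rows.map { case (id, kws) =>
  id -> languages
    .map(l => l -> kws.count(_.startsWith(l)))
    .filter { case (_, c) => c > 0 }
    .toMap
}.toMap
// e.g. counts(101) == Map("tamil" -> 2, "telugu" -> 1)
```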
I am defining the function below to pass the schema for this output table. (Since BQ doesn't support map fields, I am using an array of structs, but this isn't working either.)
def createTableIfNotExists(outputTable: String) = {
  spark.createBigQueryTable(
    s"""
       |CREATE TABLE IF NOT EXISTS $outputTable(
       |ds date,
       |id int64,
       |keywords ARRAY<STRUCT<key STRING, value INT64>>
       |)
       |PARTITION BY ds
       |CLUSTER BY user_id
    """.stripMargin)
}
Could anyone please help me write a correct schema for this so that it is compatible with BQ?
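For reference, the shape change BQ needs (map → repeated struct) can be illustrated with a minimal plain-Scala sketch, no Spark involved; `Entry` here is just an illustrative case class, not part of any API:

```scala
// Minimal plain-Scala sketch (no Spark) of the shape change BQ needs:
// a MAP value becomes an array of (key, value) structs.
case class Entry(key: String, value: Int)

val asMap: Map[String, Int] = Map("tamil" -> 2, "telugu" -> 1)

// One struct-like element per map entry: the shape ARRAY<STRUCT<...>> expects.
val asStructs: Seq[Entry] = asMap.toSeq.map { case (k, v) => Entry(k, v) }
```

In Spark 2.4+, `map_entries` performs this conversion for a map-typed column, producing an array of structs with fields `key` and `value`.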
Answer
You can collect an array of structs as below:
val final_df = df2
  .withColumn("keyword", explode($"keywords")).as("df2")
  .join(df1.as("df1"), regexp_replace($"df2.keyword", pattern, lit("")) =!= $"df2.keyword")
  .groupBy("id", "language")
  .agg(size(collect_list($"language")).as("count"))
  .groupBy("id")
  .agg(collect_list(struct($"language", $"count")).as("app_language"))
final_df.show(false)
+---+-------------------------+
|id |app_language |
+---+-------------------------+
|101|[[tamil, 2], [telugu, 1]]|
|103|[[hindi, 2]] |
|102|[[tamil, 1]] |
+---+-------------------------+
final_df.printSchema
root
 |-- id: integer (nullable = false)
 |-- app_language: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- language: string (nullable = true)
 |    |    |-- count: integer (nullable = false)
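The mapping from those struct fields to a BQ DDL type can be sketched with a small helper; `bqArrayOfStruct` below is hypothetical (not a library function), just an illustration that each Spark struct field becomes a `name TYPE` pair inside `ARRAY<STRUCT<...>>`:

```scala
// Hypothetical helper (not a library API): render struct fields as a
// BigQuery ARRAY<STRUCT<...>> DDL type string.
def bqArrayOfStruct(fields: Seq[(String, String)]): String =
  fields.map { case (name, tpe) => s"$name $tpe" }
    .mkString("ARRAY<STRUCT<", ", ", ">>")

val ddlType = bqArrayOfStruct(Seq("language" -> "STRING", "count" -> "INT64"))
// ddlType == "ARRAY<STRUCT<language STRING, count INT64>>"
```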
And then you can use a schema like:
def createTableIfNotExists(outputTable: String) = {
  spark.createBigQueryTable(
    s"""
       |CREATE TABLE IF NOT EXISTS $outputTable(
       |ds DATE,
       |id INT64,
       |app_language ARRAY<STRUCT<language STRING, count INT64>>
       |)
       |PARTITION BY ds
       |CLUSTER BY id
    """.stripMargin)
}
This concludes this article on how to merge Spark Scala map fields into BQ. Hopefully the answer above is helpful.