本文介绍了如何将Spark Scala映射字段合并到BQ?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个spark scala代码以将输出写入BQ,以下是用于形成具有两列(id和关键字)的输出表的代码

I am writing a spark scala code to write the output to BQ, The following are the codes used for forming the output table which has two columns (id and keywords)

val df1 = Seq("tamil", "telugu", "hindi").toDF("language")

val df2 = Seq(
  (101, Seq("tamildiary", "tamilkeyboard", "telugumovie")),
  (102, Seq("tamilmovie")),
  (103, Seq("hindirhymes", "hindimovie"))
).toDF("id", "keywords")

val pattern = concat(lit("^"), df1("language"), lit(".*"))

import org.apache.spark.sql.Row

val arrayToMap = udf{ (arr: Seq[Row]) =>
  arr.map{ case Row(k: String, v: Int) => (k, v) }.toMap
}

val final_df = df2.
  withColumn("keyword", explode($"keywords")).as("df2").
  join(df1.as("df1"), regexp_replace($"df2.keyword", pattern, lit("")) =!= $"df2.keyword").
  groupBy("id", "language").agg(size(collect_list($"language")).as("count")).
  groupBy("id").agg(arrayToMap(collect_list(struct($"language", $"count"))).as("keywords"))

final_df的输出为:

The output of final_df is:

+---+--------------------+                                                      
| id|        app_language|
+---+--------------------+
|101|Map(tamil -> 2, t...|
|103|     Map(hindi -> 2)|
|102|     Map(tamil -> 1)|
+---+--------------------+

我正在定义以下函数来传递此输出表的架构.(由于BQ不支持map字段,所以我使用的是struct数组.但这也不起作用)

I am defining the below function to pass the schema for this output table. (Since BQ doesn't support map field, I am using array of struct. But this is also not working)

  def createTableIfNotExists(outputTable: String) = {

    spark.createBigQueryTable(
      s"""
         |CREATE TABLE IF NOT EXISTS $outputTable(
         |ds date,
         |id int64,
         |keywords ARRAY<STRUCT<key STRING, value INT64>>
         |)
         |PARTITION BY ds
         |CLUSTER BY user_id
       """.stripMargin)
    
  }

任何人都可以帮助我为此编写一个正确的架构,以便它与BQ兼容.

Could anyone please help me in writing a correct schema for this so that it's compatible in BQ.

推荐答案

您可以收集以下结构体数组:

You can collect an array of struct as below:

val final_df = df2
    .withColumn("keyword", explode($"keywords")).as("df2")
    .join(df1.as("df1"), regexp_replace($"df2.keyword", pattern, lit("")) =!= $"df2.keyword")
    .groupBy("id", "language")
    .agg(size(collect_list($"language")).as("count"))
    .groupBy("id")
    .agg(collect_list(struct($"language", $"count")).as("app_language"))

final_df.show(false)
+---+-------------------------+
|id |app_language             |
+---+-------------------------+
|101|[[tamil, 2], [telugu, 1]]|
|103|[[hindi, 2]]             |
|102|[[tamil, 1]]             |
+---+-------------------------+

final_df.printSchema
root
 |-- id: integer (nullable = false)
 |-- app_language: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- language: string (nullable = true)
 |    |    |-- count: integer (nullable = false)

然后您可以拥有一个类似

And then you can have a schema like

def createTableIfNotExists(outputTable: String) = {

    spark.createBigQueryTable(
      s"""
         |CREATE TABLE IF NOT EXISTS $outputTable(
         |ds date,
         |id int64,
         |keywords ARRAY<STRUCT<language STRING, count INT64>>
         |)
         |PARTITION BY ds
         |CLUSTER BY user_id
       """.stripMargin)
    
  }

这篇关于如何将Spark Scala映射字段合并到BQ?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-13 21:30