Problem Description
I have a dataframe joinDf created from joining the following four dataframes on userId:
import spark.implicits._  // needed for .toDF on a Seq

val detailsDf = Seq((123, "first123", "xyz"))
  .toDF("userId", "firstName", "address")

val emailDf = Seq(
  (123, "[email protected]"),
  (123, "[email protected]")
).toDF("userId", "email")

val foodDf = Seq(
  (123, "food2", false, "Italian", 2),
  (123, "food3", true, "American", 3),
  (123, "food1", true, "Mediterranean", 1)
).toDF("userId", "foodName", "isFavFood", "cuisine", "score")

val gameDf = Seq(
  (123, "chess", false, 2),
  (123, "football", true, 1)
).toDF("userId", "gameName", "isOutdoor", "score")

val joinDf = detailsDf
  .join(emailDf, Seq("userId"))
  .join(foodDf, Seq("userId"))
  .join(gameDf, Seq("userId"))
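Note that because emailDf, foodDf, and gameDf each have several rows per userId, this join produces a cross product per user (3 foods × 2 emails × 2 games = 12 rows for userId 123), which is why the flat result has to be re-nested.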
The user's food and game favorites should be ordered by score in ascending order.
I am trying to create a result from this joinDf where the JSON looks like the following:
[
  {
    "userId": "123",
    "firstName": "first123",
    "address": "xyz",
    "UserFoodFavourites": [
      {
        "foodName": "food1",
        "isFavFood": "true",
        "cuisine": "Mediterranean"
      },
      {
        "foodName": "food2",
        "isFavFood": "false",
        "cuisine": "Italian"
      },
      {
        "foodName": "food3",
        "isFavFood": "true",
        "cuisine": "American"
      }
    ],
    "UserEmail": [
      "[email protected]",
      "[email protected]"
    ],
    "UserGameFavourites": [
      {
        "gameName": "football",
        "isOutdoor": "true"
      },
      {
        "gameName": "chess",
        "isOutdoor": "false"
      }
    ]
  }
]
Should I use joinDf.groupBy().agg(collect_set()) for this? Any help would be appreciated.
Recommended Answer
My solution is based on the answers found here and here.
It uses a window function. It shows how to create a nested list of food preferences for a given userId based on the food score. Here we are creating a struct of FoodDetails from the columns we have:
import org.apache.spark.sql.functions.struct

// Pack the food columns into a single struct column, then drop the originals
val foodModifiedDf = foodDf
  .withColumn("FoodDetails", struct("foodName", "isFavFood", "cuisine", "score"))
  .drop("foodName", "isFavFood", "cuisine", "score")

println("Just printing the food details here")
foodModifiedDf.show(10, truncate = false)
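At this point foodModifiedDf has only two columns, userId and the FoodDetails struct, still with one row per food entry.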
Here we are creating a window function which accumulates the list for a userId based on FoodDetails.score in descending order. As it is applied, the window function keeps accumulating the list as it encounters new rows with the same userId. Once the accumulation is done, we have to do a groupBy over the userId to select the largest list.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, desc, max}

// Partition by user and order by the nested score, highest first
val window = Window.partitionBy("userId").orderBy(desc("FoodDetails.score"))

val userAndFood = detailsDf.join(foodModifiedDf, "userId")

// collect_list over the window yields, on each row, the list accumulated so far
val newUF = userAndFood.select($"*", collect_list("FoodDetails").over(window) as "FDNew")
println("UserAndFood dataframe after windowing function applied")
newUF.show(10, truncate = false)

// Keep only the complete (largest) list per user
val resultUF = newUF.groupBy("userId")
  .agg(max("FDNew"))
println("Final result after selecting the maximum length list")
resultUF.show(10, truncate = false)
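max works here because Spark compares arrays element by element, and each intermediate list the window emits is a prefix of the complete one, so the full list compares greatest.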
This is what the final result looks like:
+------+-----------------------------------------------------------------------------------------+
|userId|max(FDNew) |
+------+-----------------------------------------------------------------------------------------+
|123 |[[food3, true, American, 3], [food2, false, Italian, 2], [food1, true, Mediterranean, 1]]|
+------+-----------------------------------------------------------------------------------------+
Given this dataframe, it should be easier to write out the nested JSON; a sketch of one way to do it follows.
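For completeness, here is a minimal, untested sketch of that last step, assuming Spark 2.4+ (for the transform SQL function) and a SparkSession named spark; the aggregate column names UserEmail, UserFoodFavourites, and UserGameFavourites simply mirror the JSON in the question. It builds each nested list with groupBy plus collect_list, sorts by score ascending as the question asks (the answer above sorted descending), strips the score field, and emits one JSON string per user:

import spark.implicits._
import org.apache.spark.sql.functions._

// Emails: a plain list per user
val emailAgg = emailDf.groupBy("userId")
  .agg(collect_list($"email") as "UserEmail")

// Foods: put score first in the struct so sort_array orders by it (ascending),
// then drop it from the output structs with transform (Spark 2.4+)
val foodAgg = foodDf.groupBy("userId")
  .agg(sort_array(collect_list(struct($"score", $"foodName", $"isFavFood", $"cuisine"))) as "foods")
  .select($"userId", expr(
    "transform(foods, f -> struct(f.foodName as foodName, f.isFavFood as isFavFood, f.cuisine as cuisine))"
  ) as "UserFoodFavourites")

// Games: same pattern
val gameAgg = gameDf.groupBy("userId")
  .agg(sort_array(collect_list(struct($"score", $"gameName", $"isOutdoor"))) as "games")
  .select($"userId", expr(
    "transform(games, g -> struct(g.gameName as gameName, g.isOutdoor as isOutdoor))"
  ) as "UserGameFavourites")

// One row per user, then one JSON document per row
detailsDf
  .join(emailAgg, Seq("userId"))
  .join(foodAgg, Seq("userId"))
  .join(gameAgg, Seq("userId"))
  .toJSON
  .show(truncate = false)

One difference from the hand-written JSON above: isFavFood and isOutdoor stay booleans rather than strings, which is usually what you want.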