Problem description
I am trying to do some analysis on sets. I have a sample data set that looks like this:
orders.json
{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
It is just a single field that is a list of numbers representing IDs.
Here is the Spark script I am trying to run:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

// Pair every item with every other item from the same original set
val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

// Drop self-pairs and count how often each (item1, item2) pair occurs
val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

// This is where I am stuck
val recs = grouped
  .groupBy("item1")
Creating expanded and grouped works fine. In a nutshell, expanded is a list of all the possible pairs of two IDs where the two IDs were in the same original set. grouped filters out IDs that were matched with themselves, then groups the unique pairs of IDs together and produces a count for each. The schema and a data sample of grouped are:
root
|-- item1: long (nullable = true)
|-- item2: long (nullable = true)
|-- count: long (nullable = false)
[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...
So, my question is: how do I now group on the first item in each result so that I have a list of tuples? For the example data above, I would expect something similar to this:
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]
As you can see in my script with recs, I thought you would start by doing a groupBy on 'item1', which is the first item in each row. But after that you are left with a GroupedData object, which supports very limited operations. Really, you are only left with aggregations like sum, avg, etc. I just want to list the tuples from each result.
I could easily use RDD functions at this point, but that departs from using DataFrames. Is there a way to do this with the DataFrame functions?
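(For reference, the RDD route I am trying to avoid would look roughly like the sketch below, starting from the grouped DataFrame above; rdd, groupByKey and mapValues are standard Spark calls, the variable name is illustrative.)

// Rough sketch: key each row by item1 and collect its (item2, count) pairs into a list
val viaRdd = grouped.rdd
  .map(row => (row.getLong(0), (row.getLong(1), row.getLong(2))))
  .groupByKey()
  .mapValues(_.toList)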
Recommended answer
You can build that with org.apache.spark.sql.functions (collect_list and struct), available since 1.6:
import org.apache.spark.sql.functions.{collect_list, struct}
val recs = grouped.groupBy('item1).agg(collect_list(struct('item2, 'count)).as("set"))
+-----+----------------------------+
|item1|set |
+-----+----------------------------+
|1 |[[5,3], [4,1], [3,2], [2,2]]|
|2 |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+
You can also use collect_set.

For information, tuples don't exist in DataFrames. The closest structures are structs, since they are the equivalent of case classes in the untyped Dataset API.
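For illustration, here is a minimal sketch of that equivalence (assuming Spark 2.x with a SparkSession named spark, so that encoders for nested case classes are available; the case class and variable names are illustrative): the struct column maps straight onto case classes through the typed Dataset API.

import spark.implicits._

// Sketch only: the field names must match the column/struct names built above
case class PairCount(item2: Long, count: Long)
case class Recs(item1: Long, set: Seq[PairCount])

val typedRecs = recs.as[Recs]
typedRecs.show(false)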
Edit 2: Also be warned that collect_set comes with the caveat that the result is actually not a set (there is no datatype with set properties in the SQL types). That means you can end up with distinct "sets" which differ by their order (in version 2.1.0 at least). Sorting them with sort_array is then necessary.
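Putting the two together, a sketch of the collect_set variant with sort_array (same assumptions as the snippet above, i.e. the grouped DataFrame from the question and the Symbol column syntax from the implicits; recsSet is an illustrative name):

import org.apache.spark.sql.functions.{collect_set, sort_array, struct}

val recsSet = grouped
  .groupBy('item1)
  .agg(sort_array(collect_set(struct('item2, 'count))).as("set"))

sort_array gives each array a deterministic order, so two rows containing the same pairs also end up with identical arrays.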