问题描述
我是 Apache Spark 和 Scala 的新手,目前正在学习这个框架和大数据编程语言.我有一个示例文件,我试图找出另一个字段的给定字段总数及其计数和来自另一个字段的值列表.我自己尝试过,似乎我没有在 spark rdd
(作为开始)中以更好的方法编写.
I am new to Apache Spark as well as Scala, currently learning this framework and programming language for big data. I have a sample file I am trying to find out for a given field total number of another field and its count and list of values from another field. I tried on my own and seems that i am not writing in better approach in spark rdd
(as starting).
请找到以下示例数据(Customerid: Int, Orderid: Int, Amount: Float)
:
44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08
91,8900,24.59
70,3959,68.68
85,1733,28.53
53,9900,83.55
14,1505,4.32
51,3378,19.80
42,6926,57.77
2,4424,55.77
79,9291,33.17
50,3901,23.57
20,6633,6.49
15,6148,65.53
44,8331,99.19
5,3505,64.18
48,5539,32.42
我当前的代码:
((sc.textFile("file://../customer-orders.csv").map(x => x.split(",")).map(x => (x(0).toInt,x(1).toInt)).map{case(x,y) => (x, List(y))}.reduceByKey(_ ++ _).sortBy(_._1,true)).
fullOuterJoin(sc.textFile("file://../customer-orders.csv").map(x =>x.split(",")).map(x => (x(0).toInt,x(2).toFloat)).reduceByKey((x,y) => (x + y)).sortBy(_._1,true))).
fullOuterJoin(sc.textFile("file://../customer-orders.csv").map(x =>x.split(",")).map(x => (x(0).toInt)).map(x => (x,1)).reduceByKey((x,y) => (x + y)).sortBy(_._1,true)).sortBy(_._1,true).take(50).foreach(println)
得到这样的结果:
(49,(Some((Some(List(8558, 6986, 686....)),Some(4394.5996))),Some(96)))
预期结果如下:
customerid, (orderids,..,..,....), totalamount, number of orderids
有没有更好的办法?我刚刚用下面的代码尝试了 combineByKey
但里面的 println
没有打印.
Is there any better approach? I just tried combineByKey
with the below code but the println
inside are not printing.
scala> val reduced = inputrdd.combineByKey(
| (mark) => {
| println(s"Create combiner -> ${mark}")
| (mark, 1)
| },
| (acc: (Int, Int), v) => {
| println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
| (acc._1 + v, acc._2 + 1)
| },
| (acc1: (Int, Int), acc2: (Int, Int)) => {
| println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
| (acc1._1 + acc2._1, acc1._2 + acc2._2)
| }
| )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[27] at combineByKey at <console>:29
scala> reduced.collect()
res5: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))
我使用的是 Spark 版本 2.2.0、Scala 2.11.8 和 Java 1.8 build 101
I am using Spark version 2.2.0 , Scala 2.11.8 and Java 1.8 build 101
推荐答案
使用较新的 DataFrame API 更容易解决这个问题.首先读取csv文件并添加列名:
This is much easier to solve using the newer DataFrame API. First read the csv file and add the column names:
val df = spark.read.csv("file://../customer-orders.csv").toDF("Customerid", "Orderid", "Amount")
然后使用 groupBy
和 agg
进行聚合(这里你想要 collect_list
、sum
和 >计数
):
Then use groupBy
and agg
to make the aggregations (here you want collect_list
, sum
and count
):
val df2 = df.groupBy("Customerid").agg(
collect_list($"Orderid") as "Orderids",
sum($"Amount") as "TotalAmount",
count($"Orderid") as "NumberOfOrderIds"
)
使用提供的输入示例生成的数据帧:
Resulting dataframe using the provided input example:
+----------+------------+-----------+----------------+
|Customerid| Orderids|TotalAmount|NumberOfOrderIds|
+----------+------------+-----------+----------------+
| 51| [3378]| 19.8| 1|
| 15| [6148]| 65.53| 1|
| 29| [680]| 13.08| 1|
| 42| [6926]| 57.77| 1|
| 85| [1733]| 28.53| 1|
| 35| [5368]| 65.89| 1|
| 47| [6694]| 14.98| 1|
| 5| [3505]| 64.18| 1|
| 70| [3959]| 68.68| 1|
| 44|[8602, 8331]| 136.38| 2|
| 53| [9900]| 83.55| 1|
| 48| [5539]| 32.42| 1|
| 79| [9291]| 33.17| 1|
| 20| [6633]| 6.49| 1|
| 14| [1505]| 4.32| 1|
| 91| [8900]| 24.59| 1|
| 2|[3391, 4424]| 96.41| 2|
| 50| [3901]| 23.57| 1|
+----------+------------+-----------+----------------+
如果你想在这些转换后将数据作为 RDD 处理,你可以在之后进行转换:
If you want to work with the data as a RDD after these transformations, you can convert it afterwards:
val rdd = df2.as[(Int, Seq[Int], Float, Int)].rdd
当然,也可以直接使用RDD来解决.使用 aggregateByKey
:
val rdd = spark.sparkContext
.textFile("test.csv")
.map(x => x.split(","))
.map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))
val res = rdd.aggregateByKey((Seq[Int](), 0.0, 0))(
(acc, xs) => (acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1),
(acc1, acc2) => (acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3))
这更难阅读,但会给出与上述数据框方法相同的结果.
This is harder to read but will give the same result as the dataframe approach above.
这篇关于如何使用RDD分组和聚合多个字段?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!