




I am writing a small program that takes an employee dataset and tries to calculate the sum of salaries in each department. Below is a reproducible example. Can anybody please explain to me what is happening?

 emp_list=[(u'ACC', [u'101', u'a', u'ACC', u'1000']),
 (u'SALES', [u'102', u'b', u'SALES', u'2000']),
 (u'IT', [u'103', u'c', u'IT', u'3000']),
 (u'ACC', [u'104', u'd', u'ACC', u'4000']),
 (u'ACC', [u'105', u'e', u'ACC', u'5000']),
 (u'HR', [u'106', u'f', u'HR', u'6000']),
 (u'ACC', [u'107', u'g', u'ACC', u'7000']),
 (u'FIN', [u'108', u'h', u'FIN', u'8000']),
 (u'ACC', [u'109', u'k', u'ACC', u'9000']),
 (u'HR', [u'1010', u'l', u'HR', u'10000']),
 (u'ACC', [u'1011', u'm', u'ACC', u'11000']),
 (u'ACC', [u'1012', u'n', u'ACC', u'12000']),
 (u'FIN', [u'1013', u'o', u'FIN', u'13000']),
 (u'IT', [u'1014', u'p', u'IT', u'14000'])]


emp = sc.parallelize(emp_list)
emp.reduceByKey(lambda x,y : x[3]+y[3]).take(10)


[(u'ACC', u'00'),
 (u'HR', u'600010000'),
 (u'FIN', u'800013000'),
 (u'SALES', [u'102', u'b', u'SALES', u'2000']),
 (u'IT', u'300014000')]

Can anybody please explain to me why I am getting strange values for the ACC and SALES departments? I want to see the concatenated salaries for these two as well.


You get strange values because the logic of your function is invalid. If you used Scala instead of Python this wouldn't even compile. The function you pass to reduceByKey must take two arguments of the value type and return that same type:

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
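Checking the question's lambda against this signature in plain Python (no Spark needed) makes the problem visible. The records are taken from the question; calling the lambda `func` is just a naming choice for the sketch. Two lists go in and a string comes out, and regrouping the same three values changes the answer:

```python
# The lambda passed to reduceByKey in the question, named for testing.
func = lambda x, y: x[3] + y[3]

a = [u'101', u'a', u'ACC', u'1000']
b = [u'104', u'd', u'ACC', u'4000']
c = [u'105', u'e', u'ACC', u'5000']

# Type mismatch: V is a list, but the result is a str, so it is not a valid V.
ab = func(a, b)
print(type(ab).__name__, ab)  # str 10004000

# Once a str is fed back in, x[3] picks a single character instead of the salary.
left = func(func(a, b), c)   # '10004000'[3] + '5000'  -> '05000'
right = func(a, func(b, c))  # '1000' + '40005000'[3]  -> '10000'
print(left, right)           # 05000 10000
```

Because `left != right`, the result depends on how Spark happens to group the calls.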

In your case the types don't match (the input is a list and the return type is a string) and the function is not associative. To understand what is going on, let's consider two different cases:

  1. Only one value per key. Since func is never applied, you get that value back unchanged. Hence (u'SALES', [u'102', u'b', u'SALES', u'2000']).

  2. Multiple values per key. Let's take a subset of the ACC values as an example and assume the order of operations is defined as follows:

  # 1st partition
  ([u'101', u'a', u'ACC', u'1000'], [u'104', u'd', u'ACC', u'4000']),
  # 2nd partition
  ([u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000'])

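After the first application of func (within each partition) we get:

  # 1st partition
  u'10004000'
  # 2nd partition
  u'50007000'

and after the second application of func (merging the two partitions) we get:

  u'00'

At this point both arguments are strings, so x[3] picks the fourth character of each, which is '0' in both cases.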
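Both cases can be reproduced without a cluster. This is a simplified model, not Spark's actual implementation: it assumes each partition is folded left to right with `functools.reduce` and the per-partition results are then folded in partition order. The helper `reduce_by_key_model` is hypothetical:

```python
from functools import reduce

# The lambda from the question.
func = lambda x, y: x[3] + y[3]

def reduce_by_key_model(partitions, f):
    """Hypothetical single-key model of reduceByKey: fold each partition,
    then fold the per-partition results (Spark's real merge order may differ)."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

# Case 1: reduce() never calls f on a one-element sequence,
# so the single SALES record comes back unchanged.
sales = reduce_by_key_model([[[u'102', u'b', u'SALES', u'2000']]], func)
print(sales)  # ['102', 'b', 'SALES', '2000']

# Case 2: the ACC subset split across two partitions.
acc = reduce_by_key_model(
    [
        [[u'101', u'a', u'ACC', u'1000'], [u'104', u'd', u'ACC', u'4000']],
        [[u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000']],
    ],
    func,
)
print(acc)  # 00
```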
In practice the parenthesization can vary depending on partitioning and configuration, so you can get different outputs.
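Under the same simplified model (an assumption, not Spark's exact merge order; `reduce_by_key_model` is again a hypothetical helper), simply moving the same four ACC records between partitions changes the grouping, and therefore the output:

```python
from functools import reduce

func = lambda x, y: x[3] + y[3]

def reduce_by_key_model(partitions, f):
    # Hypothetical model: fold each partition, then fold the partial results.
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

v1 = [u'101', u'a', u'ACC', u'1000']
v2 = [u'104', u'd', u'ACC', u'4000']
v3 = [u'105', u'e', u'ACC', u'5000']
v4 = [u'107', u'g', u'ACC', u'7000']

# Same values, same order; only the partition layout differs.
one_partition = reduce_by_key_model([[v1, v2, v3, v4]], func)
two_partitions = reduce_by_key_model([[v1, v2], [v3, v4]], func)
print(one_partition, two_partitions)  # 07000 00
```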

To get correct results you should use aggregateByKey / combineByKey, map + reduce as suggested by @alexs, or map followed by groupByKey and mapValues. The last one should be the most efficient approach here since it doesn't require creating intermediate objects:

emp.mapValues(lambda x: x[3]).groupByKey().mapValues(lambda xs: "".join(xs))
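The semantics of the grouped version, and of a well-typed map + reduce, can be modelled in plain Python. This is not PySpark code: `defaultdict` stands in for `groupByKey` and dict comprehensions for `mapValues`/`reduceByKey`, on a subset of the question's records. After extracting `x[3]`, every value is a string, so `operator.add` takes two strings and returns a string, which satisfies the `(V, V) ⇒ V` contract. Converting to `int` instead gives the numeric sum the question originally mentioned. Note that concatenation is not commutative, so in Spark the order of the concatenated salaries is still not guaranteed.

```python
from collections import defaultdict
from functools import reduce
from operator import add

# A few records from the question's emp_list.
emp_list = [
    (u'ACC', [u'101', u'a', u'ACC', u'1000']),
    (u'SALES', [u'102', u'b', u'SALES', u'2000']),
    (u'ACC', [u'104', u'd', u'ACC', u'4000']),
    (u'HR', [u'106', u'f', u'HR', u'6000']),
    (u'HR', [u'1010', u'l', u'HR', u'10000']),
]

# Model of mapValues(lambda x: x[3]).groupByKey(): collect salary strings per key.
grouped = defaultdict(list)
for dept, record in emp_list:
    grouped[dept].append(record[3])

# Model of mapValues(lambda xs: "".join(xs)): one join per key.
concatenated = {dept: "".join(xs) for dept, xs in grouped.items()}
print(concatenated)  # {'ACC': '10004000', 'SALES': '2000', 'HR': '600010000'}

# Model of map + reduce: add(str, str) -> str keeps the value type stable.
reduced = {dept: reduce(add, xs) for dept, xs in grouped.items()}

# Converting to int first yields numeric sums per department.
totals = {dept: reduce(add, (int(s) for s in xs)) for dept, xs in grouped.items()}
print(totals)  # {'ACC': 5000, 'SALES': 2000, 'HR': 16000}
```

In PySpark the corresponding calls would be along the lines of `emp.mapValues(lambda x: x[3]).reduceByKey(add)` for concatenation or `emp.mapValues(lambda x: int(x[3])).reduceByKey(add)` for sums.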

For reference, the same thing using aggregateByKey:

from operator import add

emp.aggregateByKey("", lambda acc, x: acc + x[3], add)
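aggregateByKey works here because its two functions have different but consistent types: the first folds a raw record into a string accumulator (str, list) ⇒ str, and `add` merges two string accumulators (str, str) ⇒ str. The sketch below models this for one key under the same simplified partition assumption as before; `aggregate_by_key_model` is a hypothetical helper, not a PySpark function:

```python
from functools import reduce
from operator import add

def aggregate_by_key_model(partitions, zero, seq_func, comb_func):
    """Hypothetical single-key model of aggregateByKey: fold each partition
    starting from `zero` with seq_func, then merge partials with comb_func."""
    partials = [reduce(seq_func, part, zero) for part in partitions]
    return reduce(comb_func, partials)

seq_func = lambda acc, x: acc + x[3]  # (str, list) -> str

acc_partitions = [
    [[u'101', u'a', u'ACC', u'1000'], [u'104', u'd', u'ACC', u'4000']],
    [[u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000']],
]

two = aggregate_by_key_model(acc_partitions, "", seq_func, add)
one = aggregate_by_key_model([acc_partitions[0] + acc_partitions[1]], "", seq_func, add)
print(two, one)  # 1000400050007000 1000400050007000
```

Unlike the original lambda, the result no longer depends on how the records are split into partitions.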


08-24 03:11