




I am new to Spark, and I want to transform the source DataFrame below (loaded from a JSON file):

|A |count|major|
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1|   m3|
| e|    1|   m4|
| e|    1|   m5|

into the result DataFrame below:

|A |m1|m2|m3|m4|m5|
| a| 1| 1| 2| 3| 0|
| b| 4| 1| 2| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|

Here are the transformation rules:

  1. The result DataFrame consists of the A column plus n major columns, where the major column names are given by:

    sorted(src_df.map(lambda x: x[2]).distinct().collect())

  2. The result DataFrame contains m rows, where the values of the A column are given by:

    sorted(src_df.map(lambda x: x[0]).distinct().collect())

  3. The value of each major column in the result DataFrame is the count from the source DataFrame for the corresponding A and major (e.g. the count in row 1 of the source DataFrame goes into the cell where A is a and the column is m1).

  4. The combinations of A and major in the source DataFrame contain no duplicates (think of the two columns as a composite primary key in SQL).
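
The four rules can be checked against the sample data without Spark. The following is a minimal plain-Python sketch, not a Spark solution. The names `rows` and `pivot_counts` are made up for illustration, and filling missing combinations with 0 is an assumption taken from the expected result table rather than from the rules themselves.

```python
# Plain-Python sketch of rules 1-4 (illustrative only; no Spark involved).
rows = [
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), ("a", 3, "m4"),
    ("b", 4, "m1"), ("b", 1, "m2"), ("b", 2, "m3"),
    ("c", 3, "m1"), ("c", 4, "m3"), ("c", 5, "m4"),
    ("d", 6, "m1"), ("d", 1, "m2"), ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), ("e", 1, "m4"), ("e", 1, "m5"),
]


def pivot_counts(rows):
    """Return (header, table) for (A, count, major) tuples."""
    majors = sorted({major for _, _, major in rows})  # rule 1: sorted distinct majors
    keys = sorted({a for a, _, _ in rows})            # rule 2: sorted distinct A values

    lookup = {}
    for a, count, major in rows:
        if (a, major) in lookup:                      # rule 4: (A, major) acts as a key
            raise ValueError("duplicate combination: %r" % ((a, major),))
        lookup[(a, major)] = count

    header = ["A"] + majors
    # Rule 3: each cell holds the count for its (A, major); assumed 0 when absent.
    table = [[a] + [lookup.get((a, m), 0) for m in majors] for a in keys]
    return header, table


header, table = pivot_counts(rows)
print(header)
for line in table:
    print(line)
```

Keying the lookup on the `(A, major)` pair is what makes rule 4 matter: with duplicates there would be no single value to place in a cell, and some aggregation would have to be chosen.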


Let's start with example data:

df = sqlContext.createDataFrame([
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
    ("e", 1, "m4"), ("e", 1, "m5")],
    ("a", "cnt", "major"))

Please note that I've renamed count to cnt. Count is a reserved keyword in most SQL dialects, so it is not a good choice for a column name.

There are at least two ways to reshape this data:

  • aggregating over DataFrame

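    from pyspark.sql.functions import col, when, max
    majors = sorted(df.select("major")
        .distinct()
        .rdd.map(lambda row: row[0])  # .rdd keeps this working on Spark 2.x+
        .collect())
    # One sparse column per major: cnt where the major matches, NULL otherwise.
    cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m)
        for m in majors]
    # max skips NULLs, so it returns the single non-NULL value in each group.
    maxs = [max(col(m)).alias(m) for m in majors]
    reshaped1 = (df
        .select(col("a"), *cols)
        .groupBy("a")
        .agg(*maxs)
        .na.fill(0))
    reshaped1.show()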
    ## +---+---+---+---+---+---+
    ## |  a| m1| m2| m3| m4| m5|
    ## +---+---+---+---+---+---+
    ## |  a|  1|  1|  2|  3|  0|
    ## |  b|  4|  1|  2|  0|  0|
    ## |  c|  3|  0|  4|  5|  0|
    ## |  d|  6|  1|  2|  3|  4|
    ## |  e|  4|  5|  1|  1|  1|
    ## +---+---+---+---+---+---+

  • groupBy over RDD

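    from pyspark.sql import Row

    # Key each record by a, then gather its (major, cnt) pairs.
    grouped = (df
        .rdd.map(lambda row: (row.a, (row.major, row.cnt)))
        .groupByKey())

    def make_row(kv):
        k, vs = kv
        # {major: cnt, ..., "a": key}; majors missing for this key default to 0.
        tmp = dict(list(vs) + [("a", k)])
        return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})

    reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))
    reshaped2.show()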
    ## +---+---+---+---+---+---+
    ## |  a| m1| m2| m3| m4| m5|
    ## +---+---+---+---+---+---+
    ## |  a|  1|  1|  2|  3|  0|
    ## |  e|  4|  5|  1|  1|  1|
    ## |  c|  3|  0|  4|  5|  0|
    ## |  b|  4|  1|  2|  0|  0|
    ## |  d|  6|  1|  2|  3|  4|
    ## +---+---+---+---+---+---+
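
To see why the first approach works, it can help to replay its three steps by hand. The sketch below is plain Python on a subset of the example data (only the `b` and `c` records) and does not call Spark. `max_ignoring_none` is a hypothetical helper that mimics how SQL-style aggregates skip NULL values.

```python
# Plain-Python walk-through of the conditional-column + max idea (no Spark).
rows = [("b", 4, "m1"), ("b", 1, "m2"), ("b", 2, "m3"),
        ("c", 3, "m1"), ("c", 4, "m3"), ("c", 5, "m4")]
majors = sorted({major for _, _, major in rows})

# Step 1: one sparse column per major, holding cnt where the major matches
# and None elsewhere (the role played by when(...).otherwise(None)).
sparse = [(a, [cnt if major == m else None for m in majors])
          for a, cnt, major in rows]


def max_ignoring_none(values):
    """Hypothetical helper: max over the non-None values, or None if all are None."""
    present = [v for v in values if v is not None]
    return max(present) if present else None


# Step 2: group the sparse rows by a and aggregate each column separately.
groups = {}
for a, cells in sparse:
    groups.setdefault(a, []).append(cells)

reshaped = {}
for a, cell_rows in groups.items():
    maxima = [max_ignoring_none(column) for column in zip(*cell_rows)]
    # Step 3: replace what is still None with 0 (the role played by na.fill(0)).
    reshaped[a] = [0 if v is None else v for v in maxima]

print(majors)
for a in sorted(reshaped):
    print(a, reshaped[a])
```

Because each combination of `a` and `major` occurs at most once, every group holds at most one non-null value per column, so `max` is used here only to pick that value. Under the same assumption, another aggregate that skips nulls (such as `min` or `sum`) should give the same result.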


08-01 04:35