Problem Description
I am a novice to Spark, and I want to transform the source dataframe below (loaded from a JSON file):
+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a| 1| m1|
| a| 1| m2|
| a| 2| m3|
| a| 3| m4|
| b| 4| m1|
| b| 1| m2|
| b| 2| m3|
| c| 3| m1|
| c| 4| m3|
| c| 5| m4|
| d| 6| m1|
| d| 1| m2|
| d| 2| m3|
| d| 3| m4|
| d| 4| m5|
| e| 4| m1|
| e| 5| m2|
| e| 1| m3|
| e| 1| m4|
| e| 1| m5|
+--+-----+-----+
into the result dataframe below:
+--+--+--+--+--+--+
|A |m1|m2|m3|m4|m5|
+--+--+--+--+--+--+
| a| 1| 1| 2| 3| 0|
| b| 4| 1| 2| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|
+--+--+--+--+--+--+
Here are the transformation rules:
- The result dataframe consists of A + (n major columns), where the major column names are specified by: sorted(src_df.map(lambda x: x[2]).distinct().collect())
- The result dataframe contains m rows, where the values for the A column are provided by: sorted(src_df.map(lambda x: x[0]).distinct().collect()) (both expressions are sketched below)
- The value for each major column in the result dataframe is taken from the source dataframe at the corresponding A and major (e.g. the count in row 1 of the source dataframe is mapped to the cell where A is a and the column is m1)
- The combinations of A and major in the source dataframe have no duplicates (consider them a primary key on the two columns, in SQL terms)
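For concreteness, here is a sketch of what the two expressions above evaluate to on this sample data (src_df stands for the source dataframe loaded from the JSON file):

majors = sorted(src_df.map(lambda x: x[2]).distinct().collect())
# ['m1', 'm2', 'm3', 'm4', 'm5']

keys = sorted(src_df.map(lambda x: x[0]).distinct().collect())
# ['a', 'b', 'c', 'd', 'e']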
Let's start with example data:
df = sqlContext.createDataFrame([
("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
("e", 1, "m4"), ("e", 1, "m5")],
("a", "cnt", "major"))
Please note that I've changed count to cnt. count is a reserved keyword in most SQL dialects, and it is not a good choice for a column name.
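If you ever do have to keep a reserved word as a column name, Spark SQL can escape identifiers with backticks inside SQL text. A small hypothetical sketch (the count column and the src table name are just for illustration):

# Hypothetical: a dataframe that kept the reserved name "count".
df_count = df.withColumnRenamed("cnt", "count")
df_count.registerTempTable("src")

# Backticks escape the reserved identifier in the SQL string.
sqlContext.sql("SELECT a, `count` FROM src").show()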
There are at least two ways to reshape this data:
Aggregating over the DataFrame:

from pyspark.sql.functions import col, when, max

# Collect the distinct major values; these become the new column names.
majors = sorted(df.select("major")
                  .distinct()
                  .map(lambda row: row[0])
                  .collect())

# For each major, emit cnt when the row matches that major, otherwise null.
cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m)
        for m in majors]
# max over each group picks the single non-null value per (a, major) pair.
maxs = [max(col(m)).alias(m) for m in majors]

reshaped1 = (df
             .select(col("a"), *cols)
             .groupBy("a")
             .agg(*maxs)
             .na.fill(0))  # replace the remaining nulls with 0

reshaped1.show()

## +---+---+---+---+---+---+
## |  a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## |  a|  1|  1|  2|  3|  0|
## |  b|  4|  1|  2|  0|  0|
## |  c|  3|  0|  4|  5|  0|
## |  d|  6|  1|  2|  3|  4|
## |  e|  4|  5|  1|  1|  1|
## +---+---+---+---+---+---+
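As an aside, if you are on Spark 1.6 or later, the built-in pivot on GroupedData does the same job in one step. A minimal sketch, reusing the majors list computed above (passing the pivot values explicitly spares Spark an extra pass over the data to discover them):

reshaped = (df
            .groupBy("a")
            .pivot("major", majors)  # pivot values supplied up front
            .max("cnt")              # (a, major) is unique, so max just picks the value
            .na.fill(0))

The result matches reshaped1 above.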
groupBy over an RDD:

from pyspark.sql import Row

# Key each row by a, keeping (major, cnt) pairs as the values.
grouped = (df
           .map(lambda row: (row.a, (row.major, row.cnt)))
           .groupByKey())

def make_row(kv):
    k, vs = kv
    # Merge the (major, cnt) pairs and the key itself into one dict.
    tmp = dict(list(vs) + [("a", k)])
    # Missing majors default to 0.
    return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})

reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))

reshaped2.show()

## +---+---+---+---+---+---+
## |  a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## |  a|  1|  1|  2|  3|  0|
## |  e|  4|  5|  1|  1|  1|
## |  c|  3|  0|  4|  5|  0|
## |  b|  4|  1|  2|  0|  0|
## |  d|  6|  1|  2|  3|  4|
## +---+---+---+---+---+---+
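One caveat, assuming a Spark 2.x environment rather than the 1.x API used above: DataFrame no longer exposes map directly in PySpark, so both the majors computation and the RDD variant have to go through .rdd first. A minimal sketch of the changed lines:

# Spark 2.x: go through the underlying RDD explicitly.
majors = sorted(df.select("major").distinct().rdd.map(lambda row: row[0]).collect())

grouped = (df.rdd
           .map(lambda row: (row.a, (row.major, row.cnt)))
           .groupByKey())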