Problem Description
I am new to Spark, and I want to transform the source dataframe below (loaded from a JSON file):
+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1|   m3|
| e|    1|   m4|
| e|    1|   m5|
+--+-----+-----+
into this result dataframe:
+--+--+--+--+--+--+
|A |m1|m2|m3|m4|m5|
+--+--+--+--+--+--+
| a| 1| 1| 2| 3| 0|
| b| 4| 1| 2| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|
+--+--+--+--+--+--+
Here are the transformation rules:
The result dataframe consists of A + (n major columns), where the major column names are specified by:
sorted(src_df.map(lambda x: x[2]).distinct().collect())
The result dataframe contains m rows, where the values for the A column are provided by:
sorted(src_df.map(lambda x: x[0]).distinct().collect())
The value for each major column in the result dataframe is the value from the source dataframe for the corresponding A and major (e.g. the count in Row 1 of the source dataframe is mapped to the cell where A is a and the column is m1).
The combinations of A and major in the source dataframe have no duplicates (please consider them a primary key over the two columns in SQL).
Recommended Answer
Let's start with the example data:
df = sqlContext.createDataFrame([
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
    ("e", 1, "m4"), ("e", 1, "m5")],
    ("a", "cnt", "major"))
Please note that I've changed count to cnt. count is a reserved keyword in most SQL dialects, so it is not a good choice for a column name.
There are at least two ways to reshape this data:
Aggregating over the DataFrame
from pyspark.sql.functions import col, when, max

# Distinct major values define the output columns
majors = sorted(df.select("major")
                  .distinct()
                  .map(lambda row: row[0])
                  .collect())

# One column per major: cnt when the row's major matches, otherwise null
cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m)
        for m in majors]

# max picks the single non-null value per group, since (a, major) is unique
maxs = [max(col(m)).alias(m) for m in majors]

reshaped1 = (df
    .select(col("a"), *cols)
    .groupBy("a")
    .agg(*maxs)
    .na.fill(0))

reshaped1.show()
## +---+---+---+---+---+---+
## |  a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## |  a|  1|  1|  2|  3|  0|
## |  b|  4|  1|  2|  0|  0|
## |  c|  3|  0|  4|  5|  0|
## |  d|  6|  1|  2|  3|  4|
## |  e|  4|  5|  1|  1|  1|
## +---+---+---+---+---+---+
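A side note beyond the original answer: on Spark 2.x the Python DataFrame no longer exposes map directly, so the majors list above would be collected through the underlying RDD instead, for example:
majors = sorted(df.select("major")
                  .distinct()
                  .rdd.map(lambda row: row[0])
                  .collect())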
groupBy over RDD
from pyspark.sql import Row

# Key rows by the value of a, keeping (major, cnt) pairs as values
grouped = (df
    .map(lambda row: (row.a, (row.major, row.cnt)))
    .groupByKey())

def make_row(kv):
    k, vs = kv
    # Build a dict {major: cnt, "a": key} and fill missing majors with 0
    tmp = dict(list(vs) + [("a", k)])
    return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})

reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))
reshaped2.show()
## +---+---+---+---+---+---+
## |  a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## |  a|  1|  1|  2|  3|  0|
## |  e|  4|  5|  1|  1|  1|
## |  c|  3|  0|  4|  5|  0|
## |  b|  4|  1|  2|  0|  0|
## |  d|  6|  1|  2|  3|  4|
## +---+---+---+---+---+---+
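As an alternative not shown above: if you are on Spark 1.6 or later, the built-in pivot on GroupedData performs the same reshaping more directly. A minimal sketch (reshaped3 is just an illustrative name; passing the precomputed majors list avoids an extra job to determine the pivot values):
reshaped3 = (df
    .groupBy("a")
    .pivot("major", majors)  # one output column per major
    .max("cnt")              # single value per (a, major), so max is safe
    .na.fill(0))

reshaped3.show()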