对同一个节点上已有的数据使用Hive窗口功能时,是否会发生数据困惑?
特别是在以下示例中,在使用窗口函数数据之前,已经通过Spark repartition()函数对“City”进行了重新分区,
这应确保将城市“A”的所有数据共同定位在同一节点上(假设城市的数据可以适合一个节点)。
df = sqlContext.createDataFrame(
[('A', '1', 2009, "data1"),
('A', '1', 2015, "data2"),
('A', '22', 2015, "data3"),
('A', '22', 2016, "data4"),
('BB', '333', 2014, "data5"),
('BB', '333', 2012, "data6"),
('BB', '333', 2016, "data7")
],
("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# | BB| 333|2012|data6|
# | BB| 333|2014|data5|
# | BB| 333|2016|data7|
# | A| 22|2016|data4|
# | A| 22|2015|data3|
# | A| 1|2009|data1|
# | A| 1|2015|data2|
# +----+------+----+-----+
然后,我必须按“Person”进行窗口函数分区,这不是Spark repartition()中的分区键,如下所示。
df.registerTempTable('example')
sqlStr = """\
select *,
row_number() over (partition by Person order by year desc) ranking
from example
"""
sqlContext.sql(sqlStr).show(100)
# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# | BB| 333|2016|data7| 1|
# | BB| 333|2014|data5| 2|
# | BB| 333|2012|data6| 3|
# | A| 1|2015|data2| 1|
# | A| 1|2009|data1| 2|
# | A| 22|2016|data4| 1|
# | A| 22|2015|data3| 2|
# +----+------+----+-----+-------+
这是我的问题:
顺便说一句,我也想知道用Spark窗口函数实现示例Hive查询是否简单。
最佳答案
窗口函数中的partition by
子句和repartition
均以相同的TungstenExchange
机制执行。您在分析执行计划时会看到以下内容:
sqlContext.sql(sqlStr).explain()
## == Physical Plan ==
## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
## +- Sort [Person#1 ASC,year#2L DESC], false, 0
## +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
## +- Project [City#0,Person#1,year#2L,data#3]
## +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
## +- ConvertToUnsafe
## +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]
关于第二个问题,您的假设是正确的。即使数据已经位于单个节点上,Spark也没有有关数据分发的先验知识,并且将再次对数据进行洗牌。
最后,根据一个观点,您的查询已经是一个Spark查询,或者不可能使用普通的Spark执行此查询。
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
w = Window.partitionBy("person").orderBy(col("year").desc())
df.withColumn("ranking", row_number().over(w))