对同一个节点上已有的数据使用Hive窗口功能时,是否会发生数据困惑?

特别是在以下示例中,在使用窗口函数数据之前,已经通过Spark repartition()函数对“City”进行了重新分区,
这应确保将城市“A”的所有数据共同定位在同一节点上(假设城市的数据可以适合一个节点)。

df = sqlContext.createDataFrame(
    [('A', '1', 2009, "data1"),
     ('A', '1', 2015, "data2"),
     ('A', '22', 2015, "data3"),
     ('A', '22', 2016, "data4"),
     ('BB', '333', 2014, "data5"),
     ('BB', '333', 2012, "data6"),
     ('BB', '333', 2016, "data7")
    ],
    ("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# |  BB|   333|2012|data6|
# |  BB|   333|2014|data5|
# |  BB|   333|2016|data7|
# |   A|    22|2016|data4|
# |   A|    22|2015|data3|
# |   A|     1|2009|data1|
# |   A|     1|2015|data2|
# +----+------+----+-----+

然后,我必须按“Person”进行窗口函数分区,这不是Spark repartition()中的分区键,如下所示。
df.registerTempTable('example')
sqlStr = """\
    select *,
        row_number() over (partition by Person order by year desc) ranking
    from example
"""
sqlContext.sql(sqlStr).show(100)

# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# |  BB|   333|2016|data7|      1|
# |  BB|   333|2014|data5|      2|
# |  BB|   333|2012|data6|      3|
# |   A|     1|2015|data2|      1|
# |   A|     1|2009|data1|      2|
# |   A|    22|2016|data4|      1|
# |   A|    22|2015|data3|      2|
# +----+------+----+-----+-------+

这是我的问题:
  • Spark“repartition”和Hive“partition by”之间是否有任何关系或不同?在引擎盖下,它们是否在Spark上翻译成相同的东西?
  • 我想检查一下我的理解是否正确。即使所有数据都已经在同一个节点上,如果我调用Spark df.repartition('A_key_different_from_current_partidion_key'),数据也将被重排到许多节点,而不是一起存放在同一节点上。

  • 顺便说一句,我也想知道用Spark窗口函数实现示例Hive查询是否简单。

    最佳答案

    窗口函数中的partition by子句和repartition均以相同的TungstenExchange机制执行。您在分析执行计划时会看到以下内容:

    sqlContext.sql(sqlStr).explain()
    
    ## == Physical Plan ==
    ## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
    ## +- Sort [Person#1 ASC,year#2L DESC], false, 0
    ##    +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
    ##       +- Project [City#0,Person#1,year#2L,data#3]
    ##          +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
    ##             +- ConvertToUnsafe
    ##                +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]
    

    关于第二个问题,您的假设是正确的。即使数据已经位于单个节点上,Spark也没有有关数据分发的先验知识,并且将再次对数据进行洗牌。

    最后,根据一个观点,您的查询已经是一个Spark查询,或者不可能使用普通的Spark执行此查询。
  • 这是一个Spark查询,因为DSL对应方将使用完全相同的机制
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    
    w = Window.partitionBy("person").orderBy(col("year").desc())
    df.withColumn("ranking", row_number().over(w))
    
  • 无法使用纯Spark执行此操作,因为自Spark 1.6起,尚无窗口函数的 native 实现。它在Spark 2.0中进行了更改。
  • 10-02 01:16
    查看更多