This article looks at a problem with the Spark DataFrame except() method and a way to work around it. It should be a useful reference for anyone hitting the same issue.

Problem description

I have a use case to subtract two dataframes, so I used the DataFrame except() method.

This works fine locally on a smaller dataset.

But when I ran it over an AWS S3 bucket, the except() method did not perform the minus as expected. Is there anything that needs to be taken care of in a distributed environment?

Has anyone faced a similar issue?

Here is my sample code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Sample rows: (KEY, ROW_START_DATE, ROW_END_DATE, CODE, Indicator)
val values = List(
  List("One", "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "A", "Yes"),
  List("Two", "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "X", "No"),
  List("Three", "2017-07-09T23:59:59.000", "2017-12-05T23:59:58.000", "M", "Yes"),
  List("Four", "2017-11-01T23:59:59.000", "2017-12-09T23:59:58.000", "A", "No"),
  List("Five", "2017-07-09T23:59:59.000", "2017-12-05T23:59:58.000", "", "No"),
  List("One", "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "", "No")
).map(row => (row(0), row(1), row(2), row(3), row(4)))

val spark = SparkSession.builder().master("local").getOrCreate()

import spark.implicits._

val df = values.toDF("KEY", "ROW_START_DATE", "ROW_END_DATE", "CODE", "Indicator")

val filterCond = col("ROW_START_DATE") <= "2017-10-31T23:59:59.999" &&
  col("ROW_END_DATE") >= "2017-10-31T23:59:59.999" &&
  col("CODE").isin("M", "A", "R", "G")


val Filtered = df.filter(filterCond)            // rows matching the condition
val Excluded = df.except(df.filter(filterCond)) // rows that should remain after the minus

Expected output:

df.show(false)
Filtered.show(false)
Excluded.show(false)
+-----+-----------------------+-----------------------+----+---------+
|KEY  |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |
|Two  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X   |No       |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M   |Yes      |
|Four |2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A   |No       |
|Five |2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|    |No       |
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|    |No       |
+-----+-----------------------+-----------------------+----+---------+
+-----+-----------------------+-----------------------+----+---------+
|KEY  |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M   |Yes      |
+-----+-----------------------+-----------------------+----+---------+
+----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+----+-----------------------+-----------------------+----+---------+
|Four|2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A   |No       |
|Two |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X   |No       |
|Five|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|    |No       |
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|    |No       |
+----+-----------------------+-----------------------+----+---------+

But I get something like the following when it runs over the S3 bucket:

Filtered.show(false)
+-----+-----------------------+-----------------------+----+---------+
|KEY  |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One  |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M   |Yes      |
+-----+-----------------------+-----------------------+----+---------+

Excluded.show(false)

+----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE         |ROW_END_DATE           |CODE|Indicator|
+----+-----------------------+-----------------------+----+---------+
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A   |Yes      |---> wrong
|Four|2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A   |No       |
|Two |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X   |No       |
|Five|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|    |No       |
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|    |No       |
+----+-----------------------+-----------------------+----+---------+

Is there any other way to perform a minus of two Spark dataframes?

Recommended answer

You could use a left-anti join on the two dataframes, based on a key that is unique in both, and that would give you the output you expect from the except operation.

val diffdf = df1.join(df2, Seq("uniquekey"), "leftanti")
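
Applied to the sample data above, note that KEY alone is not unique (the key "One" appears twice), so a join on Seq("KEY") alone would drop both "One" rows. Joining on every shared column instead reproduces the except() semantics. A minimal sketch, reusing the df and Filtered dataframes from the sample code; diffAll is a hypothetical name:

// Sketch: keep the rows of df that have no exact match in Filtered,
// which is what except() is meant to compute.
val diffAll = df.join(Filtered, df.columns.toSeq, "leftanti")
diffAll.show(false)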

That concludes this look at the Spark DataFrame except() method issue. We hope the recommended answer helps.
