Hi, I have two tables like this.
Source table:
orig1 orig2 orig3 xref1 xref2 xref3
1 1 1 2 2 2
1 1 1 3 3 3
23 23 23 12 12 12
Target table:
orig1 orig2 orig3 xref1 xref2 xref3 version
1 1 1 1 1 1 0
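I need the output described below.
1) I need to match
(source(orig1 orig2 orig3) == target(orig1 orig2 orig3))
If the keys match, the row from the source table should be appended to the target table with the version incremented by 1.
If they do not match, the row should be appended with version 0.
The expected output is: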
orig1 orig2 orig3 xref1 xref2 xref3 version
1 1 1 1 1 1 0
1 1 1 2 2 2 1
1 1 1 3 3 3 2
23 23 23 12 12 12 0
I tried this at the DataFrame level, but it did not work as expected. Any help would be appreciated.
This is the approach I tried:
val source = spark.sql("select xref1,xref2,xref3,orig1,orig2,orig3 from default.source")
val target = spark.sql("select xref1,xref2,xref3,orig1,orig2,orig3 from default.target")
val target10 = spark.sql("select xref1,xref2,xref3,orig1,orig2,orig3,version from default.target")
val diff=( source.select("xref1","xref2","xref3","orig1","orig2","orig3") == target.select("xref1","xref2","xref3","orig1","orig2","orig3"))
if ( diff == false ){
val diff1 = source.select("orig1","orig2","orig3").except(target.select("orig1","orig2","orig3"))
if ( diff1.count > 0 ) {
val ver = target10.groupBy("orig1","orig2","orig3").max("version")
val common = source.select("orig1","orig2","orig3").intersect(target.select("orig1","orig2","orig3"))
val result = common.join(ver, common("orig1") === ver("orig1") && common("orig2") === ver("orig2") && common("orig3") === ver("orig3"), "inner").select(ver("orig1"),ver("orig2"),ver("orig3"),(ver("max(version)") + 1) as "version")
val result1 = result.join(source, result("orig1") === source("orig1") && result("orig2") === source("orig2") && result("orig3") === source("orig3"), "inner").select(source("orig1"),source("orig2"),source("orig3"),result("version"),source("xref1"),source("xref2"),source("xref3"))
val result2=source.select("orig1","orig2","orig3").except(target.select("orig1","orig2","orig3")).withColumn("version",lit(0))
val execpettarget=result2.select($"orig1".alias("DIV"),$"orig2".alias("SEC"),$"orig3".alias("UN"),$"version".alias("VER"))
val result23 = execpettarget.join(source, execpettarget("DIV") === source("orig1") && execpettarget("SEC") === source("orig2") && execpettarget("UN") === source("orig3"), "inner").select(source("orig1"),source("orig2"),source("orig3"),execpettarget("VER"),source("orig1"),source("orig2"), source("orig3"))
val final_result = result1.union(result23)
final_result.show()
}else{
println("else")
val ver1 = target10.groupBy("orig1","orig2","orig3").max("version")
val common1 = source.select("orig1","orig2","orig3").intersect(target.select("orig1","orig2","orig3"))
val result11 = common1.join(ver1, common1("orig1") === ver1("orig1") && common1("orig2") === ver1("orig2") && common1("orig3") === ver1("orig3"), "inner").select(ver1("orig1"),ver1("orig2"),ver1("orig3"),(ver1("max(version)") + 1) as "version")
val result3 = result11.join(source, result11("orig1") === source("orig1") && result11("orig2") === source("orig2") && result11("orig3") === source("orig3"), "inner").select(source("orig1"),source("orig2"),source("orig3"),result11("version"),source("xref1"),source("xref2"),source("xref3"))
result3.show()
}}
But in the final join, the source has two rows with the same key, so when I join source and target I get multiple rows.
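The duplication described above can be reproduced without Spark. The following is a minimal sketch in plain Scala collections, using the sample rows from the question; the names `JoinFanOut`, `SourceRow`, `nextVersion`, and `joined` are hypothetical and only stand in for the DataFrames. The per-key version lookup has one entry per key, but the source has two rows for key (1, 1, 1), so an inner join on the key attaches the same version to both.

```scala
// Hypothetical illustration: plain Scala collections stand in for DataFrames.
object JoinFanOut {
  type Key = (Int, Int, Int)
  final case class SourceRow(key: Key, xref: (Int, Int, Int))

  // Source rows from the question: two of them share the key (1, 1, 1).
  val source: Seq[SourceRow] = Seq(
    SourceRow((1, 1, 1), (2, 2, 2)),
    SourceRow((1, 1, 1), (3, 3, 3)),
    SourceRow((23, 23, 23), (12, 12, 12))
  )

  // max(version) + 1 per key found in the target: a single entry here.
  val nextVersion: Map[Key, Int] = Map((1, 1, 1) -> 1)

  // Inner join on the key: every source row with a matching key receives the
  // same version, so both (1, 1, 1) rows come out with version 1.
  val joined: Seq[(SourceRow, Int)] =
    source.flatMap(row => nextVersion.get(row.key).toList.map(v => (row, v)))

  def main(args: Array[String]): Unit =
    joined.foreach { case (row, v) => println(s"$row -> version $v") }
}
```

A join on the key alone cannot tell the two (1, 1, 1) rows apart, which is why a per-row numbering step is needed.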
Best answer
I have not tried to decipher your code, but based on the source, the target, and the expected result, I think this could be a solution:
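import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lit, max, row_number}
import org.apache.spark.sql.types.IntegerType
import spark.implicits._ // enables the 'col symbol syntax; assumes a SparkSession named spark
val w = Window.partitionBy('orig1,'orig2,'orig3).orderBy('version.desc)
// union matches columns by position: source and target need the same column order
val joined = source
.withColumn("version", lit(null).cast(IntegerType))
.union(target)
.withColumn("version", row_number().over(w) + coalesce(max('version).over(w),lit(0)) - lit(1))
joined.show()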
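My reasoning: a join does not make sense here, because you want to end up with the source records plus the target records => union
After the union, you need to process each group of rows that share the same key (orig1, orig2, orig3) => Window
Within a group you care about the highest version number, falling back to 0 if there is none => max & coalesce
You then use that maximum as the offset for numbering the rows in the window => row_number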
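The four steps above can be traced by hand with ordinary Scala collections. This is only a sketch of the arithmetic, not Spark code: `VersionSketch`, `Row`, `assignVersions`, and the sample values are hypothetical names introduced here, and `None` plays the role of the null version given to source rows before the union.

```scala
// Hypothetical emulation of union + Window + max/coalesce + row_number.
object VersionSketch {
  type Key = (Int, Int, Int)
  // version is None for source rows (the null column added before the union).
  final case class Row(key: Key, xref: (Int, Int, Int), version: Option[Int])

  val sampleSource: Seq[Row] = Seq(
    Row((1, 1, 1), (2, 2, 2), None),
    Row((1, 1, 1), (3, 3, 3), None),
    Row((23, 23, 23), (12, 12, 12), None)
  )
  val sampleTarget: Seq[Row] = Seq(Row((1, 1, 1), (1, 1, 1), Some(0)))

  def assignVersions(source: Seq[Row], target: Seq[Row]): Seq[Row] =
    (source ++ target)            // union
      .groupBy(_.key)             // Window.partitionBy(orig1, orig2, orig3)
      .values
      .flatMap { group =>
        // coalesce(max(version), 0): highest existing version, else 0.
        val base = group.flatMap(_.version.toList).reduceOption(_ max _).getOrElse(0)
        // orderBy(version.desc): versioned rows first, null versions last.
        val ordered = group.sortBy(r => r.version.map(v => -v).getOrElse(Int.MaxValue))
        // row_number() + base - 1; row_number is index + 1, so this is index + base.
        ordered.zipWithIndex.map { case (r, i) => r.copy(version = Some(i + base)) }
      }
      .toSeq

  def main(args: Array[String]): Unit =
    assignVersions(sampleSource, sampleTarget).foreach(println)
}
```

One consequence of the formula is worth noting: every row in a group is renumbered, including rows that already had a version. With at most one target row per key, as in the question, the existing row keeps its version; if a key already held versions 2, 1, 0, they would come out as 2, 3, 4.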
Based on the example, this code outputs:
+-----+-----+-----+-----+-----+-----+-------+
|orig1|orig2|orig3|xref1|xref2|xref3|version|
+-----+-----+-----+-----+-----+-----+-------+
|   23|   23|   23|   12|   12|   12|      0|
|    1|    1|    1|    1|    1|    1|      0|
|    1|    1|    1|    2|    2|    2|      1|
|    1|    1|    1|    3|    3|    3|      2|
+-----+-----+-----+-----+-----+-----+-------+
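To try this on the sample data, the tables can be built in memory. The sketch below is one possible setup, not the answerer's exact code. It assumes a local SparkSession, and the names `VersionedAppend` and `appendWithVersions` are hypothetical. Two further assumptions differ from the snippet above: `unionByName` is used so that column order does not matter, and the xref columns are added to the window ordering so that rows with a null version are numbered in a repeatable order.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, max, row_number}
import org.apache.spark.sql.types.IntegerType

object VersionedAppend {
  // Hypothetical helper: appends source rows to target and numbers them per key.
  def appendWithVersions(source: DataFrame, target: DataFrame): DataFrame = {
    // Assumption: xref1..xref3 serve only as tie-breakers among null versions.
    val w = Window
      .partitionBy(col("orig1"), col("orig2"), col("orig3"))
      .orderBy(col("version").desc, col("xref1"), col("xref2"), col("xref3"))

    source
      .withColumn("version", lit(null).cast(IntegerType))
      .unionByName(target) // match columns by name rather than by position
      .withColumn(
        "version",
        row_number().over(w) + coalesce(max(col("version")).over(w), lit(0)) - lit(1)
      )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("versioned-append").getOrCreate()
    import spark.implicits._

    // Sample rows copied from the question.
    val source = Seq((1, 1, 1, 2, 2, 2), (1, 1, 1, 3, 3, 3), (23, 23, 23, 12, 12, 12))
      .toDF("orig1", "orig2", "orig3", "xref1", "xref2", "xref3")
    val target = Seq((1, 1, 1, 1, 1, 1, 0))
      .toDF("orig1", "orig2", "orig3", "xref1", "xref2", "xref3", "version")

    appendWithVersions(source, target).show()
    spark.stop()
  }
}
```

Without a tie-breaker, which of the two (1, 1, 1) source rows receives version 1 and which receives version 2 is not guaranteed, since both sort equally on a null version.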