我有一个配置单元表,数据每天都在增加。在特定的一天,某些损坏的记录已插入表中。有没有办法我可以将表与HDFS上的主文件匹配并从Hive中提取损坏的记录
要么
如何在具有100万行的配置单元表中识别损坏的记录?

最佳答案

使用join, except1 找出加载到Hive表vs文件中的损坏记录。
Example:

//read the file
val df=spark.read.<format>("<path>")

//read hive table
val df1=spark.read.table("<db>.<hive_table_name>")

//without using md5 hash
df.exceptAll(df1).show()
df1.exceptAll(df).show()

//create md5 hash by concatenating all column values
val df2=df.withColumn("md_hash",md5(concat_ws(",",df.columns.map(c => col(c)): _*))).select("md_hash")

val df3=df1.withColumn("md_hash",md5(concat_ws(",",df.columns.map(c => col(c)): _*))).select("md_hash")

//get non matching rows from df2 that are not existed in df3
df2.except(df3).show()
df2.exceptAll(df3).show()

//get non matching rows from df3 that are not existed in df2
df3.exceptA(df2).show()
df3.exceptAll(df2).show()

//or using full outer join
df3.join(df2,df3("md_hash") === df2("md_hash"),"full").
filter((df2("md_hash").isNull || df3("md_hash").isNull)).
show(10,false)

关于apache-spark - 如何在Hive表中检查损坏的记录,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/63119639/

10-16 01:39