假设我要构建一个Spark应用程序,该应用程序希望能够部分删除。我仍然想保留成功完成分区中的数据。我试图通过将其插入到Hive表中来实现。在(PySpark)伪代码中:
def myExpensiveProcess(x):
...
udfDoExpensiveThing = udf(myExpensiveProcess, StringType())
myDataFrame \
.repartition(100) \
.withColumn("HardEarnedContent", udfDoExpensiveThing("InputColumn")) \
.write.insertInto("SomeExistingHiveTable")
我运行它直到完成30个分区,然后我终止了工作。当我检查
SomeExistingHiveTable
时,我发现它没有新行。我如何保存完成的数据,而不管哪一个没有完成?
最佳答案
这是预期和期望的行为,可确保输出的一致性。
绕过Spark的数据源API将数据直接写入文件系统。
myDataFrame \
.repartition(100) \
.withColumn("HardEarnedContent", udfDoExpensiveThing("InputColumn")) \
.rdd \
.foreachPartition(write_to_storage)
其中
write_to_storage
实现所需的逻辑,例如使用one of the HDFS interfaces。