假设我要构建一个Spark应用程序,该应用程序希望能够部分删除。我仍然想保留成功完成分区中的数据。我试图通过将其插入到Hive表中来实现。在(PySpark)伪代码中:

def myExpensiveProcess(x):
 ...

udfDoExpensiveThing = udf(myExpensiveProcess, StringType())

myDataFrame \
  .repartition(100) \
  .withColumn("HardEarnedContent", udfDoExpensiveThing("InputColumn")) \
  .write.insertInto("SomeExistingHiveTable")

我运行它直到完成30个分区,然后我终止了工作。当我检查SomeExistingHiveTable时,我发现它没有新行。

我如何保存完成的数据,而不管哪一个没有完成?

最佳答案

这是预期和期望的行为,可确保输出的一致性。

绕过Spark的数据源API将数据直接写入文件系统。

myDataFrame \
  .repartition(100) \
  .withColumn("HardEarnedContent", udfDoExpensiveThing("InputColumn")) \
  .rdd \
  .foreachPartition(write_to_storage)

其中write_to_storage实现所需的逻辑,例如使用one of the HDFS interfaces

09-11 02:24