This article describes how to batch up items from a PySpark DataFrame. It should be a useful reference for anyone facing the same problem; read on to learn more.

Problem description

I have a PySpark DataFrame, and for each (batch of) record(s) I want to call an API. So basically, say I have 100000k records; I want to batch the items into groups of, say, 1000 and call the API for each group. How can I do this with PySpark? The reason for the batching is that the API probably will not accept a huge chunk of data from a Big Data system.

I first thought of LIMIT, but that won't be "deterministic". Furthermore, it seems like it would be inefficient?

Recommended answer

df.foreachPartition { partition =>
  // Iterate over the rows of one partition and post each 1000-record chunk.
  partition.grouped(1000).foreach { chunk =>
    postToServer(chunk)
  }
}

The code above is in Scala; you can do the same in Python. It will create batches of 1000.
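A rough PySpark equivalent might look like the sketch below. This is a minimal sketch only: df is the DataFrame from the question, and post_to_server stands in for whatever function actually sends one batch to your API.

from itertools import islice

def post_to_server(batch):
    # Placeholder for the call that sends one batch of rows to the API.
    pass

def post_partition(rows):
    # 'rows' is an iterator over the Rows of one partition; islice pulls
    # them out 1000 at a time without collecting the whole partition.
    rows = iter(rows)
    while True:
        batch = list(islice(rows, 1000))
        if not batch:
            break
        post_to_server(batch)

df.foreachPartition(post_partition)

Because the grouping happens inside each partition, no batch spans partitions, so the last batch of each partition may contain fewer than 1000 records.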

This concludes the article on how to batch up items from a PySpark DataFrame. We hope the recommended answer is helpful, and we appreciate your continued support!
