问题描述
我有RDD[Row]
,需要持久化到第三方存储库.但是这个第三方存储库在一次调用中最多接受 5 MB.
I have RDD[Row]
, which needs to be persisted to a third party repository.But this third party repository accepts of maximum of 5 MB in a single call.
所以我想根据 RDD 中存在的数据大小而不是根据 RDD 中存在的行数创建分区.
So I want to create partition based on the size of the data present in RDD and not based on the number of rows present in RDD.
如何找到 RDD
的大小并基于它创建分区?
How can I find the size of a RDD
and create partitions based on it?
推荐答案
正如 Justin 和 Wang 所提到的,获得 RDD 的大小并不简单.我们可以做一个估计.
As Justin and Wang mentioned it is not straight forward to get the size of RDD. We can just do a estimate.
我们可以对 RDD 进行采样,然后使用 SizeEstimator 得到样本的大小.正如王和贾斯汀所说,根据离线采样的大小数据,假设 X 行离线使用 Y GB,运行时 Z 行可能占用 Z*Y/X GB
We can sample a RDD and then use SizeEstimator to get the size of sample. As Wang and Justin mentioned,based on the size data sampled offline, say, X rows used Y GB offline, Z rows at runtime may take Z*Y/X GB
这是获取 RDD 大小/估计的示例 Scala 代码.
Here is the sample scala code to get the size/estimate of a RDD.
我是 Scala 和 Spark 的新手.下面的示例可能会以更好的方式编写
I am new to scala and spark. Below sample may be written in a better way
def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
}
totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
}
rddSize
}
这篇关于如何找到RDD的大小的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!