Problem Description
I have an RDD[Row] that needs to be persisted to a third-party repository, but this repository accepts a maximum of 5 MB in a single call.
So I want to create partitions based on the size of the data present in the RDD, not on the number of rows it contains.
How can I find the size of an RDD and create partitions based on it?
Recommended Answer
As Justin and Wang mentioned, it is not straightforward to get the size of an RDD; we can only estimate it.
We can sample the RDD and then use SizeEstimator to get the size of the sample. As Wang and Justin mentioned, we can extrapolate from data sampled offline: if X rows take Y GB offline, then Z rows at runtime may take roughly Z * Y / X GB.
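As a concrete instance of the extrapolation: if a 1,000-row offline sample measured 2,048 bytes, a runtime RDD of 5,000 rows would be estimated at 5000 * 2048 / 1000 = 10,240 bytes. A minimal sketch of that arithmetic (the helper name is illustrative, not from any Spark API):

```scala
// Extrapolate total size: runtime rows Z * (sample bytes Y / sample rows X)
def extrapolate(sampleRows: Long, sampleBytes: Long, runtimeRows: Long): Long =
  runtimeRows * sampleBytes / sampleRows

// e.g. extrapolate(1000L, 2048L, 5000L) gives 10240 bytes
```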
Here is sample Scala code to get the size/estimate of an RDD.
I am new to Scala and Spark, so the sample below could probably be written in a better way.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.util.SizeEstimator

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10L
  val totalRows = rdd.count()
  var totalSize = 0L
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    // sample() takes a fraction of the data, not a row count
    val fraction = NO_OF_SAMPLE_ROWS.toDouble / totalRows
    val sampleRDD = rdd.sample(withReplacement = true, fraction)
    // Extrapolate from the rows actually sampled (sample() is approximate)
    val sampleCount = sampleRDD.count()
    if (sampleCount > 0) {
      totalSize = getRDDSize(sampleRDD) * totalRows / sampleCount
    }
  } else {
    // The RDD is smaller than the sample size, so just measure it directly
    totalSize = getRDDSize(rdd)
  }
  totalSize
}

def getRDDSize(rdd: RDD[Row]): Long = {
  var rddSize = 0L
  val rows = rdd.collect()
  for (row <- rows) {
    rddSize += SizeEstimator.estimate(row.toSeq.map(_.asInstanceOf[AnyRef]))
  }
  rddSize
}
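To answer the original question of staying under the 5 MB per-call limit, the estimate above could drive the partition count. A hedged sketch, assuming getTotalSize from the code above; partitionsFor is an illustrative helper, not a Spark API:

```scala
// Smallest number of partitions such that each partition's average size
// stays at or under limitBytes (rounding up, never fewer than one)
def partitionsFor(totalSizeBytes: Long, limitBytes: Long): Int =
  math.max(1, math.ceil(totalSizeBytes.toDouble / limitBytes).toInt)

// Usage sketch with the 5 MB limit from the question:
// rdd.repartition(partitionsFor(getTotalSize(rdd), 5L * 1024 * 1024))
```

Note that this targets the average partition size; since repartition does not guarantee equal byte sizes per partition, leaving some headroom below 5 MB would be safer.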