Problem description
I want to write out one large dataframe with repartitioning, so I want to calculate the number of partitions for my source dataframe:
numberofpartition = {size of dataframe / default_blocksize}
So please tell me how to calculate the size of a dataframe in Spark Scala.
Thanks in advance.
Using
spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
we can get the size of the actual DataFrame once it is loaded into memory. For example, see the code below.
scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils
scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709
scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB
scala> import sys.process._
import sys.process._
scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r----- 3 svcmxns hdfs 0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r----- 3 svcmxns hdfs 727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
// Size in MB divided by a target of 10240 MB per partition; adjust the divisor to get the partition count you need.
val numPartitions = (dataSize / 1024.0 / 1024.0 / 10240).ceil.toInt
df.repartition(if (numPartitions == 0) 1 else numPartitions)
  .[...]
Edit - 1: Please use the logic below as per your Spark version.
Spark 2.4
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
Spark 2.3
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes
For Python
spark._jsparkSession.sessionState().executePlan(df._jdf.queryExecution().logical()).optimizedPlan().stats().sizeInBytes()
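The sizing arithmetic itself can be separated from Spark entirely and reused on the Python side. Below is a minimal sketch, where `partitions_for` is a hypothetical helper (not a PySpark API) and the 128 MB default is only an illustrative stand-in for `default_blocksize`; match `block_size` to your cluster's `dfs.blocksize`:

```python
import math

# Hypothetical helper: derive a partition count from a byte size and a
# target block size. 128 MB mirrors a common HDFS default block size.
def partitions_for(size_in_bytes, block_size=128 * 1024 * 1024):
    n = math.ceil(size_in_bytes / block_size)
    return max(n, 1)  # repartition(0) is invalid, so never return zero

# The 763275709-byte (~727 MB) dataframe from the session above:
print(partitions_for(763275709))  # 6 partitions at a 128 MB block size
```

You would feed it the `sizeInBytes` value obtained from the plan statistics above, e.g. `df.repartition(partitions_for(size))`.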