Question
I have a wide Spark data frame of a few thousand columns by about a million rows, for which I would like to calculate the row totals. My solution so far is below. I used: dplyr - sum of multiple columns using regular expressions and https://github.com/tidyverse/rlang/issues/116
library(sparklyr)
library(DBI)
library(dplyr)
library(rlang)
sc1 <- spark_connect(master = "local")
wide_df = as.data.frame(matrix(ceiling(runif(2000, 0, 20)), 10, 200))
wide_sdf = sdf_copy_to(sc1, wide_df, overwrite = TRUE, name = "wide_sdf")
col_eqn = paste0(colnames(wide_df), collapse = "+" )
# build up the SQL query and send to spark with DBI
query = paste0("SELECT (",
               col_eqn,
               ") as total FROM wide_sdf")
dbGetQuery(sc1, query)
# Equivalent approach using dplyr instead
col_eqn2 = quo(!! parse_expr(col_eqn))
wide_sdf %>%
  transmute("total" := !!col_eqn2) %>%
  collect() %>%
  as.data.frame()
The problems come when the number of columns is increased. On Spark SQL it seems to be calculated one element at a time, i.e. (((V1 + V2) + V3) + V4)...), which leads to errors due to very deep recursion.
Does anyone have an alternative, more efficient approach? Any help would be much appreciated.
Answer
You're out of luck here. One way or another you are going to hit some recursion limit (even if you go around the SQL parser, a sufficiently large sum of expressions will crash the query planner). There are some slow solutions available:
Use spark_apply (at the cost of conversion to and from R):
wide_sdf %>% spark_apply(function(df) { data.frame(total = rowSums(df)) })
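On the toy data from the question this is easy to sanity-check against a local rowSums() (a small illustrative check, assuming wide_df and wide_sdf from the question are still in scope):

# Compare the Spark-side row totals with a local rowSums() on the same data.
spark_totals <- wide_sdf %>%
  spark_apply(function(df) {
    data.frame(total = rowSums(df))
  }) %>%
  collect()

local_totals <- rowSums(wide_df)

# Row order is not guaranteed after the round trip, so compare sorted values.
all.equal(sort(spark_totals$total), sort(unname(local_totals)))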
Convert to long format and aggregate (at the cost of explode and shuffle):
key_expr <- "monotonically_increasing_id() AS key"
value_expr <- paste(
  "explode(array(", paste(colnames(wide_sdf), collapse=","), ")) AS value"
)
wide_sdf %>%
  spark_dataframe() %>%
  # Add id and explode. We need a separate invoke so id is applied
  # before "lateral view"
  sparklyr::invoke("selectExpr", list(key_expr, "*")) %>%
  sparklyr::invoke("selectExpr", list("key", value_expr)) %>%
  sdf_register() %>%
  # Aggregate by id
  group_by(key) %>%
  summarize(total = sum(value)) %>%
  arrange(key)
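To make the explode step concrete, here is a minimal sketch of the intermediate long format on a throwaway two-row, three-column frame (tiny_df and tiny_sdf are made-up names; sc1 is the connection from the question):

# Minimal illustration of the explode step: each wide row becomes
# three (key, value) rows in long format before the aggregation.
tiny_df  <- data.frame(a = c(1, 2), b = c(3, 4), c = c(5, 6))
tiny_sdf <- sdf_copy_to(sc1, tiny_df, overwrite = TRUE, name = "tiny_sdf")

tiny_key_expr   <- "monotonically_increasing_id() AS key"
tiny_value_expr <- paste(
  "explode(array(", paste(colnames(tiny_df), collapse = ","), ")) AS value"
)

tiny_sdf %>%
  spark_dataframe() %>%
  sparklyr::invoke("selectExpr", list(tiny_key_expr, "*")) %>%
  sparklyr::invoke("selectExpr", list("key", tiny_value_expr)) %>%
  sdf_register() %>%
  collect()
# Expect 6 rows: each key repeated three times, with values 1,3,5 and 2,4,6.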
To get something more efficient you should consider writing a Scala extension and applying the sum directly on a Row object, without exploding:
package com.example.sparklyr.rowsum

import org.apache.spark.sql.{DataFrame, Encoders}

object RowSum {
  def apply(df: DataFrame, cols: Seq[String]) = df.map {
    row => cols.map(c => row.getAs[Double](c)).sum
  }(Encoders.scalaDouble)
}
and
invoke_static(
  sc, "com.example.sparklyr.rowsum.RowSum", "apply",
  wide_sdf %>% spark_dataframe
) %>% sdf_register()
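For invoke_static to resolve com.example.sparklyr.rowsum.RowSum, the compiled jar has to be on the Spark classpath. One way to do that is through the connection config; this is a hedged sketch (the jar path is hypothetical, and using the sparklyr.jars.default option is an assumption about your sparklyr version):

library(sparklyr)

# Assumption: the Scala object above has been packaged into a jar at this
# (hypothetical) path; sparklyr.jars.default is used here to add it to the
# Spark classpath when the connection is created.
config <- spark_config()
config[["sparklyr.jars.default"]] <- "/path/to/rowsum.jar"

sc <- spark_connect(master = "local", config = config)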