本文介绍了有效计算宽 Spark DF 的行总数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个由几千列乘以大约一百万行的宽 Spark 数据框,我想计算行总数.到目前为止,我的解决方案如下.我用了:dplyr - 使用正则表达式的多列的总和https://github.com/tidyverse/rlang/issues/116

I have a wide spark data frame of a few thousand columns by about a million rows, for which I would like to calculate the row totals. My solution so far is below. I used:dplyr - sum of multiple columns using regular expressions andhttps://github.com/tidyverse/rlang/issues/116

library(sparklyr)
library(DBI)
library(dplyr)
library(rlang)

sc1 <- spark_connect(master = "local")
wide_df = as.data.frame(matrix(ceiling(runif(2000, 0, 20)), 10, 200))
wide_sdf = sdf_copy_to(sc1, wide_df, overwrite = TRUE, name = "wide_sdf")

col_eqn = paste0(colnames(wide_df), collapse = "+" )

# build up the SQL query and send to spark with DBI
query = paste0("SELECT (",
               col_eqn,
               ") as total FROM wide_sdf")

dbGetQuery(sc1, query)

# Equivalent approach using dplyr instead
col_eqn2 = quo(!! parse_expr(col_eqn))

wide_sdf %>%
    transmute("total" := !!col_eqn2) %>%
        collect() %>%
            as.data.frame()

当列数增加时问题就来了.在 spark SQL 上,它似乎一次计算一个元素,即 (((V1 + V1) + V3) + V4)...) 这会由于非常高的递归而导致错误.

The problems come when the number of columns is increased. On spark SQL it seems to be calculated one element at a time i.e. (((V1 + V1) + V3) + V4)...) This is leading to errors due to very high recursion.

有没有人有更有效的替代方法?任何帮助将不胜感激.

Does anyone have an alternative more efficient approach? Any help would be much appreciated.

推荐答案

你在这里走运了.以一种或另一种方式,您将遇到一些递归限制(即使您绕过 SQL 解析器,足够大的表达式总和会使查询计划器崩溃).有一些缓慢的解决方案可用:

You're out of luck here. One way or another you're are going to hit some recursion limits (even if you go around SQL parser, sufficiently large sum of expressions will crash query planner). There are some slow solutions available:

  • 使用 spark_apply(以与 R 之间的转换为代价):

  • Use spark_apply (at the cost of conversion to and from R):

wide_sdf %>% spark_apply(function(df) { data.frame(total = rowSums(df)) })

  • 转换为长格式并聚合(以explode 和shuffle 为代价):

  • Convert to long format and aggregate (at the cost of explode and shuffle):

    key_expr <- "monotonically_increasing_id() AS key"
    
    value_expr <- paste(
     "explode(array(", paste(colnames(wide_sdf), collapse=","), ")) AS value"
    )
    
    wide_sdf %>%
      spark_dataframe() %>%
      # Add id and explode. We need a separate invoke so id is applied
      # before "lateral view"
      sparklyr::invoke("selectExpr", list(key_expr, "*")) %>%
      sparklyr::invoke("selectExpr", list("key", value_expr)) %>%
      sdf_register() %>%
      # Aggregate by id
      group_by(key) %>%
      summarize(total = sum(value)) %>%
      arrange(key)
    

  • 为了获得更高的效率,您应该考虑编写 Scala 扩展并直接在 Row 对象上应用 sum,而不会爆炸:

    To get something more efficient you should consider writing Scala extension and applying sum directly on a Row object, without exploding:

    package com.example.sparklyr.rowsum
    
    import org.apache.spark.sql.{DataFrame, Encoders}
    
    object RowSum {
      def apply(df: DataFrame, cols: Seq[String]) = df.map {
        row => cols.map(c => row.getAs[Double](c)).sum
      }(Encoders.scalaDouble)
    }
    

    invoke_static(
      sc, "com.example.sparklyr.rowsum.RowSum", "apply",
      wide_sdf %>% spark_dataframe
    ) %>% sdf_register()
    

    这篇关于有效计算宽 Spark DF 的行总数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

    08-29 02:30