本文介绍了如何在带有SparkR的SparkDataFrame中使用未定义的变量列表作为列名?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在SparkR的世界里不断进步,现在面临着一个我无法解决的问题。

在处理SparkDataFrame操作时,我可能希望更新一些列,或聚合其他列。我学会了如何在个案的基础上做到这一点,也就是一栏一栏地做。

举个例子:

library(SparkR)
library(magrittr)

# Creating SDF
nb.row <- 10 
nb.col <- 10 
m <- matrix(runif(n=nb.row*nb.col, min = 0, max = 100), nb.row, nb.col)
sdf <- createDataFrame(data.frame(ID = 1:10, CODE = base::sample(LETTERS[1:2]), V = m))
  1. 如果我想更新列,我可以执行如下操作:
sdf <- withColumn(sdf, "V_1", sdf$V_1 * 1000)
sdf <- withColumn(sdf, "V_2", sdf$V_2 * 1000)
  1. 如果我想聚合列,可以执行如下操作:
agg1 <- agg(groupBy(sdf, "CODE"), "SV_6" = sum(sdf$V_6), "SV_7" = sum(sdf$V_7))

我的问题是:当我不知道要处理的列的列表时,如何处理这些情况?(在R Basic上很容易,在SparkR中这对我来说似乎是无法克服的...)

  1. 回到更新的案例。我找到了如下内容:
list.var.1 <- paste0("V_", 1:5)
for (i in 1:length(list.var.1)) {
  sdf <- withColumn(sdf, list.var.1[i], sdf[[list.var.1[i]]] * 1000)
}

这给了我预期的结果,但它是最简单的脚本吗?不能更淡或更多&官方&qot;?

  1. 回到聚合的案例。我找到了如下内容:
# Useful functions
DFjoin <- function(left_df, right_df, key = "key", join_type = "left"){
    left_df <- withColumnRenamed(left_df, key, "left_key")
    right_df <- withColumnRenamed(right_df, key, "right_key")
    result <- join(
        left_df, right_df,
        left_df$left_key == right_df$right_key,
        joinType = join_type)
    result <- withColumnRenamed(result, "left_key", key)
    result$right_key <- NULL
    return(result)
}

sum_spark <- function(res, df, gb, col) {
  Cols <- paste0('S', col)
  tmp <- agg(groupBy(df, gb), alias(sum(df[[col]]), Cols))
  result <- DFjoin(res, tmp, "CODE")
}

# First step to create base SDF called res
res <- SparkR::select(sdf, sdf$CODE) %>% SparkR::distinct()

# Updating res in a for loop with join
for (i in 1:length(list.var.2)){
  res <- sum_spark(res, sdf, "CODE", list.var.2[i])
}

这也给了我预期的结果,但脚本看起来真的很重(根据我的说法,与R Basic相比)。我说错了吗?

我找不到有关此问题的更多信息。所以一切都有帮助!!

推荐答案

您可以参考this answer,了解如何将LApply与其他SparkR函数结合使用来获得您想要的东西,而不是使用for loops

分享一个有用的函数,用于在下面的一列列上使用SparkR::agg,这将满足您的目的:

#' Apply SparkR aggregate function on list of columns
#'
#' This function acts as a boilerplate for simplifying the code to do
#' aggregation on multiple columns as a list and apply Spark::agg function on
#' that.
#'
#' @param spark_df Spark dataframe (Grouped or ususal) on which some SparkR
#'     aggregate function to be applied
#' @param agg_cols_list List of Spark column object having some aggregate
#'     function
#'
#' @examples dontrun{
#'   # sdf is a SparkR dataframe having numeric columns "a" & "b"
#'   sdf <- SparkR::createDataFrame(data.frame(a = c(1, 2), b = c(1, 5)))
#'   sparkr_agg_listargs(sdf,
#'     lapply(c("a", "b"), function(x) sum(SparkR::column(x)))
#'   )
#' }
sparkr_agg_listargs <- function(spark_df, agg_cols_list) {
  do.call(SparkR::agg, c(spark_df, agg_cols_list))
}

请有效使用SparkR::alias获取所需的新列名称。

这篇关于如何在带有SparkR的SparkDataFrame中使用未定义的变量列表作为列名?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-30 12:51