



I'm trying to port some code from R to Scala to perform customer analysis. I have already computed the Recency, Frequency and Monetary factors in Spark as a DataFrame with the following schema:


 |-- customerId: integer (nullable = false)
 |-- recency: long (nullable = false)
 |-- frequency: long (nullable = false)
 |-- monetary: double (nullable = false)


And here is a data sample:


|customerId|recency|frequency|          monetary|
|         1|    297|      114|            733.27|
|         2|    564|       11|            867.66|
|         3|   1304|        1|             35.89|
|         4|    287|       25|            153.08|
|         6|    290|       94|           316.772|
|         8|   1186|        3|            440.21|
|        11|    561|        5|            489.70|
|        14|    333|       57|            123.94|


I'm trying to find, for each column, the interval of a quantile vector that each value falls into, given a probability segment.


In other words, given a vector v of non-decreasing breakpoints (in my case the quantile vector), find the interval containing each element of x;

i.e. (pseudocode):

if i <- findInterval(x,v),
for each index j in x
    v[i[j]] ≤ x[j] < v[i[j] + 1] where v[0] := - Inf, v[N+1] := + Inf, and N <- length(v).
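
To make the pseudocode concrete, here is a minimal sketch in plain Scala (no Spark) of the same convention: the result is the number of breakpoints less than or equal to x. The object and method names are hypothetical, and the breakpoints are assumed to be sorted in ascending order.

```scala
object FindIntervalSketch {
  // Returns the 1-based index i such that breaks(i) <= x < breaks(i + 1),
  // which equals the number of breakpoints <= x (0 if x is below all of them).
  // Assumption: `breaks` is sorted in ascending order.
  def findInterval(x: Double, breaks: Array[Double]): Int = {
    var lo = 0
    var hi = breaks.length
    // Binary search for the first breakpoint strictly greater than x
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (breaks(mid) <= x) lo = mid + 1 else hi = mid
    }
    lo
  }
}
```

For example, with breakpoints 287.0, 295.25, 447.0, 719.5, 1304.0, a value of 297.0 falls in interval 2 and a value below 287.0 falls in interval 0.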


In R, this translates to the following code :

probSegment <- c(0.0, 0.25, 0.50, 0.75, 1.0)

RFM_table$Rsegment <- findInterval(RFM_table$Recency, quantile(RFM_table$Recency, probSegment))
RFM_table$Fsegment <- findInterval(RFM_table$Frequency, quantile(RFM_table$Frequency, probSegment))
RFM_table$Msegment <- findInterval(RFM_table$Monetary, quantile(RFM_table$Monetary, probSegment))
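
For reference, a rough local equivalent of the R lines above in plain Scala might look like the sketch below. It assumes R's default quantile definition (type 7, linear interpolation at position p * (n - 1)) and works on an in-memory Seq rather than a DataFrame; the names QuantileSketch, quantile and segments are made up for illustration.

```scala
object QuantileSketch {
  // Assumption: R's default quantile (type 7), i.e. linear interpolation
  // between the two order statistics around position p * (n - 1).
  def quantile(xs: Seq[Double], p: Double): Double = {
    require(xs.nonEmpty && p >= 0.0 && p <= 1.0)
    val sorted = xs.sorted
    val h = p * (sorted.length - 1)
    val lo = math.floor(h).toInt
    val hi = math.min(lo + 1, sorted.length - 1)
    sorted(lo) + (h - lo) * (sorted(hi) - sorted(lo))
  }

  // Segment of each value = number of quantile breakpoints <= value,
  // which is the convention findInterval uses.
  def segments(xs: Seq[Double], probs: Seq[Double]): Seq[Int] = {
    val breaks = probs.map(quantile(xs, _))
    xs.map(x => breaks.count(_ <= x))
  }
}
```

This only shows what needs to be computed; it does not scale, because it sorts the whole column in memory.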


I'm kind of stuck with the quantile function though.

In an earlier discussion (http://stackoverflow.com/questions/31432843/how-to-find-median-using-pyspark/31437177?noredirect=1#comment53486994_31437177) with @zero323, he suggested that I use the percentRank window function as a shortcut. I'm not sure that I can apply the percentRank function in this case.


How can I apply a quantile function on a Dataframe column with Scala Spark? If this is not possible, can I use the percentRank function instead?



Well, I still believe that percent_rank is good enough here. The percent_rank window function is computed as:

    percent_rank = (rank - 1) / (n - 1)

where rank is the rank of the row within the window and n is the number of rows in the window. Let's define pr as this value. Solving for rank:

    rank = pr * (n - 1) + 1

gives a definition of a percentile used, according to Wikipedia, by Microsoft Excel.

So the only thing you really need is a findInterval UDF which will return the correct interval index. Alternatively, you can use rank directly and match on rank ranges.
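
As a local illustration of that idea (plain Scala, not Spark), the sketch below computes a percent rank as (rank - 1) / (n - 1), with ties sharing the smallest rank, and then looks up each percent rank in the probability vector itself. The object and method names are hypothetical, and the quadratic rank lookup is only acceptable for a toy example.

```scala
object PercentRankSketch {
  // percent_rank = (rank - 1) / (n - 1); tied values share the smallest rank.
  // A single-element input gets 0.0.
  def percentRank(xs: Seq[Double]): Seq[Double] = {
    val n = xs.length
    val sorted = xs.sorted
    xs.map { x =>
      val rank = sorted.indexOf(x) + 1 // 1-based rank of the first occurrence
      if (n > 1) (rank - 1).toDouble / (n - 1) else 0.0
    }
  }

  // Interval index = number of probability breakpoints <= percent rank
  def segments(xs: Seq[Double], probs: Seq[Double]): Seq[Int] =
    percentRank(xs).map(pr => probs.count(_ <= pr))
}
```

On the sample recency column, which has no ties, this assigns the same segments as looking the raw values up in interpolated (R type 7) quantiles.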


OK, it looks like percent_rank is not a good idea after all:


I am not exactly sure what the point is of moving data to a single partition to call a non-aggregate function, but it looks like we are back to square one. It is possible to use zipWithIndex on a plain RDD:

import org.apache.spark.sql.{Row, DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField, LongType}
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1, 297, 114, 733.27),
  (2, 564, 11, 867.66),
  (3, 1304, 1,  35.89),
  (4, 287, 25, 153.08),
  (6, 290, 94, 316.772),
  (8, 1186, 3, 440.21),
  (11, 561, 5, 489.70),
  (14, 333, 57, 123.94)
)).toDF("customerId", "recency", "frequency", "monetary")



def addRowNumber(df: DataFrame): DataFrame = {
  // Prepare new schema
  val schema = StructType(
    StructField("row_number", LongType, false) +: df.schema.fields)
  // Add row number
  val rowsWithIndex = df.rdd.zipWithIndex
    .map{case (row: Row, idx: Long) => Row.fromSeq(idx +: row.toSeq)}
  // Create DataFrame
  sqlContext.createDataFrame(rowsWithIndex, schema)
}


def findInterval(df: DataFrame, column: Column,
    probSegment: Array[Double], outname: String): DataFrame = {

  val n = df.count
  // Map quantiles to indices
  val breakIndices  = probSegment.map(p => (p * (n - 1)).toLong)

  // Add row number
  val dfWithRowNumber = addRowNumber(df.orderBy(column))

  // Map indices to values (sorted, because binarySearch requires it)
  val breaks  = dfWithRowNumber
    .where($"row_number".isin(breakIndices: _*))
    .select(column.cast("double"))
    .rdd.map(_.getDouble(0))
    .collect
    .sorted

  // Get interval: index + 1 on an exact hit, insertion point otherwise
  val f = udf((x: Double) =>
    scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1))

  // Final result
  dfWithRowNumber
    .select($"*", f(column.cast("double")).alias(outname))
    .drop("row_number")
}


scala> val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)
probs: Array[Double] = Array(0.0, 0.25, 0.5, 0.75, 1.0)

scala> findInterval(df, $"recency", probs, "interval").show
|customerId|recency|frequency|monetary|interval|
|         4|    287|       25|  153.08|       1|
|         6|    290|       94| 316.772|       2|
|         1|    297|      114|  733.27|       2|
|        14|    333|       57|  123.94|       3|
|        11|    561|        5|   489.7|       3|
|         2|    564|       11|  867.66|       4|
|         8|   1186|        3|  440.21|       4|
|         3|   1304|        1|   35.89|       5|


but I guess it is far from optimal.
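
To see where the interval column comes from, the same logic can be replayed on a local collection. This is only a sketch in plain Scala with a made-up object name: breakpoints are taken at sorted positions floor(p * (n - 1)), and abs(binarySearch + 1) turns either an exact hit or an insertion point into an interval index.

```scala
import java.util.Arrays

object IndexBreaksSketch {
  def intervals(xs: Seq[Double], probs: Seq[Double]): Seq[Int] = {
    val sorted = xs.sorted.toArray
    val n = sorted.length
    // Same mapping as breakIndices: truncate p * (n - 1) to an index
    val breaks = probs.map(p => sorted((p * (n - 1)).toLong.toInt)).toArray
    // binarySearch returns the index on a hit, or -(insertionPoint) - 1 on a miss,
    // so abs(result + 1) is index + 1 on a hit and the insertion point on a miss.
    xs.map(x => math.abs(Arrays.binarySearch(breaks, x) + 1))
  }
}
```

Because the break positions are truncated rather than interpolated, the breakpoints for the sample recency column are 287, 290, 333, 564 and 1304, so some rows land in a different interval than they would with R's default quantile. Note also that binarySearch may return any matching index when a breakpoint value is repeated.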
