Problem description
I'm trying to port code from R to Scala to perform Customer Analysis. I have already computed the Recency, Frequency and Monetary factors in Spark as a DataFrame.
Here is the schema of the DataFrame:
df.printSchema
root
|-- customerId: integer (nullable = false)
|-- recency: long (nullable = false)
|-- frequency: long (nullable = false)
|-- monetary: double (nullable = false)
And here is a data sample as well:
df.order($"customerId").show
+----------+-------+---------+------------------+
|customerId|recency|frequency| monetary|
+----------+-------+---------+------------------+
| 1| 297| 114| 733.27|
| 2| 564| 11| 867.66|
| 3| 1304| 1| 35.89|
| 4| 287| 25| 153.08|
| 6| 290| 94| 316.772|
| 8| 1186| 3| 440.21|
| 11| 561| 5| 489.70|
|        14|    333|       57|            123.94|
+----------+-------+---------+------------------+
I'm trying to find the intervals on a quantile vector for each column, given a probability segment.
In other words, given a probability vector of non-decreasing breakpoints (in my case, the quantile vector), find the interval containing each element of x;
i.e. (pseudocode):
if i <- findInterval(x,v),
for each index j in x
v[i[j]] ≤ x[j] < v[i[j] + 1] where v[0] := - Inf, v[N+1] := + Inf, and N <- length(v).
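For reference, here is a minimal local Scala sketch of that contract (my own illustration, not Spark code): for a non-decreasing vector v, findInterval simply counts the breakpoints that do not exceed x.
def findInterval(x: Double, v: Array[Double]): Int =
  v.count(_ <= x)  // 0 if x is below v(0), v.length if x >= the last breakpoint

findInterval(297.0, Array(0.0, 250.0, 500.0, 750.0, 1000.0))  // == 2 (hypothetical breakpoints)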
In R, this translates to the following code:
probSegment <- c(0.0, 0.25, 0.50, 0.75, 1.0)
RFM_table$Rsegment <- findInterval(RFM_table$Recency, quantile(RFM_table$Recency, probSegment))
RFM_table$Fsegment <- findInterval(RFM_table$Frequency, quantile(RFM_table$Frequency, probSegment))
RFM_table$Msegment <- findInterval(RFM_table$Monetary, quantile(RFM_table$Monetary, probSegment))
I'm kind of stuck with the quantile function, though.
In an earlier discussion with @zero323 (http://stackoverflow.com/questions/31432843/how-to-find-median-using-pyspark/31437177?noredirect=1#comment53486994_31437177), he suggested that I use the percentRank window function, which can be used as a shortcut. I'm not sure that I can apply the percentRank function in this case.
How can I apply a quantile function on a DataFrame column with Scala Spark? If this is not possible, can I use the percentRank function instead?
Thanks.
Answer
Well, I still believe that percent_rank is good enough here. The percent_rank window function is computed, for a row of rank rank among N rows, as:

pr = (rank - 1) / (N - 1)

Transforming as follows:

rank = pr * (N - 1) + 1

gives the definition of a percentile used, according to Wikipedia, by Microsoft Excel.
So the only thing you really need is a findInterval UDF which will return the correct interval index. Alternatively, you can use rank directly and match on rank ranges.
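For illustration, here is a minimal sketch of that idea (my own, not code from the discussion), assuming the Spark 1.5 percentRank function and the quartile breakpoints from the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{percentRank, udf}

val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)
// findInterval as a UDF: 1-based index of the last breakpoint <= the percent rank
val interval = udf((pr: Double) => probs.lastIndexWhere(_ <= pr) + 1)

df.withColumn("pr", percentRank().over(Window.orderBy($"recency")))
  .withColumn("Rsegment", interval($"pr"))

Note that Window.orderBy without a partitionBy moves all rows to one partition, which is exactly the problem described in the edit below.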
Edit
OK, it looks like percent_rank is not a good idea after all:
WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
I am not exactly sure what the point of moving data to a single partition to call a non-aggregate function is, but it looks like we are back to square one. It is possible to use zipWithIndex on a plain RDD:
import org.apache.spark.sql.{Row, DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField, LongType}
import org.apache.spark.sql.functions.udf
val df = sc.parallelize(Seq(
(1, 297, 114, 733.27),
(2, 564, 11, 867.66),
(3, 1304, 1, 35.89),
(4, 287, 25, 153.08),
(6, 290, 94, 316.772),
(8, 1186, 3, 440.21),
(11, 561, 5, 489.70),
(14, 333, 57, 123.94)
)).toDF("customerId", "recency", "frequency", "monetary")
df.registerTempTable("df")
sqlContext.cacheTable("df")
A small helper:
def addRowNumber(df: DataFrame): DataFrame = {
// Prepare new schema
val schema = StructType(
StructField("row_number", LongType, false) +: df.schema.fields)
// Add row number
val rowsWithIndex = df.rdd.zipWithIndex
.map{case (row: Row, idx: Long) => Row.fromSeq(idx +: row.toSeq)}
// Create DataFrame
sqlContext.createDataFrame(rowsWithIndex, schema)
}
And the actual function:
def findInterval(df: DataFrame, column: Column,
probSegment: Array[Double], outname: String): DataFrame = {
val n = df.count
// Map quantiles to indices
val breakIndices = probSegment.map(p => (p * (n - 1)).toLong)
// Add row number
val dfWithRowNumber = addRowNumber(df.orderBy(column))
// Map indices to values
val breaks = dfWithRowNumber
.where($"row_number".isin(breakIndices:_*))
.select(column.cast("double"))
.map(_.getDouble(0))
.collect
// Get interval
val f = udf((x: Double) =>
scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1))
// Final result
dfWithRowNumber
.select($"*", f(column.cast("double")).alias(outname))
.drop("row_number")
}
And an example of usage:
scala> val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)
probs: Array[Double] = Array(0.0, 0.25, 0.5, 0.75, 1.0)
scala> findInterval(df, $"recency", probs, "interval").show
+----------+-------+---------+--------+--------+
|customerId|recency|frequency|monetary|interval|
+----------+-------+---------+--------+--------+
| 4| 287| 25| 153.08| 1|
| 6| 290| 94| 316.772| 2|
| 1| 297| 114| 733.27| 2|
| 14| 333| 57| 123.94| 3|
| 11| 561| 5| 489.7| 3|
| 2| 564| 11| 867.66| 4|
| 8| 1186| 3| 440.21| 4|
| 3| 1304| 1| 35.89| 5|
+----------+-------+---------+--------+--------+
But I guess it is far from optimal.
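For what it's worth, Spark 2.0 and later offer DataFrameStatFunctions.approxQuantile, which could compute the breaks directly without the full sort and zipWithIndex pass. A minimal sketch of that alternative (my own addition, not part of the original answer):

import org.apache.spark.sql.functions.udf

// relativeError = 0.0 requests exact quantiles; breaks plays the same role as above
val breaks = df.stat.approxQuantile("recency", Array(0.0, 0.25, 0.5, 0.75, 1.0), 0.0)
val f = udf((x: Double) =>
  scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1))
df.withColumn("interval", f($"recency".cast("double")))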