Problem description
I have a DataFrame and I want to apply a function to each row. This function depends on other DataFrames.
Simplified example. I have three DataFrames like below:
df = sc.parallelize([
    ['a', 'b', 1],
    ['c', 'd', 3]
]).toDF(('feat1', 'feat2', 'value'))

df_other_1 = sc.parallelize([
    ['a', 0, 1, 0.0],
    ['a', 1, 3, 0.1],
    ['a', 3, 10, 1.0],
    ['c', 0, 10, 0.2],
    ['c', 10, 25, 0.5]
]).toDF(('feat1', 'lower', 'upper', 'score'))

df_other_2 = sc.parallelize([
    ['b', 0, 4, 0.1],
    ['b', 4, 20, 0.5],
    ['b', 20, 30, 1.0],
    ['d', 0, 5, 0.05],
    ['d', 5, 22, 0.9]
]).toDF(('feat1', 'lower', 'upper', 'score'))
For each row of df, I want to collect the unique upper values for feat1 (from df_other_1) and feat2 (from df_other_2); i.e. for the first row, the unique values are (1, 3, 10, 4, 20, 30). Then I sort them in descending order, (30, 20, 10, 4, 3, 1), and prepend a number one above the largest. df would become:
df = sc.parallelize([
    ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
    ['c', 'd', 3, [26, 25, 22, 10, 5]]
]).toDF(('feat1', 'feat2', 'value', 'lst'))
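The list-building step above can be sketched in plain Python (build_lst is an illustrative helper name, not part of the question's code), using the upper bounds taken from the bucket tables:

```python
def build_lst(uppers_1, uppers_2):
    # Unique upper bounds from both bucket tables, sorted descending,
    # with one extra value prepended just above the maximum.
    values = sorted(set(uppers_1) | set(uppers_2), reverse=True)
    return [values[0] + 1] + values

# First row: feat1 = 'a' (uppers 1, 3, 10), feat2 = 'b' (uppers 4, 20, 30)
build_lst([1, 3, 10], [4, 20, 30])   # [31, 30, 20, 10, 4, 3, 1]
# Second row: feat1 = 'c' (uppers 10, 25), feat2 = 'd' (uppers 5, 22)
build_lst([10, 25], [5, 22])         # [26, 25, 22, 10, 5]
```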
Then, for each row of df and for each of the values in its lst, I want to calculate the total score: the sum of the score values from df_other_1 and df_other_2 for the buckets whose lower and upper bounds contain that value. My goal is to find the lowest value in each lst whose total score is above some threshold (e.g. 1.4).
Here's how the total score is calculated. For the first row of df, the first value of lst is 31. In df_other_1 for feat1 'a', 31 is above the highest bucket, so it gets a score of 1. Same for df_other_2. So the total score is 1 + 1 = 2. For the value 10 (again for the first row), the total score would be 1 + 0.5 = 1.5.
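That scoring rule can be sketched in plain Python (bucket_score is an illustrative helper; it assumes a value equal to a bucket's upper bound falls into the bucket above, i.e. lower <= value < upper, and that values below all buckets score 0.0 — the walkthrough for the first row is consistent with this, but the question does not pin down every boundary case):

```python
def bucket_score(buckets, value):
    # buckets: (lower, upper, score) rows for one feat value.
    for lower, upper, score in buckets:
        if lower <= value < upper:
            return score
    # Above the highest bucket: score 1.0, as in the walkthrough.
    # Below the lowest bucket: 0.0 (assumed; never hit in this example).
    return 1.0 if value >= max(u for _, u, _ in buckets) else 0.0

buckets_a = [(0, 1, 0.0), (1, 3, 0.1), (3, 10, 1.0)]    # df_other_1, feat1 'a'
buckets_b = [(0, 4, 0.1), (4, 20, 0.5), (20, 30, 1.0)]  # df_other_2, feat1 'b'
bucket_score(buckets_a, 31) + bucket_score(buckets_b, 31)  # 1.0 + 1.0 = 2.0
bucket_score(buckets_a, 10) + bucket_score(buckets_b, 10)  # 1.0 + 0.5 = 1.5
```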
Here's what df would finally look like:
df = sc.parallelize([
    ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
    ['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))
I'm actually only looking for these target values, 4 and 25. The intermediate steps do not really matter.
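The final selection step can be sketched in plain Python, with the score lists taken verbatim from the expected output above (target_value is an illustrative name):

```python
def target_value(lst, total_scores, threshold=1.4):
    # Lowest lst value whose total score is strictly above the threshold.
    candidates = [v for v, s in zip(lst, total_scores) if s > threshold]
    return min(candidates) if candidates else None

target_value([31, 30, 20, 10, 4, 3, 1],
             [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2])  # 4
target_value([26, 25, 22, 10, 5],
             [2.0, 1.5, 1.4, 1.4, 1.1])            # 25
```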
Here's what I tried so far:
from pyspark.sql.functions import col, lag, lead, when
from pyspark.sql.window import Window

def get_threshold_for_row(feat1, feat2, threshold):
    this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
    this_df_other_2 = df_other_2.filter(col('feat1') == feat2)

    values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
    values_feat_1.append(values_feat_1[-1] + 1)
    values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
    values_feat_2.append(values_feat_2[-1] + 1)
    values = values_feat_1 + values_feat_2
    values = list(set(values))   # keep unique values
    values.sort(reverse=True)    # sort from largest to smallest

    df_1_score = df_2_score = 0
    prev_value = 10000           # any large number
    prev_score = 10000
    for value in values:
        df_1_score = get_score_for_key(this_df_other_1, 'feat1', feat1, value)
        df_2_score = get_score_for_key(this_df_other_2, 'feat1', feat2, value)
        total_score = df_1_score + df_2_score
        if total_score < threshold and prev_score >= threshold:
            return prev_value
        prev_score = total_score
        prev_value = value

def is_dataframe_empty(df):
    return len(df.take(1)) == 0

def get_score_for_key(scores_df, grouping_key, this_id, value):
    if is_dataframe_empty(scores_df):
        return 0.0

    # Flag the first and last bucket of each group via window functions.
    w = Window.partitionBy(grouping_key).orderBy(col('upper'))
    scores_df_tmp = scores_df.withColumn('next_upper', lead(scores_df.upper).over(w)) \
        .withColumn('is_last', when(col('next_upper').isNull(), 1).otherwise(0)) \
        .drop('next_upper')
    scores_df_tmp = scores_df_tmp.withColumn('prev_upper', lag(scores_df_tmp.upper).over(w)) \
        .withColumn('is_first', when(col('prev_upper').isNull(), 1).otherwise(0)) \
        .drop('prev_upper').cache()

    grouping_key_score = scores_df_tmp.filter(
        (col(grouping_key) == this_id) &
        (((value >= col('lower')) & (value < col('upper'))) |
         ((value >= col('upper')) & (col('is_last') == 1)) |
         ((value < col('lower')) & (col('is_first') == 1)) |
         col('lower').isNull())) \
        .withColumn('final_score', when(value <= col('upper'), col('score')).otherwise(1.0)) \
        .collect()[0]['final_score']
    return grouping_key_score

df.rdd.map(lambda r: (r['feat1'], r['feat2'])) \
    .map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4))) \
    .toDF()
But I get: AttributeError: 'Py4JError' object has no attribute 'message'

Sorry for the long post. Any ideas?
Answer
tl;dr That is not possible in UDFs.
In the broadest sense, a UDF is a function (actually a Catalyst expression) that accepts zero or more column values (as Column references).
A UDF can only work on records, which in the broadest case could be an entire DataFrame if the UDF is a user-defined aggregate function (UDAF).
If you want to work on more than one DataFrame in a UDF, you have to join the DataFrames first, so that a single DataFrame has all the columns you want the UDF to use.
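Applied to this question, the join buys you the following shape: group the bucket rows per key (e.g. with groupBy('feat1').agg(collect_list(...))), join the grouped tables onto df, and then every row carries its own bucket lists as ordinary columns that a UDF can consume. A sketch of that shape, emulated in plain Python lists rather than DataFrames (the helper names are illustrative, not Spark API):

```python
# Plain-Python stand-ins for the three DataFrames.
df = [('a', 'b', 1), ('c', 'd', 3)]
df_other_1 = [('a', 0, 1, 0.0), ('a', 1, 3, 0.1), ('a', 3, 10, 1.0),
              ('c', 0, 10, 0.2), ('c', 10, 25, 0.5)]
df_other_2 = [('b', 0, 4, 0.1), ('b', 4, 20, 0.5), ('b', 20, 30, 1.0),
              ('d', 0, 5, 0.05), ('d', 5, 22, 0.9)]

def collect_buckets(rows):
    # Emulates groupBy('feat1').agg(collect_list(struct('lower', 'upper', 'score'))).
    grouped = {}
    for key, lower, upper, score in rows:
        grouped.setdefault(key, []).append((lower, upper, score))
    return grouped

g1, g2 = collect_buckets(df_other_1), collect_buckets(df_other_2)

# Emulates df.join(grouped_1, on feat1).join(grouped_2, on feat2):
# each row now carries its bucket lists, so a per-row function (the UDF)
# has everything it needs in its own columns.
joined = [(f1, f2, v, g1[f1], g2[f2]) for f1, f2, v in df]
```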
That concludes this article on how to pass a DataFrame as input to a Spark UDF; hopefully the answer above helps.