Problem Description
I'm trying to figure out if there is a function that would check if a column of a Spark DataFrame contains any of the values in a list:
# define a dataframe
rdd = sc.parallelize([(0,100), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])
# define a list of scores
l = [1]
# filter records whose score contains any value in the list l
records = df.filter(~df.score.contains(l))
# expected: (0,100), (0,1), (1,10), (3,18)
I get an issue running this code:
java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [1]
Is there a way to do this, or do we have to loop through the list to pass contains?
Answer
I see some ways to do this without using a udf.
You could use a list comprehension with pyspark.sql.functions.regexp_extract, exploiting the fact that an empty string is returned if there is no match.
Try to extract all of the values in the list l and concatenate the results. If the resulting concatenated string is an empty string, that means none of the values matched.
For example:
from pyspark.sql.functions import concat, regexp_extract
records = df.where(concat(*[regexp_extract("score", str(val), 0) for val in l]) != "")
records.show()
#+---+-----+
#| id|score|
#+---+-----+
#| 0| 100|
#| 0| 1|
#| 1| 10|
#| 3| 18|
#| 3| 18|
#| 3| 18|
#+---+-----+
If you take a look at the execution plan, you'll see that it's smart enough to cast the score column to string implicitly:
records.explain()
#== Physical Plan ==
#*Filter NOT (concat(regexp_extract(cast(score#11L as string), 1, 0)) = )
#+- Scan ExistingRDD[id#10L,score#11L]
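One caveat the answer doesn't cover: regexp_extract treats each value as a regular expression, so values containing metacharacters (such as "." or "+") would not be matched literally. A minimal sketch, assuming you want plain substring semantics, is to escape each value with re.escape first:
import re
from pyspark.sql.functions import concat, regexp_extract
# Sketch (assumption): escape each value so regex metacharacters
# in l are matched literally rather than interpreted as patterns.
records = df.where(
    concat(*[regexp_extract("score", re.escape(str(val)), 0) for val in l]) != ""
)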
Another way is to use pyspark.sql.Column.like (or similarly with rlike):
from functools import reduce
from pyspark.sql.functions import col

# OR together one LIKE condition per value in l;
# val.join(["%", "%"]) builds the pattern "%<val>%"
records = df.where(
    reduce(
        lambda a, b: a | b,
        map(
            lambda val: col("score").like(val.join(["%", "%"])),
            map(str, l)
        )
    )
)
Which produces the same output as above and has the following execution plan:
#== Physical Plan ==
#*Filter Contains(cast(score#11L as string), 1)
#+- Scan ExistingRDD[id#10L,score#11L]
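If you'd rather skip patterns entirely, the same OR-of-conditions idea can be expressed with pyspark.sql.Column.contains — a sketch, assuming plain substring matching is all you need (the explicit cast to string mirrors what Spark does implicitly above):
from functools import reduce
from pyspark.sql.functions import col
# Sketch: OR together one contains() condition per value in l;
# score is numeric, so cast it to string before substring matching.
condition = reduce(
    lambda a, b: a | b,
    [col("score").cast("string").contains(str(val)) for val in l]
)
records = df.where(condition)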
If you wanted only distinct records, you can do:
records.distinct().show()
#+---+-----+
#| id|score|
#+---+-----+
#| 0| 1|
#| 0| 100|
#| 3| 18|
#| 1| 10|
#+---+-----+
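For longer lists, the per-value conditions can also be collapsed into a single rlike call using one alternation pattern — a sketch, assuming l is non-empty and its values should match literally:
import re
from pyspark.sql.functions import col
# Sketch: build one alternation pattern ("1|2|...") from the list,
# escaping each value; rlike matches on any partial occurrence.
pattern = "|".join(re.escape(str(val)) for val in l)
records = df.where(col("score").rlike(pattern))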