We are using the PySpark libraries interfacing with Spark 1.3.1.

We have two dataframes, documents_df := {document_id, document_text} and keywords_df := {keyword}. We would like to JOIN the two dataframes and return a resulting dataframe with {document_id, keyword} pairs, using the criteria that keyword_df.keyword appears in the document_df.document_text string.

For example, in PostgreSQL we could achieve this using an ON clause of the form:
document_df.document_text ilike '%' || keyword_df.keyword || '%'
However, in PySpark I cannot get any form of join syntax to work. Has anyone done something like this before?
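
For reference, on newer Spark releases (2.x and later) the containment condition can be passed straight to join(); a minimal sketch (the variable name matches is my own), with the caveat that the physical plan is still effectively a Cartesian product, as the accepted answer below discusses:

from pyspark.sql.functions import col

# Sketch only: relies on Column.contains accepting another column (newer Spark).
matches = (documents_df
    .join(keywords_df, col("document_text").contains(col("keyword")))
    .select("document_id", "keyword"))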

Kind regards,

Will

Best answer

It is possible in two different ways, but generally speaking neither is recommended. First, let's create some dummy data:

from pyspark.sql import Row

document_row = Row("document_id", "document_text")
keyword_row = Row("keyword")

documents_df = sc.parallelize([
    document_row(1L, "apache spark is the best"),
    document_row(2L, "erlang rocks"),
    document_row(3L, "but haskell is better")
]).toDF()

keywords_df = sc.parallelize([
    keyword_row("erlang"),
    keyword_row("haskell"),
    keyword_row("spark")
]).toDF()
  • Hive UDF
    documents_df.registerTempTable("documents")
    keywords_df.registerTempTable("keywords")
    
    query = """SELECT document_id, keyword
        FROM documents JOIN keywords
        ON document_text LIKE CONCAT('%', keyword, '%')"""
    
    like_with_hive_udf = sqlContext.sql(query)
    like_with_hive_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    
  • Python UDF
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import BooleanType
    
    # Or you can replace `in` with a regular expression (see the sketch below)
    contains = udf(lambda s, q: q in s, BooleanType())
    
    like_with_python_udf = (documents_df.join(keywords_df)
        .where(contains(col("document_text"), col("keyword")))
        .select(col("document_id"), col("keyword")))
    like_with_python_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
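
    As the comment above notes, the membership test can be swapped for a regular expression. A rough sketch of that variant (whole-word matching; my own illustration, not part of the original answer):
    import re

    # True if the keyword appears as a whole word in the document text
    regex_contains = udf(
        lambda s, q: bool(re.search("\\b%s\\b" % re.escape(q), s)),
        BooleanType())

    like_with_regex_udf = (documents_df.join(keywords_df)
        .where(regex_contains(col("document_text"), col("keyword")))
        .select(col("document_id"), col("keyword")))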
    

  • Why is it not recommended? Because in both cases it requires a Cartesian product:
    like_with_hive_udf.explain()
    
    ## TungstenProject [document_id#2L,keyword#4]
    ##  Filter document_text#3 LIKE concat(%,keyword#4,%)
    ##   CartesianProduct
    ##    Scan PhysicalRDD[document_id#2L,document_text#3]
    ##    Scan PhysicalRDD[keyword#4]
    
    like_with_python_udf.explain()
    
    ## TungstenProject [document_id#2L,keyword#4]
    ##  Filter pythonUDF#13
    ##   !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ...
    ##    CartesianProduct
    ##     Scan PhysicalRDD[document_id#2L,document_text#3]
    ##     Scan PhysicalRDD[keyword#4]
    

    There are other ways to achieve a similar effect without a full Cartesian product.
  • Join on the tokenized document - useful if the keyword list is too large to be handled in the memory of a single machine
    from pyspark.ml.feature import Tokenizer
    from pyspark.sql.functions import explode
    
    tokenizer = Tokenizer(inputCol="document_text", outputCol="words")
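    # Note: ml.feature.Tokenizer lowercases the text and splits it on whitespace,
    # so keywords must be lowercase, single-word tokens for the join below to match.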
    
    tokenized = (tokenizer.transform(documents_df)
        .select(col("document_id"), explode(col("words")).alias("token")))
    
    like_with_tokenizer = (tokenized
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          3|haskell|
    ## |          1|  spark|
    ## |          2| erlang|
    ## +-----------+-------+
    

    This requires a shuffle but not a Cartesian product:
    like_with_tokenizer.explain()
    
    ## TungstenProject [document_id#2L,keyword#4]
    ##  SortMergeJoin [token#29], [keyword#4]
    ##   TungstenSort [token#29 ASC], false, 0
    ##    TungstenExchange hashpartitioning(token#29)
    ##     TungstenProject [document_id#2L,token#29]
    ##      !Generate explode(words#27), true, false, [document_id#2L, ...
    ##       ConvertToSafe
    ##        TungstenProject [document_id#2L,UDF(document_text#3) AS words#27]
    ##         Scan PhysicalRDD[document_id#2L,document_text#3]
    ##   TungstenSort [keyword#4 ASC], false, 0
    ##    TungstenExchange hashpartitioning(keyword#4)
    ##     ConvertToUnsafe
    ##      Scan PhysicalRDD[keyword#4]
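
    A possible variant (my own, not part of the original answer): on Spark 1.5+ the ML Tokenizer can be replaced by a regex split, which also strips punctuation that would otherwise stay attached to tokens:
    from pyspark.sql.functions import split, lower

    # Lowercase, split on non-word characters, then explode into one token per row
    tokenized_re = (documents_df
        .select(col("document_id"),
                explode(split(lower(col("document_text")), "\\W+")).alias("token")))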
    
  • Python UDF and a broadcast variable - useful if the keyword list is relatively small
    from pyspark.sql.types import ArrayType, StringType
    
    # Collect the keyword column to the driver and ship it to every executor
    # as a broadcast variable (assumes the keyword set fits in memory).
    keywords = sc.broadcast(set(
        keywords_df.map(lambda row: row[0]).collect()))

    # For each document, return the keywords it contains as an array:
    # the intersection of its whitespace tokens with the broadcast set.
    bd_contains = udf(
        lambda s: list(set(s.split()) & keywords.value),
        ArrayType(StringType()))
    
    
    like_with_bd = (documents_df.select(
        col("document_id"),
        explode(bd_contains(col("document_text"))).alias("keyword")))
    
    like_with_bd.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    

    It requires neither a shuffle nor a Cartesian product, but you still have to transfer the broadcast variable to each worker node.
    like_with_bd.explain()
    
    ## TungstenProject [document_id#2L,keyword#46]
    ##  !Generate explode(pythonUDF#47), true, false, ...
    ##   ConvertToSafe
    ##    TungstenProject [document_id#2L,pythonUDF#47]
    ##     !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ...
    ##      Scan PhysicalRDD[document_id#2L,document_text#3]
    
  • Since Spark 1.6.0 you can mark a small data frame with sql.functions.broadcast and get a similar effect to the above without using UDFs or explicit broadcast variables. Reusing the tokenized data:
    from pyspark.sql.functions import broadcast
    
    like_with_tokenizer_and_bd = (broadcast(tokenized)
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer_and_bd.explain()
    
    ## TungstenProject [document_id#3L,keyword#5]
    ##  BroadcastHashJoin [token#10], [keyword#5], BuildLeft
    ##   TungstenProject [document_id#3L,token#10]
    ##    !Generate explode(words#8), true, false, ...
    ##     ConvertToSafe
    ##      TungstenProject [document_id#3L,UDF(document_text#4) AS words#8]
    ##       Scan PhysicalRDD[document_id#3L,document_text#4]
    ##   ConvertToUnsafe
    ##    Scan PhysicalRDD[keyword#5]
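
    Depending on which side is actually smaller, the keyword data frame could be marked for broadcast instead; a sketch of that variant (my own, not part of the original answer):
    like_with_bd_keywords = (tokenized
        .join(broadcast(keywords_df), col("token") == col("keyword"))
        .drop("token"))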
    

  • Related:
  • For approximate matching, see Efficient string matching in Apache Spark.