This article looks at how we can JOIN two Spark SQL dataframes using an SQL-style "LIKE" criterion. It should be a useful reference for anyone tackling the same problem; follow along below to see how.

Problem Description


We are using the PySpark libraries interfacing with Spark 1.3.1.

We have two dataframes, documents_df := {document_id, document_text} and keywords_df := {keyword}. We would like to JOIN the two dataframes and return a resulting dataframe with {document_id, keyword} pairs, using the criterion that keyword_df.keyword appears in the document_df.document_text string.

In PostgreSQL, for example, we could achieve this using an ON clause of the form:

document_df.document_text ilike '%' || keyword_df.keyword || '%'

In PySpark however, I cannot get any form of join syntax to work. Has anybody achieved something like this before?

With kind regards,

Will

Solution

It is possible in two different ways, but generally speaking it is not recommended. First let's create some dummy data:

from pyspark.sql import Row

document_row = Row("document_id", "document_text")
keyword_row = Row("keyword")

documents_df = sc.parallelize([
    document_row(1L, "apache spark is the best"),
    document_row(2L, "erlang rocks"),
    document_row(3L, "but haskell is better")
]).toDF()

keywords_df = sc.parallelize([
    keyword_row("erlang"),
    keyword_row("haskell"),
    keyword_row("spark")
]).toDF()
  1. Hive UDFs

    documents_df.registerTempTable("documents")
    keywords_df.registerTempTable("keywords")
    
    query = """SELECT document_id, keyword
        FROM documents JOIN keywords
        ON document_text LIKE CONCAT('%', keyword, '%')"""
    
    like_with_hive_udf = sqlContext.sql(query)
    like_with_hive_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
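
    As a side note: if this is later run on Spark 2.x, registerTempTable and sqlContext have been superseded. A minimal hedged sketch of the equivalent calls, assuming an existing SparkSession named spark (not part of the original answer):

    # Spark 2.x equivalent of the temp-table registration above (sketch only).
    documents_df.createOrReplaceTempView("documents")
    keywords_df.createOrReplaceTempView("keywords")
    like_with_hive_udf = spark.sql(query)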
    

  2. Python UDF

    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import BooleanType
    
    # Or you can replace `in` with a regular expression (a sketch follows at the end of this item)
    contains = udf(lambda s, q: q in s, BooleanType())
    
    like_with_python_udf = (documents_df.join(keywords_df)
        .where(contains(col("document_text"), col("keyword")))
        .select(col("document_id"), col("keyword")))
    like_with_python_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
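
    As the comment above mentions, the substring test can be swapped for a regular expression, for example to match keywords only on word boundaries. A minimal hedged sketch (contains_word and the word-boundary pattern are illustrative choices, not part of the original answer; this variant still implies a Cartesian product, as discussed below):

    import re

    # Match the keyword as a whole word instead of as a raw substring.
    contains_word = udf(
        lambda s, q: bool(re.search(r"\b" + re.escape(q) + r"\b", s)),
        BooleanType())

    like_with_regex_udf = (documents_df.join(keywords_df)
        .where(contains_word(col("document_text"), col("keyword")))
        .select(col("document_id"), col("keyword")))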
    

Why is this not recommended? Because in both cases it requires a Cartesian product, as the plans below show:

like_with_hive_udf.explain()

## TungstenProject [document_id#2L,keyword#4]
##  Filter document_text#3 LIKE concat(%,keyword#4,%)
##   CartesianProduct
##    Scan PhysicalRDD[document_id#2L,document_text#3]
##    Scan PhysicalRDD[keyword#4]

like_with_python_udf.explain()

## TungstenProject [document_id#2L,keyword#4]
##  Filter pythonUDF#13
##   !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ...
##    CartesianProduct
##     Scan PhysicalRDD[document_id#2L,document_text#3]
##     Scan PhysicalRDD[keyword#4]
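
The scale of that product is easy to estimate: before the filter runs, every document is paired with every keyword. A quick hedged sanity check (just the two row counts multiplied; pairs_to_evaluate is an illustrative name):

# Rows the Cartesian product materializes before the predicate filters anything.
pairs_to_evaluate = documents_df.count() * keywords_df.count()
print(pairs_to_evaluate)  # 9 for the dummy data above (3 documents x 3 keywords)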

There are other ways to achieve a similar effect without a full Cartesian product.

  1. Join on a tokenized document - useful if the keyword list is too large to be handled in the memory of a single machine (a note on token matching follows at the end of this item)

    from pyspark.ml.feature import Tokenizer
    from pyspark.sql.functions import explode
    
    tokenizer = Tokenizer(inputCol="document_text", outputCol="words")
    
    tokenized = (tokenizer.transform(documents_df)
        .select(col("document_id"), explode(col("words")).alias("token")))
    
    like_with_tokenizer = (tokenized
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          3|haskell|
    ## |          1|  spark|
    ## |          2| erlang|
    ## +-----------+-------+
    

    This requires a shuffle but not a Cartesian product:

    like_with_tokenizer.explain()
    
    ## TungstenProject [document_id#2L,keyword#4]
    ##  SortMergeJoin [token#29], [keyword#4]
    ##   TungstenSort [token#29 ASC], false, 0
    ##    TungstenExchange hashpartitioning(token#29)
    ##     TungstenProject [document_id#2L,token#29]
    ##      !Generate explode(words#27), true, false, [document_id#2L, ...
    ##       ConvertToSafe
    ##        TungstenProject [document_id#2L,UDF(document_text#3) AS words#27]
    ##         Scan PhysicalRDD[document_id#2L,document_text#3]
    ##   TungstenSort [keyword#4 ASC], false, 0
    ##    TungstenExchange hashpartitioning(keyword#4)
    ##     ConvertToUnsafe
    ##      Scan PhysicalRDD[keyword#4]
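
    Note that Tokenizer lower-cases the text and splits on whitespace, so this variant only matches whole, single-word, lower-case keywords; a multi-word keyword such as "apache spark" would never match. If different token boundaries are needed, here is a hedged sketch using RegexTokenizer (which may require a release newer than 1.3.1; the pattern is an assumption, adjust it to your data):

    from pyspark.ml.feature import RegexTokenizer

    # Split on runs of non-word characters instead of plain whitespace.
    regex_tokenizer = RegexTokenizer(
        inputCol="document_text", outputCol="words", pattern="\\W+")

    tokenized_alt = (regex_tokenizer.transform(documents_df)
        .select(col("document_id"), explode(col("words")).alias("token")))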
    

  2. Python UDF and a broadcast variable - if the keyword list is relatively small (a substring-matching variant is sketched at the end of this item)

    from pyspark.sql.types import ArrayType, StringType
    
    keywords = sc.broadcast(set(
        keywords_df.map(lambda row: row[0]).collect()))
    
    bd_contains = udf(
        lambda s: list(set(s.split()) & keywords.value),
        ArrayType(StringType()))
    
    
    like_with_bd = (documents_df.select(
        col("document_id"),
        explode(bd_contains(col("document_text"))).alias("keyword")))
    
    like_with_bd.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    

    It requires neither a shuffle nor a Cartesian product, but you still have to transfer the broadcast variable to each worker node.

    like_with_bd.explain()
    
    ## TungstenProject [document_id#2L,keyword#46]
    ##  !Generate explode(pythonUDF#47), true, false, ...
    ##   ConvertToSafe
    ##    TungstenProject [document_id#2L,pythonUDF#47]
    ##     !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ...
    ##      Scan PhysicalRDD[document_id#2L,document_text#3]
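
    Like the tokenizer join, this matches whole whitespace-separated tokens rather than arbitrary substrings, which is stricter than the original ILIKE '%keyword%' requirement. A hedged substring-based variant (bd_contains_substr is an illustrative name; it checks every broadcast keyword against each document text):

    # Substring semantics: a keyword matches if it appears anywhere in the text,
    # so multi-word keywords such as "apache spark" can also be found.
    bd_contains_substr = udf(
        lambda s: [kw for kw in keywords.value if kw in s],
        ArrayType(StringType()))

    like_with_bd_substr = (documents_df.select(
        col("document_id"),
        explode(bd_contains_substr(col("document_text"))).alias("keyword")))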
    

  3. Since Spark 1.6.0 you can mark a small data frame using sql.functions.broadcast to get a similar effect to the above without using UDFs or explicit broadcast variables. Reusing the tokenized data (an alternative that broadcasts the keyword frame instead is sketched at the end of this item):

    from pyspark.sql.functions import broadcast
    
    like_with_tokenizer_and_bd = (broadcast(tokenized)
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer_and_bd.explain()
    
    ## TungstenProject [document_id#3L,keyword#5]
    ##  BroadcastHashJoin [token#10], [keyword#5], BuildLeft
    ##   TungstenProject [document_id#3L,token#10]
    ##    !Generate explode(words#8), true, false, ...
    ##     ConvertToSafe
    ##      TungstenProject [document_id#3L,UDF(document_text#4) AS words#8]
    ##       Scan PhysicalRDD[document_id#3L,document_text#4]
    ##   ConvertToUnsafe
    ##    Scan PhysicalRDD[keyword#5]
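
    Which frame to mark with broadcast depends on which one is actually small; in the scenario above that is typically the keyword list, so a hedged variant broadcasting keywords_df instead (same join, different build side):

    like_with_bd_keywords = (tokenized
        .join(broadcast(keywords_df), col("token") == col("keyword"))
        .drop("token"))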
    

This concludes the article on how to JOIN two Spark SQL dataframes using an SQL-style "LIKE" criterion. We hope the answer above is helpful; thanks for your support!
