我是新手,可以通过POC学习。作为此POC的一部分,我试图直接执行hql文件,该文件具有transform关键字以使用python udf。

我已经在CLI“hive -f filename.hql”中测试了hql脚本,并且工作正常。
我在spark-sql中尝试过的相同脚本,但是由于hdfs path not found错误而失败。我试图以下面的不同方式提供hdfs路径,但都无法正常工作

"/test/scripts/test.hql"

"hdfs://test.net:8020/test/scripts/test.hql"

"hdfs:///test.net:8020/test/scripts/test.hql"

还尝试如下给出 hive 转换代码中的完整路径
USING "scl enable python27 'python hdfs://test.net:8020/user/test/scripts/TestPython.py'"

配置单元代码
add file hdfs://test.net:8020/user/test/scripts/TestPython.py;


select * from
    (select transform (*)
    USING "scl enable python27 'python TestPython.py'"
    as (Col_1     STRING,
    col_2        STRING,
    ...
    ..
    col_125 STRING
    )
    FROM
    test.transform_inner_temp1 a) b;

TestPython代码:
#!/usr/bin/env python
'''
Created on June 2, 2017

@author: test
'''
import sys
from datetime import datetime
import decimal
import string
D = decimal.Decimal
for line in sys.stdin:
    line = sys.stdin.readline()
    TempList = line.strip().split('\t')
    col_1 = TempList[0]
    ...
    ....
    col_125 = TempList[34] + TempList[32]
    outList.extend((col_1,....col_125))
    outValue = "\t".join(map(str,outList))
    print "%s"%(outValue)

所以我尝试了另一种方法直接在spark-submit中执行
spark-submit --master yarn-cluster  hdfs://test.net:8020/user/test/scripts/testspark.py

testspark.py
from pyspark.sql.types import StringType
from pyspark import SparkConf, SparkContext
from pyspark import SQLContext
conf = SparkConf().setAppName("gveeran pyspark test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
with open("hdfs://test.net:8020/user/test/scripts/test.hql") as fr:
   query = fr.read()
results = sqlContext.sql(query)
results.show()

但是还是如下相同的问题
Traceback (most recent call last):
  File "PySparkTest2.py", line 7, in <module>
    with open("hdfs://test.net:8020/user/test/scripts/test.hql") as fr:
IOError: [Errno 2] No such file or directory: 'hdfs://test.net:8020/user/test/scripts/test.hql'

最佳答案

您可以将文件作为查询读取,然后作为spark sql作业执行

例:-

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
sc =SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)
with open("/home/hadoop/test/abc.hql") as fr:
    query = fr.read()
    print(query)
    results = sqlCtx.sql(query)

关于python - 如何在Spark中使用转换python udf执行hql脚本?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46213003/

10-11 22:42
查看更多