Hive 是一个数据仓库,旨在查询和聚合驻留在 HDFS 上的大型数据集。
标准的 INSERT INTO
语法表现不佳,因为:
话虽如此,现在有一个用于 Hive/HCatalog 的 Streaming API,详细信息 here 。
我面临着使用 Python 将数据快速插入 Hive 的需求。我知道
pyhive
和 pyhs2
库,但它们似乎都没有使用 Streaming API。有没有人成功地让 Python 使用 Streaming API 将多行插入到 Hive 中,这是如何完成的?
我期待你的见解!
最佳答案
Hive 用户可以通过脚本流式传输表以转换该数据:
添加文件替换-nan-with-zeros.py;
SELECT
TRANSFORM (...)
USING 'python replace-nan-with-zeros.py'
AS (...)
FROM some_table;
这是一个简单的 Python 脚本:
#!/usr/bin/env python
import sys
kFirstColumns= 7
def main(argv):
for line in sys.stdin:
line = line.strip();
inputs = line.split('\t')
# replace NaNs with zeros
outputs = [ ]
columnIndex = 1;
for value in inputs:
newValue = value
if columnIndex > kFirstColumns:
newValue = value.replace('NaN','0.0')
outputs.append(newValue)
columnIndex = columnIndex + 1
print '\t'.join(outputs)
if __name__ == "__main__":
main(sys.argv[1:])
Hive 和 Python
Python 可以通过 HiveQL TRANSFORM 语句用作来自 Hive 的 UDF。例如,以下 HiveQL 调用存储在 streaming.py 文件中的 Python 脚本。
基于 Linux 的 HDInsight
添加文件 wasb:///streaming.py;
SELECT TRANSFORM (clientid, devicemake, devicemodel)
USING 'streaming.py' AS
(clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
基于 Windows 的 HDInsight
添加文件 wasb:///streaming.py;
SELECT TRANSFORM (clientid, devicemake, devicemodel)
USING 'D:\Python27\python.exe streaming.py' AS
(clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
下面是这个例子的作用:
1.文件开头的add file语句将streaming.py文件添加到分布式缓存中,因此集群中的所有节点都可以访问它。
2.SELECT TRANSFORM ... USING 语句从hivesampletable 中选择数据,并将clientid、devicemake 和devicemodel 传递给streaming.py 脚本。
3.AS子句描述了streaming.py返回的字段
这是 HiveQL 示例使用的 streaming.py 文件。
#!/usr/bin/env python
import sys
import string
import hashlib
while True:
line = sys.stdin.readline()
if not line:
break
line = string.strip(line, "\n ")
clientid, devicemake, devicemodel = string.split(line, "\t")
phone_label = devicemake + ' ' + devicemodel
print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
由于我们使用的是流式传输,因此该脚本必须执行以下操作:
1.从标准输入读取数据。这是通过在此示例中使用 sys.stdin.readline() 来完成的。
2.使用 string.strip(line, "\n ") 删除尾随的换行符,因为我们只想要文本数据而不是行尾指示符。
3.在进行流处理时,一行包含所有值,每个值之间有一个制表符。因此 string.split(line, "\t") 可用于拆分每个选项卡的输入,仅返回字段。
4. 处理完成后,输出必须作为单行写入 STDOUT,每个字段之间有一个制表符。这是通过使用 print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]) 来完成的。
5.这一切都发生在一个while循环中,该循环将重复直到没有读取到任何行,此时break退出循环并且脚本终止。
除此之外,脚本只是连接 devicemake 和 devicemodel 的输入值,并计算连接值的哈希值。非常简单,但它描述了从 Hive 调用的任何 Python 脚本应该如何工作的基础知识:循环,读取输入直到没有更多输入,在选项卡处将每一行输入分开,处理,写入一行制表符分隔的输出。