Hive 是一个数据仓库,旨在查询和聚合驻留在 HDFS 上的大型数据集。

标准的 INSERT INTO 语法表现不佳,因为:

  • 每个语句都需要执行一个 Map/Reduce 过程。
  • 每个语句都会导致一个新文件被添加到 HDFS - 随着时间的推移,这将导致从表中读取时的性能非常差。

  • 话虽如此,现在有一个用于 Hive/HCatalog 的 Streaming API,详细信息 here

    我面临着使用 Python 将数据快速插入 Hive 的需求。我知道 pyhivepyhs2 库,但它们似乎都没有使用 Streaming API。

    有没有人成功地让 Python 使用 Streaming API 将多行插入到 Hive 中,这是如何完成的?

    我期待你的见解!

    最佳答案

    Hive 用户可以通过脚本流式传输表以转换该数据:
    添加文件替换-nan-with-zeros.py;

    SELECT
      TRANSFORM (...)
      USING 'python replace-nan-with-zeros.py'
      AS (...)
    FROM some_table;
    

    这是一个简单的 Python 脚本:
        #!/usr/bin/env python
        import sys
    
    kFirstColumns= 7
    
    def main(argv):
    
        for line in sys.stdin:
            line = line.strip();
            inputs = line.split('\t')
    
            # replace NaNs with zeros
            outputs = [ ]
            columnIndex = 1;
            for value in inputs:
                newValue = value
                if columnIndex > kFirstColumns:
                    newValue = value.replace('NaN','0.0')
                outputs.append(newValue)
                columnIndex = columnIndex + 1
    
            print '\t'.join(outputs)
    
    if __name__ == "__main__":
        main(sys.argv[1:])
    

    Hive 和 Python

    Python 可以通过 HiveQL TRANSFORM 语句用作来自 Hive 的 UDF。例如,以下 HiveQL 调用存储在 streaming.py 文件中的 Python 脚本。

    基于 Linux 的 HDInsight

    添加文件 wasb:///streaming.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
      USING 'streaming.py' AS
      (clientid string, phoneLable string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    

    基于 Windows 的 HDInsight

    添加文件 wasb:///streaming.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
      USING 'D:\Python27\python.exe streaming.py' AS
      (clientid string, phoneLable string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    

    下面是这个例子的作用:

    1.文件开头的add file语句将streaming.py文件添加到分布式缓存中,因此集群中的所有节点都可以访问它。

    2.SELECT TRANSFORM ... USING 语句从hivesampletable 中选择数据,并将clientid、devicemake 和devicemodel 传递给streaming.py 脚本。

    3.AS子句描述了streaming.py返回的字段

    这是 HiveQL 示例使用的 streaming.py 文件。
    #!/usr/bin/env python
    
    import sys
    import string
    import hashlib
    
    while True:
      line = sys.stdin.readline()
      if not line:
        break
    
      line = string.strip(line, "\n ")
      clientid, devicemake, devicemodel = string.split(line, "\t")
      phone_label = devicemake + ' ' + devicemodel
      print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
    

    由于我们使用的是流式传输,因此该脚本必须执行以下操作:

    1.从标准输入读取数据。这是通过在此示例中使用 sys.stdin.readline() 来完成的。

    2.使用 string.strip(line, "\n ") 删除尾随的换行符,因为我们只想要文本数据而不是行尾指示符。

    3.在进行流处理时,一行包含所有值,每个值之间有一个制表符。因此 string.split(line, "\t") 可用于拆分每个选项卡的输入,仅返回字段。

    4. 处理完成后,输出必须作为单行写入 STDOUT,每个字段之间有一个制表符。这是通过使用 print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]) 来完成的。

    5.这一切都发生在一个while循环中,该循环将重复直到没有读取到任何行,此时break退出循环并且脚本终止。

    除此之外,脚本只是连接 devicemake 和 devicemodel 的输入值,并计算连接值的哈希值。非常简单,但它描述了从 Hive 调用的任何 Python 脚本应该如何工作的基础知识:循环,读取输入直到没有更多输入,在选项卡处将每一行输入分开,处理,写入一行制表符分隔的输出。

    10-08 02:39