交互的最佳方式是什么

交互的最佳方式是什么

本文介绍了使用 Pyspark 与 Hbase 交互的最佳方式是什么的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 pyspark [spark2.3.1] 和 Hbase1.2.1,我想知道使用 pyspark 访问 Hbase 的最佳方式是什么?

我进行了一些初始级别的搜索,发现几乎没有可用的选项,例如使用 shc-core:1.1.1-2.1-s_2.11.jar 这可以实现,但是无论我在哪里尝试寻找一些示例,在大多数地方,代码是用 Scala 编写的,或者示例也是基于 Scala 的.我尝试在 pyspark 中实现基本代码:

from pyspark import SparkContext从 pyspark.sql 导入 SQLContext定义主():sc = SparkContext()sqlc = SQLContext(sc)data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'目录 = ''.join("""{"table":{"namespace":"default", "name":"firsttable"},"rowkey":"key",列":{"firstcol":{"cf":"rowkey", "col":"key", "type":"string"},"secondcol":{"cf":"d", "col":"colname", "type":"string"}}}.分裂())df = sqlc.read.options(catalog=catalog).format(data_source_format).load()df.select("secondcol").show()# PySpark 应用程序的入口点如果 __name__ == '__main__':主要的()

并使用:

spark-submit --master yarn-client --files/opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --jars/home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py

它返回空白输出:

+---------+|第二列|+---------++---------+

我不确定我做错了什么?也不确定这样做的最佳方法是什么??

任何参考将不胜感激.

问候

解决方案

最后,使用 SHC,我能够使用 pyspark 代码通过 Spark-2.3.1 连接到 HBase-1.2.1.以下是我的作品:

  • 我所有的 hadoop [namenode, datanode, nodemanager, resourcemanager] &hbase [Hmaster、HRegionServer、HQuorumPeer] 守护进程已在我的 EC2 实例上启动并运行.

  • 我在 hdfs 位置/test/emp.csv 放置了 emp.csv 文件,其中包含数据:

key,empId,empName,empWeight1,"E007","布佩什",115.102、《E008》、《楚汉》、110.233,"E009","Prithvi",90.04,"E0010","拉吉",80.05,"E0011","Chauhan",100.0

  • 我使用以下代码行创建了 readwriteHBase.py 文件[用于从 HDFS 读取 emp.csv 文件,然后首先在 HBase 中创建 tblEmployee,将数据推送到 tblEmployee 然后再次读取来自同一个表的一些数据并将其显示在控制台上]:

    from pyspark.sql import SparkSession定义主():spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"writeCatalog = ''.join("""{"table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},"rowkey":"key",列":{"key":{"cf":"rowkey", "col":"key", "type":"int"},"empId":{"cf":"personal","col":"empId","type":"string"},"empName":{"cf":"personal", "col":"empName", "type":"string"},"empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}}}.分裂())writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")print("csv 文件读取", writeDF.show())writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()print("csv 文件写入 HBase")readCatalog = ''.join("""{"table":{"namespace":"default", "name":"tblEmployee"},"rowkey":"key",列":{"key":{"cf":"rowkey", "col":"key", "type":"int"},"empId":{"cf":"personal","col":"empId","type":"string"},"empName":{"cf":"personal", "col":"empName", "type":"string"}}}.分裂())print("要从Hbase表中读取数据")readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()print("从HBase表中读取的数据")readDF.select("empId", "empName").show()readDF.show()# PySpark 应用程序的入口点如果 __name__ == '__main__':主要的()

  • 使用以下命令在 VM 控制台上运行此脚本:

    spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/readwriteHBase.py

  • 中间结果:读取 CSV 文件后:

    +---+-----+-------+---------+|key|empId|empName|empWeight|+---+-----+-------+---------+|1|E007|布佩什|115.1||2|E008|茶涵|110.23||3|E009|Prithvi|90.0||4|E0010|拉杰|80.0||5|E0011|Chauhan|100.0|+---+-----+-------+---------+

  • 最终输出:从 HBase 表中读取数据后:

    +-----+-------+|empId|empName|+-----+-----+|E007|布佩什||E008|茶涵||E009|Prithvi||E0010|拉杰||E0011|Chauhan|+-----+-----+

注意:在创建 Hbase 表并将数据插入 HBase 表时,它期望 NumberOfRegions 应该大于 3,因此我添加了 options(catalog=writeCatalog, newtable=5) 同时向 HBase 添加数据

I am using pyspark [spark2.3.1] and Hbase1.2.1, I am wondering what could be the best possible way of accessing Hbase using pyspark?

I did some initial level of search and found that there are few options available like using shc-core:1.1.1-2.1-s_2.11.jar this could be achieved, but whereever I try to look for some example, at most of the places code is written in Scala or examples are also scala based. I tried implementing basic code in pyspark:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def main():
    sc = SparkContext()
    sqlc = SQLContext(sc)
    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"firsttable"},
        "rowkey":"key",
        "columns":{
            "firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
            "secondcol":{"cf":"d", "col":"colname", "type":"string"}
        }
    }""".split())
    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.select("secondcol").show()

# entry point for PySpark application
if __name__ == '__main__':
    main()

and running it using:

spark-submit  --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11  --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py

It is returning me blank output:

+---------+
|secondcol|
+---------+
+---------+

I am not sure what am I doing wrong? Also not sure what would be the best approach of doing this??

Any references would be appreciated.

Regards

解决方案

Finally, Using SHC, I am able to connect to HBase-1.2.1 with Spark-2.3.1 using pyspark code. Following is my work:

  • All my hadoop [namenode, datanode, nodemanager, resourcemanager] & hbase [Hmaster, HRegionServer, HQuorumPeer] deamons were up and running on my EC2 instance.

  • I placed emp.csv file at hdfs location /test/emp.csv, with data:

  • I created readwriteHBase.py file with following line of code [for reading emp.csv file from HDFS, then creating tblEmployee first in HBase, pushing the data into tblEmployee then once again reading some data from the same table and displaying it on console]:

    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()
    
        dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"
        writeCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"},
                      "empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}
                    }
                  }""".split())
    
        writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")
        print("csv file read", writeDF.show())
        writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()
        print("csv file written to HBase")
    
        readCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"}
                    }
                  }""".split())
    
        print("going to read data from Hbase table")
        readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
        print("data read from HBase table")
        readDF.select("empId", "empName").show()
        readDF.show()
    
    # entry point for PySpark application
    if __name__ == '__main__':
        main()
    

  • Ran this script on VM console using command:

    spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/ readwriteHBase.py
    

  • Intermediate Result: After reading CSV file:

    +---+-----+-------+---------+
    |key|empId|empName|empWeight|
    +---+-----+-------+---------+
    |  1| E007|Bhupesh|    115.1|
    |  2| E008|Chauhan|   110.23|
    |  3| E009|Prithvi|     90.0|
    |  4|E0010|    Raj|     80.0|
    |  5|E0011|Chauhan|    100.0|
    +---+-----+-------+---------+
    

  • Final Output : after reading data from HBase table:

    +-----+-------+
    |empId|empName|
    +-----+-------+
    | E007|Bhupesh|
    | E008|Chauhan|
    | E009|Prithvi|
    |E0010|    Raj|
    |E0011|Chauhan|
    +-----+-------+
    

Note: While creating Hbase table and inserting data into HBase table it expects NumberOfRegions should be greater than 3, hence I have added options(catalog=writeCatalog, newtable=5) while adding data to HBase

这篇关于使用 Pyspark 与 Hbase 交互的最佳方式是什么的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-29 15:34