This article describes how to use Hadoop input formats, such as the BigQuery connector, with PySpark, and may serve as a useful reference for anyone facing the same problem.

Problem description

I have a large dataset stored in a BigQuery table and I would like to load it into a PySpark RDD for ETL data processing.

I realized that BigQuery supports the Hadoop Input/Output format,

and PySpark should be able to use this interface to create an RDD with the method newAPIHadoopRDD.
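For context, the generic shape of that call in PySpark is sketched below; the InputFormat, value class, and configuration key here are placeholders rather than the actual BigQuery connector values (those appear in the answer further down).

# Minimal sketch of the generic newAPIHadoopRDD interface, assuming sc is an
# existing SparkContext. Class names are placeholders, not the BigQuery connector.
rdd = sc.newAPIHadoopRDD(
    "some.package.SomeInputFormat",          # fully qualified Hadoop InputFormat class
    "org.apache.hadoop.io.LongWritable",     # key class produced by the InputFormat
    "org.apache.hadoop.io.Text",             # value class produced by the InputFormat
    conf={"some.hadoop.property": "value"})  # extra Hadoop configuration passed as a dict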

Unfortunately, the documentation on both ends seems scarce and goes beyond my knowledge of Hadoop/Spark/BigQuery. Has anybody figured out how to do this?

Recommended answer

Google now has an example of how to use the BigQuery connector with Spark.

There does seem to be a problem with GsonBigQueryInputFormat, but I got a simple Shakespeare word-count example working:

import json
import pyspark

sc = pyspark.SparkContext()
# The connector stages the table through Cloud Storage; reuse the cluster's system bucket.
bucket = sc._jsc.hadoopConfiguration().get("fs.gs.system.bucket")

conf = {
    "mapred.bq.project.id": "<project_id>",      # project billed for the job
    "mapred.bq.gcs.bucket": bucket,              # bucket for temporary export files
    "mapred.bq.input.project.id": "publicdata",  # public project that owns the input table
    "mapred.bq.input.dataset.id": "samples",
    "mapred.bq.input.table.id": "shakespeare",
}

# Each record arrives as (LongWritable, JSON string); parse the JSON and count words.
tableData = (sc.newAPIHadoopRDD("com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
                                "org.apache.hadoop.io.LongWritable",
                                "com.google.gson.JsonObject",
                                conf=conf)
             .map(lambda kv: json.loads(kv[1]))
             .map(lambda row: (row["word"], int(row["word_count"])))
             .reduceByKey(lambda x, y: x + y))
print(tableData.take(10))
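If you want to persist the result instead of just printing a sample, the counted pairs can be written back out with the standard RDD API; the output path below is only a hypothetical location in the same placeholder bucket.

# Hypothetical output path; any writable GCS location would work.
tableData.saveAsTextFile("gs://<bucket>/shakespeare_wordcount")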

This concludes the article on using Hadoop input formats, such as the BigQuery connector, with PySpark. We hope the recommended answer is helpful.
