问题描述
我有一个脚本来分析BSON转储,但是它仅适用于未压缩的文件。
I have a script to analyse BSON dumps, however it works only with uncompressed files. I get an empty RDD while reading gz bson files.
pyspark_location = 'lib/pymongo_spark.py'
HDFS_HOME = 'hdfs://1.1.1.1/'
INPUT_FILE = 'big_bson.gz'
class BsonEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, ObjectId):
return str(obj)
elif isinstance(obj, datetime):
return obj.isoformat()
return JSONEncoder.default(self, obj)
def setup_spark_with_pymongo(app_name='App'):
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sc.addPyFile(pyspark_location)
return sc
def main():
spark_context = setup_spark_with_pymongo('PysparkApp')
filename = HDFS_HOME + INPUT_FILE
import pymongo_spark
pymongo_spark.activate()
rdd = spark_context.BSONFileRDD(filename)
print(rdd.first()) #Raises ValueError("RDD is empty")
我正在使用mongo-java-driver-3.2.2.jar,mongo-hadoop-spark -1.5.2.jar,pymongo-3.2.2-py2.7-linux-x86_64和pymongo_spark以及spark-submit。
部署的Spark版本与Hadoop 2.6.4一起为1.6.1。
I am using mongo-java-driver-3.2.2.jar, mongo-hadoop-spark-1.5.2.jar, pymongo-3.2.2-py2.7-linux-x86_64 and pymongo_spark in along with spark-submit.The version of Spark deployed is 1.6.1 along with Hadoop 2.6.4.
我知道该库不支持拆分压缩的BSON文件。它应该有一个分裂。
我有数百个压缩的BSON文件来分析和压缩每个文件似乎都不可行。
I am aware that the library does not support splitting compressed BSON files, however it should with a single split.I have got hundreds of compressed BSON files to analyse and deflating each of them doesn't seem to be a viable option.
任何想法我该如何进行进一步?
预先感谢!
Any idea how should I proceed further?Thanks in advance!
推荐答案
我刚刚在环境中进行过测试: mongo- hadoop-spark-1.5.2.jar
,适用于Hadoop 2.6.4的Spark版本1.6.1,Pymongo 3.2.2。源文件是的输出,以及一个用于单个拆分的小文件(未压缩的集合大小为105MB)。通过PySpark运行:
I've just tested in the environment: mongo-hadoop-spark-1.5.2.jar
, spark version 1.6.1 for Hadoop 2.6.4, Pymongo 3.2.2. The source file is an output from mongodump compressed, and a small size file for a single split (uncompressed collection size of 105MB). Running through PySpark:
from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()
conf = SparkConf().setAppName("pyspark-bson")
file_path = "/file/example_bson.gz"
rdd = sc.BSONFileRDD(file_path)
rdd.first()
它能够读取压缩的BSON文件,并且列出了第一个文档。请确保您可以访问输入文件,并且该文件采用正确的BSON格式。
It is able to read the compressed BSON file, and listed the first document. Please make sure you can reach the input file, and the file is in the correct BSON format.
这篇关于PySpark:读取压缩的BSON文件时为空RDD的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!