Question
I am trying to read a JSON file from a Google Cloud Storage bucket into a PySpark dataframe on a local Spark machine. Here's the code:
import pandas as pd
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAll([('spark.executor.memory', '16g'),
                           ('spark.executor.cores', '4'),
                           ('spark.cores.max', '4')]).setMaster('local[*]')

spark = (SparkSession
         .builder
         .config(conf=conf)
         .getOrCreate())
sc = spark.sparkContext

import glob
import bz2
import json
import pickle
from google.cloud import storage  # needed for storage.Client below

bucket_path = "gs://<SOME_PATH>/"

# List the blobs in the bucket and read each JSON file into its own dataframe
client = storage.Client(project='<SOME_PROJECT>')
bucket = client.get_bucket('<SOME_PATH>')
blobs = bucket.list_blobs()

theframes = []
for blob in blobs:
    print(blob.name)
    testspark = spark.read.json(bucket_path + blob.name).cache()
    theframes.append(testspark)
It's reading files from the bucket fine (I can see the print out from blob.name), but then crashes like this:
Traceback (most recent call last):
File "test_code.py", line 66, in <module>
testspark = spark.read.json(bucket_path + blob.name).cache()
File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/pyspark/sql/readwriter.py", line 274, in json
return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o51.json.
: java.io.IOException: No FileSystem for scheme: gs
I've seen this type of error discussed on stackoverflow, but most solutions seem to be in Scala while I have pyspark, and/or involve messing with core-site.xml, which I've done to no effect.
I am using Spark 2.4.1 and Python 3.6.7.
Any help would be much appreciated!
Answer
Some configuration parameters are required for Spark to recognize "gs" as a distributed filesystem.
Use this setting to point Spark at the Google Cloud Storage connector jar, gcs-connector-hadoop2-latest.jar:
spark = SparkSession \
    .builder \
    .config("spark.jars", "/path/to/gcs-connector-hadoop2-latest.jar") \
    .getOrCreate()
Other configs can be set directly from PySpark on the underlying Hadoop configuration:
spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')

# These two are required if you are using a service account; note the value is the string 'true'
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "/path/to/keyfile")

# The following are required if you are using OAuth
spark._jsc.hadoopConfiguration().set('fs.gs.auth.client.id', 'YOUR_OAUTH_CLIENT_ID')
spark._jsc.hadoopConfiguration().set('fs.gs.auth.client.secret', 'OAUTH_SECRET')
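Putting the two pieces together, a minimal end-to-end sketch might look like the following. This is an illustration only: the jar path, key file path, and the JSON file name under the bucket are placeholders, and the property names assume the hadoop2 connector used above.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Make the GCS connector jar available to the local Spark session
         .config("spark.jars", "/path/to/gcs-connector-hadoop2-latest.jar")
         .getOrCreate())

# Register the 'gs' scheme and authenticate with a service account key file
hconf = spark._jsc.hadoopConfiguration()
hconf.set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
hconf.set('fs.gs.auth.service.account.enable', 'true')
hconf.set('google.cloud.auth.service.account.json.keyfile', '/path/to/keyfile')

# gs:// paths can now be read directly
df = spark.read.json("gs://<SOME_PATH>/some_file.json")
df.show()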
Alternatively, you can set these configs in core-site.xml or spark-defaults.conf.
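For example, the equivalent spark-defaults.conf entries might look like this (a sketch assuming the same hadoop2 connector as above; Hadoop properties take the spark.hadoop. prefix, and the paths are placeholders):

spark.jars                                                   /path/to/gcs-connector-hadoop2-latest.jar
spark.hadoop.fs.gs.impl                                      com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.fs.gs.auth.service.account.enable               true
spark.hadoop.google.cloud.auth.service.account.json.keyfile  /path/to/keyfile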