Problem Description
I am writing a Spark job using Python. However, I need to read in a whole bunch of Avro files.
This is the closest solution I have found, in Spark's examples folder. However, you need to submit this Python script using spark-submit. On the spark-submit command line you can specify the driver class, in which case all of your AvroKey and AvroValue classes will be located.
# Read Avro files as (key, value) pairs: the key carries the Avro record,
# the value is a NullWritable placeholder.
avro_rdd = sc.newAPIHadoopFile(
    path,
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=conf)
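In the upstream Spark example, the records end up in the key position of each pair, so a typical follow-up (a sketch; avro_rdd is the variable defined above) looks like:

output = avro_rdd.map(lambda x: x[0]).collect()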
In my case, I need to run everything within the Python script. I tried creating an environment variable to include the jar file, fingers crossed that Python would add the jar to the path, but clearly it does not: I get an unexpected class error.
os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
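For reference, the environment variable PySpark actually consults when launched from a plain Python process is PYSPARK_SUBMIT_ARGS, and it must be set before the SparkContext is created (a hedged sketch; the jar path is the one from above, and on recent Spark versions the value has to end with the pyspark-shell token):

import os

# Must be set before the SparkContext is constructed.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/"
    "examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar pyspark-shell"
)

from pyspark import SparkContext
sc = SparkContext(appName="read-avro")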
Can anyone help me read Avro files from within a single Python script?
Recommended Answer
Spark >= 2.4.0
You can use the built-in Avro support. The API is backwards compatible with the spark-avro package, with a few additions (most notably the from_avro / to_avro functions).
Please note that the module is not bundled with the standard Spark binaries and has to be included using spark.jars.packages or an equivalent mechanism, as shown below.
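For example (a minimal sketch; the package coordinates assume Spark 2.4.0 built against Scala 2.11, so adjust the version to match your cluster, and the kv.avro file is the one created in the next section):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Pull in the external Avro module at startup.
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.0")
    .getOrCreate()
)

# The built-in reader is registered under the short name "avro".
df = spark.read.format("avro").load("kv.avro")
df.show()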
See also: Pyspark 2.4.0, read avro from kafka with read stream - Python
Spark < 2.4.0
You can use the spark-avro library. First, let's create an example dataset:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumWriter  # needed by DataFileWriter below

schema_string = '''{"namespace": "example.avro",
  "type": "record",
  "name": "KeyValue",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "value", "type": ["int", "null"]}
  ]
}'''

schema = avro.schema.parse(schema_string)

# Avro files are binary, so open the file in binary mode.
with open("kv.avro", "wb") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append({"key": "foo", "value": -1})
    wrt.append({"key": "bar", "value": 1})
Reading it using spark-avro is as simple as the following (note that, like the built-in module above, spark-avro is not bundled with Spark and has to be supplied at launch time, e.g. via the --packages flag of spark-submit):
df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()
## +---+-----+
## |key|value|
## +---+-----+
## |foo| -1|
## |bar| 1|
## +---+-----+
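From there the usual DataFrame API applies; for example, to pull the rows back into plain Python dictionaries (a small usage sketch, assuming the df loaded above):

records = [row.asDict() for row in df.collect()]
# [{'key': u'foo', 'value': -1}, {'key': u'bar', 'value': 1}]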