This article covers how to read sequence files in PySpark 2.0 and may be a useful reference if you are solving the same problem.

Problem Description

I have a sequence file whose values look like

(string_value, json_value)

I don't care about the string value.

In Scala I can read the file by

val reader = sc.sequenceFile[String, String]("/path...")
val data = reader.map{case (x, y) => (y.toString)}
val jsondata = spark.read.json(data)

I am having a hard time converting this to PySpark. I have tried using

reader = sc.sequenceFile("/path", "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
data = reader.map(lambda x,y: str(y))
jsondata = spark.read.json(data)

The errors are cryptic but I can provide them if that helps. My question is: what is the right syntax for reading these sequence files in PySpark 2?

I think I am not converting the array elements to strings correctly. I get similar errors if I do something simple like

m = sc.parallelize([(1, 2), (3, 4)])
m.map(lambda x,y: y.toString).collect()

m = sc.parallelize([(1, 2), (3, 4)])
m.map(lambda x,y: str(y)).collect()

Thanks!

Recommended Answer

The fundamental problem with your code is the function you use: a function passed to map should take a single argument. Use either:

reader.map(lambda x: x[1])

or simply:

reader.values()
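
To see why the two-argument lambda fails, here is a minimal sketch using the toy RDD from the question. map hands each (key, value) pair to the function as a single tuple, so a two-parameter lambda raises a TypeError:

m = sc.parallelize([(1, 2), (3, 4)])
# m.map(lambda x, y: str(y)).collect()  # TypeError: <lambda>() missing 1 required positional argument: 'y'
m.map(lambda x: str(x[1])).collect()    # returns ['2', '4']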

As long as keyClass and valueClass match the data, this should be all you need here, and there should be no need for additional type conversions (this is handled internally by sequenceFile). Write the example data in Scala:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :paste
// Entering paste mode (ctrl-D to finish)

sc
  .parallelize(Seq(
    ("foo", """{"foo": 1}"""), ("bar", """{"bar": 2}""")))
  .saveAsSequenceFile("example")

// Exiting paste mode, now interpreting.

Read it back in Python:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 3.5.1 (default, Dec  7 2015 11:16:01)
SparkSession available as 'spark'.
In [1]: Text = "org.apache.hadoop.io.Text"

In [2]: (sc
   ...:     .sequenceFile("example", Text, Text)
   ...:     .values()
   ...:     .first())
Out[2]: '{"bar": 2}'
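
To finish the original task and parse the JSON values into a DataFrame, a minimal sketch, assuming the hypothetical path /path/to/seqfile and an active SparkSession named spark (in Spark 2.x, spark.read.json accepts an RDD of JSON strings):

Text = "org.apache.hadoop.io.Text"
reader = sc.sequenceFile("/path/to/seqfile", Text, Text)
# Drop the keys and parse each JSON value into a DataFrame row
jsondata = spark.read.json(reader.values())
jsondata.show()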

Note:

Legacy Python versions support tuple parameter unpacking:

reader.map(lambda (_, v): v)

Don't use it in code that should be forward compatible; tuple parameter unpacking was removed in Python 3 (PEP 3113).
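
A forward-compatible equivalent indexes into the tuple inside the function body instead, the same pattern as shown above:

reader.map(lambda kv: kv[1])  # kv is the (key, value) tuple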
