I'm new to Spark and pyspark.

I'm reading a small csv file (~40k) into a dataframe.

from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()

I'm getting a strange error that doesn't happen every time, but does come up regularly:
>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

Once the EOFError has been raised, I don't see it again until I do something that requires interacting with the Spark server.

When I call df2.count() it shows a [Stage xxx] prompt, which is what I mean by it going to the Spark server. Anything that triggers that seems to eventually end up giving the EOFError again when I do something with df2.

It does not seem to happen with df (vs. df2), so it seems like it must be something happening with the df.map() line.

Best Answer

Can you try doing the map after converting the dataframe into an rdd? You are applying the map function on a dataframe and then again creating a dataframe from that. The syntax would be like

df.rdd.map().toDF()
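
For example, a minimal sketch of how the transform above might look rewritten this way (an assumption based on the posted code, with the label in the first column and the remaining columns as features):

from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

# Map over the underlying RDD rather than the DataFrame itself,
# then convert the resulting Rows back into a DataFrame.
df2 = df.rdd.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()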

Please let me know if it works. Thanks.

Regarding "python - pyspark EOFError after calling map", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36592665/
