我是Spark&pyspark的新手。
我正在将一个小的csv文件(约40k)读入数据帧。
from pyspark.sql import functions as F
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
我得到了一些奇怪的错误,这种错误并非每次都会发生,但确实会定期发生
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
raise EOFError
EOFError
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
一旦引发了EOFError,我将再次看到它,直到执行需要与Spark服务器进行交互的操作为止
当我调用df2.count()时,它显示[Stage xxx]提示符,这是我进入Spark服务器的意思。当我使用df2做某事时,所有触发的事情最终似乎都会再次给出EOFError。
df(vs. df2)似乎没有发生这种情况,因此df.map()行似乎一定有这种情况。
最佳答案
将数据帧转换为rdd后,您能否尝试做映射。您正在将map函数应用于数据框,然后再次从中创建数据框。语法类似于
df.rdd.map().toDF()
请让我知道它是否有效。谢谢。
关于python - 调用 map 后pyspark EOFError,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36592665/