问题描述
我正在使用pySpark 2.3.0,并创建了一个非常简单的Spark数据框来测试VectorAssembler的功能.这是一个较大数据框的子集,在这里我只选择了一些数字(双数据类型)列:
I am working with pySpark 2.3.0 and have a very simple Spark dataframe I created to test the functionality of VectorAssembler. This is a subset of a larger dataframe where I only picked a few numeric (double data type) columns:
>>>cols = ['index','host_listings_count','neighbourhood_group_cleansed',\
'bathrooms','bedrooms','beds','square_feet', 'guests_included',\
'review_scores_rating']
>>>test = df[cols]
>>>test.take(3)
从上面看来,在我看来这个Spark数据帧没有错.因此,我如下所示创建了汇编程序,并显示了所显示的错误.可能出了什么问题?
From the above it seems to me that there is nothing wrong with this Spark dataframe. So I then create the assembler as shown below and get the shown error. What could possibly have gone wrong?
>>>>>>>>>>>>>>>>>>>>功能导入import VectorAssembler>>汇编程序= VectorAssembler(inputCols = cols,outputCol ="features")>>>输出= assembler.transform(测试)>>> output.take(3)
驱动程序堆栈跟踪:位于org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1435)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala:1423)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala:1422)在scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59)在scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)在org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply(DAGScheduler.scala:802)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply(DAGScheduler.scala:802)在scala.Option.foreach(Option.scala:257)在org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)在org.apache.spark.util.EventLoop $$ anon $ 1.run(EventLoop.scala:48)在org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)在org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)在org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)在org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)在org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)在org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)在org.apache.spark.sql.Dataset $$ anonfun $ collectToPython $ 1.apply $ mcI $ sp(Dataset.scala:2768)在org.apache.spark.sql.Dataset $$ anonfun $ collectToPython $ 1.apply(Dataset.scala:2765)在org.apache.spark.sql.Dataset $$ anonfun $ collectToPython $ 1.apply(Dataset.scala:2765)在org.apache.spark.sql.execution.SQLExecution $ .withNewExecutionId(SQLExecution.scala:57)在org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)在org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765)在sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法)处sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)在java.lang.reflect.Method.invoke(Method.java:498)在py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)在py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)在py4j.Gateway.invoke(Gateway.java:280)在py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)在py4j.commands.CallCommand.execute(CallCommand.java:79)处py4j.GatewayConnection.run(GatewayConnection.java:214)位于java.lang.Thread.run(Thread.java:748)由以下原因引起:org.apache.spark.SparkException:无法执行用户定义函数($ anonfun $ 3:(结构)=>向量),位于org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext(未知)来源)org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)在org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext(WholeStageCodegenExec.scala:377)在org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 2.apply(SparkPlan.scala:231)在org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 2.apply(SparkPlan.scala:225)在org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1 $$ anonfun $ apply $ 25.apply(RDD.scala:827)在org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1 $$ anonfun $ apply $ 25.apply(RDD.scala:827)在org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)在org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)在org.apache.spark.rdd.RDD.iterator(RDD.scala:287)处org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)在org.apache.spark.scheduler.Task.run(Task.scala:99)在org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:322)在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)在java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617)... 1更多原因:org.apache.spark.SparkException:值汇编不能为null.在org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ assemble $ 1.apply(VectorAssembler.scala:160)在org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ assemble $ 1.apply(VectorAssembler.scala:143)在scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)在scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)在org.apache.spark.ml.feature.VectorAssembler $ .assemble(VectorAssembler.scala:143)在org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ 3.apply(VectorAssembler.scala:99)在org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ 3.apply(VectorAssembler.scala:98)...还有16个
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) ... 16 more
推荐答案
您发布的堆栈跟踪提到该问题是由正在汇编的列中的空值引起的.
The stack trace you posted mentions that the problem is caused by null values in the columns being assembled.
您需要在 cols
列中处理 null
值.在调用transform之前,先尝试 test.fillna(0,subset = cols)
,或者在那些列中滤除具有空值的行.
You need to deal with null
values in your cols
columns.Try test.fillna(0, subset=cols)
before calling transform, or alternatively, filter out rows with null values in those columns.
这篇关于Spark VectorAssembler错误-PySpark 2.3-Python的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!