I have a Spark Java job that runs fine with spark-core_2.11 v2.2.0 but throws an exception with spark-core_2.11 v2.3.1.
The code essentially maps the "isrecurrence" column to 1 when the value is "true" and 0 when it is "false".
The column also contains the value "null" (as a string); these "null" strings are replaced with "\N" (since Hive reads "\N" as NULL).

Code:

public static Seq<String> convertListToSeq(List<String> inputList)
{
    // Convert a java.util.List into a scala.collection.Seq so it can be
    // passed to Scala-varargs APIs such as Dataset.selectExpr().
    return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}

String srcCols = "id,whoid,whatid,whocount,whatcount,subject,activitydate,status,priority,ishighpriority,ownerid,description,isdeleted,accountid,isclosed,createddate,createdbyid,lastmodifieddate,lastmodifiedbyid,systemmodstamp,isarchived,calldurationinseconds,calltype,calldisposition,callobject,reminderdatetime,isreminderset,recurrenceactivityid,isrecurrence,recurrencestartdateonly,recurrenceenddateonly,recurrencetimezonesidkey,recurrencetype,recurrenceinterval,recurrencedayofweekmask,recurrencedayofmonth,recurrenceinstance,recurrencemonthofyear,recurrenceregeneratedtype";
String table = "task";
String[] colArr = srcCols.split(",");
List<String> colsList = Arrays.asList(colArr);
Dataset<Row> filtered = spark.read().format("com.springml.spark.salesforce")
                    .option("username", prop.getProperty("salesforce_user"))
                    .option("password", prop.getProperty("salesforce_auth"))
                    .option("login", prop.getProperty("salesforce_login_url"))
                    .option("soql", "SELECT "+srcCols+" from "+table)
                    .option("version", prop.getProperty("salesforce_version"))
                    .load().persist();

String column = "isrecurrence"; // This column holds the strings 'true' or 'false'.
                                // 'true' will be mapped to '1' (as a string),
                                // 'false' will be mapped to '0' (as a string).

String newCol = column + "_mapped_to_new_value";

filtered = filtered.selectExpr(convertListToSeq(colsList))
                            .withColumn(newCol, //code is breaking here at "withColumn"
                                when(filtered.col(column).notEqual("null"),
                                    when(filtered.col(column).equalTo("true"), 1)
                                    .otherwise(when(filtered.col(column).equalTo("false"), 0)))
                                .otherwise(lit("\\N"))).alias(newCol)
                            .drop(filtered.col(column));

filtered.write().mode(SaveMode.Overwrite).option("delimiter", "^").csv(hdfsExportLoaction);


Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 16 more fields];;
'Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 16 more fields]
+- Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 15 more fields]
   +- Project [id#35, whoid#21, whatid#1, whocount#13, whatcount#5, subject#27, activitydate#22, status#19, priority#24, ishighpriority#10, ownerid#15, description#2, isdeleted#20, accountid#3, isclosed#12, createddate#34, createdbyid#16, lastmodifieddate#0, lastmodifiedbyid#37, systemmodstamp#28, isarchived#30, calldurationinseconds#23, calltype#9, calldisposition#6, ... 15 more fields]
      +- Relation[LastModifiedDate#0,WhatId#1,Description#2,AccountId#3,RecurrenceDayOfWeekMask#4,WhatCount#5,CallDisposition#6,ReminderDateTime#7,RecurrenceEndDateOnly#8,CallType#9,IsHighPriority#10,RecurrenceRegeneratedType#11,IsClosed#12,WhoCount#13,RecurrenceInterval#14,OwnerId#15,CreatedById#16,RecurrenceActivityId#17,IsReminderSet#18,Status#19,IsDeleted#20,WhoId#21,ActivityDate#22,CallDurationInSeconds#23,... 15 more fields] DatasetRelation(null,com.springml.salesforce.wave.impl.ForceAPIImpl@68303c3e,SELECT id,whoid,whatid,whocount,whatcount,subject,activitydate,status,priority,ishighpriority,ownerid,description,isdeleted,accountid,isclosed,createddate,createdbyid,lastmodifieddate,lastmodifiedbyid,systemmodstamp,isarchived,calldurationinseconds,calltype,calldisposition,callobject,reminderdatetime,isreminderset,recurrenceactivityid,isrecurrence,recurrencestartdateonly,recurrenceenddateonly,recurrencetimezonesidkey,recurrencetype,recurrenceinterval,recurrencedayofweekmask,recurrencedayofmonth,recurrenceinstance,recurrencemonthofyear,recurrenceregeneratedtype from task,null,org.apache.spark.sql.SQLContext@2ec23ec3,null,0,1000,None,false,false)

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:356)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:354)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:354)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3301)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1312)
        at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2197)
        at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2164)
        at com.sfdc.SaleforceReader.mapColumns(SaleforceReader.java:187)
        at com.sfdc.SaleforceReader.main(SaleforceReader.java:547)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/07/10 09:38:51 INFO SparkContext: Invoking stop() from shutdown hook
19/07/10 09:38:51 INFO AbstractConnector: Stopped Spark@72456279{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/07/10 09:38:51 INFO SparkUI: Stopped Spark web UI at http://ebdp-po-e007s.sys.comcast.net:4040
19/07/10 09:38:51 INFO YarnClientSchedulerBackend: Interrupting monitor thread
19/07/10 09:38:51 INFO YarnClientSchedulerBackend: Shutting down all executors
19/07/10 09:38:51 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/07/10 09:38:51 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
19/07/10 09:38:51 INFO YarnClientSchedulerBackend: Stopped
19/07/10 09:38:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/07/10 09:38:51 INFO MemoryStore: MemoryStore cleared
19/07/10 09:38:51 INFO BlockManager: BlockManager stopped
19/07/10 09:38:51 INFO BlockManagerMaster: BlockManagerMaster stopped


I am not sure whether this happens because of the nested when().otherwise() calls.
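For reference, the nesting can be avoided entirely by chaining when() calls on the Column. This is a sketch reusing the question's variables, not the accepted fix below; it also keeps every branch a string, consistent with the '1'/'0'/'\N' mapping described at the top (anything that is neither "true" nor "false", including the string "null", falls through to the otherwise branch):

import org.apache.spark.sql.Column;
import static org.apache.spark.sql.functions.*;

// Flat three-way mapping: "true" -> "1", "false" -> "0", everything else -> "\N"
Column mapped = when(filtered.col(column).equalTo("true"), lit("1"))
        .when(filtered.col(column).equalTo("false"), lit("0"))
        .otherwise(lit("\\N"));
filtered = filtered.withColumn(newCol, mapped).drop(column);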

Best answer

I wrapped the literals in lit() and it worked:

when(filtered.col(column).equalTo("true"), lit(1))
                                    .otherwise(when(filtered.col(column).equalTo("false"), lit(0)))


instead of

when(filtered.col(column).equalTo("true"), 1)
                                    .otherwise(when(filtered.col(column).equalTo("false"), 0))
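
Applied to the full expression from the question, the fix would look like this (a sketch combining the question's code with the answer's change; the stray .alias(newCol) from the original is dropped, since withColumn already names the new column):

filtered = filtered.selectExpr(convertListToSeq(colsList))
                            .withColumn(newCol,
                                when(filtered.col(column).notEqual("null"),
                                    when(filtered.col(column).equalTo("true"), lit(1))
                                    .otherwise(when(filtered.col(column).equalTo("false"), lit(0))))
                                .otherwise(lit("\\N")))
                            .drop(filtered.col(column));

Here when() and lit() are static imports from org.apache.spark.sql.functions.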

Regarding "java - Spark Java code runs in spark_core v2.2 but fails in spark_core v2.3", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56971470/
