To narrow down the problem, I removed the dependencies on my other classes and reduced it to the following clean code:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.slf4j.LoggerFactory

// Config is the project's own configuration wrapper (not shown here).
object LoaderProcessor extends App {
  val logger = LoggerFactory.getLogger(this.getClass())
  execute()

  def execute(): Unit = {
    val spark = get_spark()
    import spark.implicits._
    var df = spark.read
      .format("csv")
      .option("delimiter", ",")
      .option("header", true)
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
      .load(args(2))
    df = df.withColumn("zs_source", lit(1)) // the only operation on the DataFrame
    val o_file = Config().getString("myapp.dataFolder") + "/8/1/data.csv"
    logger.info("Writing output to: {}", o_file)
    df.write.mode("overwrite")
      .option("header", "true").csv(o_file)
  }

  def get_spark(): SparkSession = {
    val env = System.getenv("MYAPP_ENV")
    var spark: SparkSession = null
    if (env == null || env == "dev_local") {
      // Hard-codes a local master, which overrides --master from spark-submit
      spark = SparkSession.builder
        .master("local")
        .appName("MyApp")
        .getOrCreate()
    } else {
      spark = SparkSession.builder
        .appName("MyApp")
        //.enableHiveSupport()
        .getOrCreate()
    }
    spark.sparkContext.setCheckpointDir(Config().getString("myapp.rddcp"))
    spark
  }
}
```
It works fine in client mode. I haven't been able to figure out the problem. My cluster is on HDInsight.
I also noticed that the write operation keeps rewriting the output folder, like this:
part-00000-3e9566ae-c13c-468a-8732-e7b8a8df5335-c000.csv
and then, a few seconds later:
part-00000-4f4979a0-d9f9-481b-aac4-115e63b9f59c-c000.csv
```
18/12/01 15:08:53 INFO ApplicationMaster: Starting the user application in a separate Thread
18/12/01 15:08:53 INFO ApplicationMaster: Waiting for spark context initialization...
18/12/01 15:08:55 INFO Config$: Environment: dev
18/12/01 15:08:55 ERROR ApplicationMaster: Uncaught exception:
java.lang.IllegalStateException: User did not initialize spark context!
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:510)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:815)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:814)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:839)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
```
```
spark-submit --master yarn --deploy-mode cluster --jars "wasb://xx@yy/zs/jars/config-1.3.1.jar" --class myapp.LoaderProcessor "wasb://xx@yy/zs/jars/myapp.jar" l 8 /data/8_data.csv 1 , true false
```
-> problem

```
spark-submit --deploy-mode client --jars "wasb://xx@yy/zs/jars/config-1.3.1.jar" --class myapp.LoaderProcessor "wasb://xx@yy/zs/jars/myapp.jar" l 8 /data/8_data.csv 1 , true false
```
-> works!!!

Best answer
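EDIT: updated based on our comments

The problem is that you always create a local context when `if (env == null || env == "dev_local")` is true, and in the distributed environment `MYAPP_ENV` is null. In cluster mode the driver runs inside the YARN ApplicationMaster container rather than on the machine you submit from, so the variable is not set there, the `.master("local")` branch is taken, and that hard-coded master overrides `--master yarn`. The ApplicationMaster waits for your code to create a SparkContext running on YARN, never gets one, and fails with `User did not initialize spark context!`. In client mode the driver runs on the submitting machine, so the job completes either way. The output folder being rewritten is probably YARN retrying the failed attempt, with each attempt running the job again.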
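A minimal sketch of a safer master selection, assuming the same `MYAPP_ENV` convention: only an explicit `dev_local` value forces local mode, and in every other case (including an unset variable on a YARN node) no master is set in code, so the one passed to `spark-submit` wins. The names `MasterChoice` and `masterOverride` are hypothetical, and the block is plain Scala so the decision logic can be checked without a cluster.

```scala
// Hypothetical helper: decides whether the application should hard-code a master URL.
object MasterChoice {
  val LocalEnv = "dev_local"

  // Some("local") only for an explicit MYAPP_ENV=dev_local; None means
  // "do not call .master(...)", leaving the choice to spark-submit.
  def masterOverride(env: Option[String]): Option[String] =
    env.filter(_ == LocalEnv).map(_ => "local")

  def main(args: Array[String]): Unit = {
    // Read the variable as an Option instead of comparing against null.
    val current = sys.env.get("MYAPP_ENV")
    val shown = current.getOrElse("<unset>")
    masterOverride(current) match {
      case Some(m) => println(s"MYAPP_ENV=$shown -> forcing master $m")
      case None    => println(s"MYAPP_ENV=$shown -> leaving master to spark-submit")
    }
  }
}
```

Inside `get_spark` this could be wired up as `val builder = SparkSession.builder.appName("MyApp")`, then `MasterChoice.masterOverride(sys.env.get("MYAPP_ENV")).foreach(builder.master)`, then `builder.getOrCreate()`; `Builder.master` sets `spark.master` on the builder itself, so its return value can be discarded. If the driver really needs `MYAPP_ENV` in cluster mode, Spark on YARN provides `spark.yarn.appMasterEnv.[EnvironmentVariableName]` (for example `--conf spark.yarn.appMasterEnv.MYAPP_ENV=dev`) to set environment variables on the ApplicationMaster, which is where the driver runs in cluster mode.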