Problem Description
In my Spark application I need to convert a DataFrame to a .csv file and put it on a remote SFTP server. I decided to use the spark-sftp library for this task.
My sbt file looks like this:
import sbt.Keys.scalaVersion
name := "TEST"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.2"
val ENVIRONMENT_MODE = "development"
mainClass in Compile := Some("MainApp")
mainClass in (Compile, packageBin) := Some("MainApp")
mainClass in assembly := Some("MainApp")
assemblyJarName in assembly := ENVIRONMENT_MODE + "_test" + ".jar"
// Spark Packages from "bintray.com"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven/"
// "Spark Project SQL"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
// "Spark Project Core"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
// Current library is a PostgreSQL database connection JDBC4 driver.
libraryDependencies += "postgresql" % "postgresql" % "9.1-901-1.jdbc4"
// "scala-xml" is a Scala library for working with XML files.
libraryDependencies += "org.scala-lang.modules" %% "scala-xml" % "1.1.1"
// "Apache Commons VFS" is a virtual file system library.
libraryDependencies += "org.apache.commons" % "commons-vfs2" % "2.2"
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.jcraft" % "jsch" % "0.1.54",
"com.springml" %% "spark-sftp" % "1.1.3"
)
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// The mapping of path names to merge strategies is done via the setting "assemblyMergeStrategy".
assemblyMergeStrategy in assembly := {
case PathList("META-INF", _ @ _*) => MergeStrategy.discard
case _ => MergeStrategy.last
}
The sbt build compiles without any errors. When I try to test the following code, it raises an error:
import spark.sqlContext.implicits._

val df: DataFrame = Seq(
  ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
  ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
  ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
  ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
  ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

println("Count: " + df.count()) // Prints "Count: 5" in the console.

df.write
  .format("com.springml.spark.sftp")
  .option("host", "XXXX")
  .option("username", "XXXX")
  .option("password", "XXXX")
  .option("fileType", "csv")
  .option("delimiter", ";")
  .option("codec", "bzip2")
  .save("/reports/daily.csv")
Error:
Exception in thread "main" java.lang.NoSuchMethodError: com.springml.sftp.client.SFTPClient.<init>(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;I)V
at com.springml.spark.sftp.DefaultSource.getSFTPClient(DefaultSource.scala:186)
at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:122)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at report.CALL.runTask(CALL.scala:42)
at JobController.runJob(JobController.scala:38)
at MainApp$.main(MainApp.scala:74)
at MainApp.main(MainApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
What is the reason for this problem? What other solutions would you recommend?
The
jar tvf development_test.jar
command returns a huge listing. I noticed the following lines in that output:
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/sftp/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/sftp/client/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/sftp/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/sftp/util/
2430 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/CryptoUtils.class
829 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/FileNameFilter.class
1375 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/ProgressMonitor.class
10308 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/SFTPClient.class
3361 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DatasetRelation$.class
11896 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DatasetRelation.class
1241 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anon$1.class
1363 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$1.class
1168 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$10.class
1170 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$11.class
1168 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$12.class
1387 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$13.class
1363 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$14.class
1391 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$15.class
1193 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$16.class
1194 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$17.class
1293 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$18.class
1271 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$19.class
1339 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$2.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$20.class
1192 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$21.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$22.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$23.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$24.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$25.class
1494 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$26.class
1520 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$27.class
1367 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$3.class
1169 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$4.class
1166 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$5.class
1169 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$6.class
1170 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$7.class
1269 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$8.class
1248 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$9.class
19336 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource.class
1758 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DeleteTempFileShutdownHook.class
848 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/constants$.class
871 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/constants.class
1041 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils$.class
1637 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils$ImplicitDataFrameWriter.class
1366 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils.class
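To check which constructor the SFTPClient class packaged into the jar actually exposes, and compare it with the six-argument (String, String, String, String, String, int) signature the stack trace is looking for, a quick reflection check can be run with development_test.jar on the classpath. This is only a sketch; the object name is arbitrary:

object InspectSftpClient {
  def main(args: Array[String]): Unit = {
    // Load the SFTPClient class that is actually on the classpath
    // and print every public constructor it declares.
    val cls = Class.forName("com.springml.sftp.client.SFTPClient")
    cls.getConstructors.foreach(println)
  }
}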
Recommended Answer
I noticed that the spark-sftp library (1.1.3) has several dependencies; one of them is the sftp-client (1.0.3) library. spark-sftp calls methods of sftp-client, and the copy of the SFTPClient class that ends up duplicated into my assembly jar does not have the constructor that spark-sftp 1.1.3 expects, which is what the NoSuchMethodError points to. Here is my code, which works: it bypasses the spark-sftp data source and calls SFTPClient directly.
// Imports assumed for the code below (they were not shown in the original answer):
import java.io.File
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.springml.sftp.client.SFTPClient
import com.springml.spark.sftp.DeleteTempFileShutdownHook

def runJob(): Unit = {
  try {
    val spark: SparkSession = initializeSpark()
    import spark.sqlContext.implicits._
    // Create the sample DataFrame.
    val df: DataFrame = Seq(
      ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
      ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
      ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
      ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
      ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
    df.show()
    // Create the SFTPClient object directly (password authentication, port 22).
    val sftpClient = new SFTPClient(null, "username", "password", "host", 22)
    val tmpFolder = System.getProperty("java.io.tmpdir")
    val hdfsTemp = tmpFolder
    // Write the DataFrame to a temporary location and get the path of the resulting csv part file.
    val source = writeToTemp(spark, df, hdfsTemp, tmpFolder, "csv", "true", ";", "rowTag", "rootTag")
    println("source: " + source)
    // Copy the file to the SFTP server.
    sftpClient.copyToFTP(source, "/reports/example.csv")
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
def writeToTemp(sparkSession: SparkSession, df: DataFrame, hdfsTemp: String, tempFolder: String,
                fileType: String, header: String, delimiter: String, rowTag: String, rootTag: String): String = {
  val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID
  val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix
  val localTempLocation = tempFolder + File.separator + randomSuffix
  println("hdfsTempLocation: " + hdfsTempLocation)
  println("localTempLocation: " + localTempLocation)
  addShutdownHook(localTempLocation)
  // Write a single csv part file to the temporary location.
  df.coalesce(1).write.option("header", header).option("delimiter", delimiter).csv(hdfsTempLocation)
  // Copy it from HDFS to the local temporary location (one call is enough).
  val copied = copyFromHdfs(sparkSession, hdfsTempLocation, localTempLocation)
  println("copied to: " + copied)
  // Return the absolute path of the csv part file.
  copiedFile(localTempLocation)
}
def addShutdownHook(tempLocation: String): Unit = {
  println("Adding hook for file " + tempLocation)
  val hook = new DeleteTempFileShutdownHook(tempLocation)
  Runtime.getRuntime.addShutdownHook(hook)
}
def copyFromHdfs(sparkSession: SparkSession, hdfsTemp: String, fileLocation: String): String = {
  val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
  val hdfsPath = new Path(hdfsTemp)
  val fs = hdfsPath.getFileSystem(hadoopConf)
  if ("hdfs".equalsIgnoreCase(fs.getScheme)) {
    fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation))
    fs.deleteOnExit(new Path(hdfsTemp))
    fileLocation
  } else {
    hdfsTemp
  }
}
def copiedFile(tempFileLocation: String): String = {
  val baseTemp = new File(tempFileLocation)
  val files = baseTemp.listFiles().filter { x =>
    !x.isDirectory && !x.getName.contains("SUCCESS") && !x.isHidden && !x.getName.contains(".crc")
  }
  files(0).getAbsolutePath
}
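As an alternative to bypassing the data source, it may be enough to force the transitive sftp client artifact to the version that spark-sftp 1.1.3 was built against, so that the assembly does not package an older SFTPClient class. This is only a sketch: the coordinates "com.springml" % "sftp.client" % "1.0.3" are an assumption based on spark-sftp's published dependencies and should be verified.

// build.sbt (sketch, coordinates assumed): pin the transitive sftp client library
// so that sbt-assembly packages the SFTPClient version spark-sftp 1.1.3 expects.
dependencyOverrides += "com.springml" % "sftp.client" % "1.0.3"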
I removed the codec option because it caused charset problems in the final csv file.