Problem description
I am trying to test how to write data to HDFS 2.7 using Spark 2.1. My data is a simple sequence of dummy values, and the output should be partitioned by the attributes id and key.
// Simple case class to cast the data
case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)
// Actual data to be stored
val testData = Seq(
  SimpleTest("test", 12, 13.5.toFloat, 1),
  SimpleTest("test", 12, 13.5.toFloat, 2),
  SimpleTest("test", 12, 13.5.toFloat, 3),
  SimpleTest("simple", 12, 13.5.toFloat, 1),
  SimpleTest("simple", 12, 13.5.toFloat, 2),
  SimpleTest("simple", 12, 13.5.toFloat, 3)
)
// Spark's workflow to distribute, partition and store
// sc and sql are the SparkContext and SparkSession, respectively
val testDataP = sc.parallelize(testData, 6)
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
I am expecting to get the following tree structure in HDFS:
- /path/to/file
|- /id=test/key=1/part-01.parquet
|- /id=test/key=2/part-02.parquet
|- /id=test/key=3/part-03.parquet
|- /id=simple/key=1/part-04.parquet
|- /id=simple/key=2/part-05.parquet
|- /id=simple/key=3/part-06.parquet
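If the write behaves as expected, reading the same path back should let Spark rediscover id and key as partition columns from the directory names. A minimal check (a sketch, assuming the sql session from the code above and that the path resolves to the intended location):
// Partition discovery: Spark infers "id" and "key" from the directory names
val readBack = sql.read.parquet("/path/to/file")
readBack.printSchema() // value1, value2, plus the partition columns id and key
readBack.show()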
But when I run the previous code I get the following output:
/path/to/file/id=/key=24/
|-/part-01.parquet
|-/part-02.parquet
|-/part-03.parquet
|-/part-04.parquet
|-/part-05.parquet
|-/part-06.parquet
I do not know whether there is something wrong in the code, or whether Spark is doing something else.
I'm executing spark-submit as follows:
Recommended answer
I found a solution! According to Cloudera, it is a mapred-site.xml configuration problem (see the link below). Also, instead of writing the DataFrame as testDf.write.partitionBy("id", "key").parquet("/path/to/file"),
I did it as follows: testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file"). You can substitute <namenode> and <port> with the HDFS master node's name and port, respectively.
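Putting the fix together, here is a minimal sketch of the corrected write (the host name namenode.example.com and port 8020 below are placeholders for illustration, not values from the original post):
// Fully qualify the output path with the HDFS scheme, namenode host and port,
// so Spark does not resolve it against the wrong (e.g. local) filesystem.
val outputPath = "hdfs://namenode.example.com:8020/path/to/file" // placeholder host/port
testDf.write.partitionBy("id", "key").parquet(outputPath)

// Optional sanity check: read the result back and confirm that
// id and key come back as partition columns.
sql.read.parquet(outputPath).printSchema()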
Special thanks to @jacek-laskowski for his valuable contribution.
Reference:
https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/mp/36363#M1090