Problem Description
After some processing I have a DStream[String, ArrayList[String]]. When I write it to HDFS using saveAsTextFile, the data is overwritten after every batch, so how can I write the new results by appending them to the previous results?
// Current approach: every batch writes to the same fixed path
output.foreachRDD(r => {
  r.saveAsTextFile(path)
})
Edit: It would also help if someone could show how to convert the output to Avro format and then write it to HDFS with appending.
Recommended Answer
saveAsTextFile does not support append. If called with a fixed filename, it will overwrite it every time. We could do saveAsTextFile(path + timestamp) to save to a new file every time. That is the basic functionality of DStream.saveAsTextFiles(path).
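As a minimal sketch of that idea (assuming output is the DStream from the question and hdfsPath is a hypothetical base directory, neither of which appears in the original answer):

// Hedged sketch: suffix the path with the batch time so each batch lands in a new directory
output.foreachRDD { (rdd, time) =>
  rdd.saveAsTextFile(s"$hdfsPath/batch-${time.milliseconds}")
}

// Roughly equivalent built-in behaviour: writes to "<hdfsPath>-<batch time in ms>"
output.saveAsTextFiles(hdfsPath)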
An easily accessible format that supports append is Parquet. We first transform our data RDD to a DataFrame or Dataset, and then we can benefit from the write support offered on top of that abstraction.
// Case class describing one record of the stream (field list elided in the original answer)
case class DataStructure(field1,..., fieldn)

... streaming setup, dstream declaration, ...

// Map each raw record to the structured case class
val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)

structuredOutput.foreachRDD { rdd =>
  import sparkSession.implicits._
  // Convert the batch RDD to a DataFrame and append it to the Parquet target
  val df = rdd.toDF()
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")
}
Note that appending to Parquet files gets more expensive over time, so rotating the target file from time to time is still a requirement.
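One way to do that rotation, as a hedged sketch only (the date-based subdirectory and the reuse of workDir are assumptions, not part of the original answer): appending under a per-day path keeps any single Parquet location from growing without bound.

import java.time.LocalDate

structuredOutput.foreachRDD { rdd =>
  import sparkSession.implicits._
  val df = rdd.toDF()
  // Assumption: same workDir as above; each day's batches append under a fresh
  // date-suffixed subdirectory instead of growing one target forever.
  df.write.format("parquet").mode("append").save(s"$workDir/date=${LocalDate.now()}")
}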