This article covers the question of Spark Streaming appending Parquet data to S3 and ending up with too many small partitions, along with a recommended answer that may serve as a useful reference.

Problem Description

I am building an app that uses Spark Streaming to receive data from Kinesis streams on AWS EMR. One of the goals is to persist the data into S3 (EMRFS), and for this I am using a non-overlapping 2-minute window.

My approach:

Kinesis Stream -> Spark Streaming with a batch duration of about 60 seconds, using a non-overlapping window of 120s, and save the streamed data into S3 as:

val rdd1 = kinesisStream.map( rdd => /* decode the data */)
rdd1.window(Seconds(120), Seconds(120)).foreachRDD { rdd =>
        val spark = SparkSession...
        import spark.implicits._
        // convert rdd to df
        val df = rdd.toDF(columnNames: _*)
        df.write.parquet("s3://bucket/20161211.parquet")
}

Here is what s3://bucket/20161211.parquet looks like after a while:

As you can see, there are lots of fragmented small partitions (which is horrendous for read performance)... The question is: is there any way to control the number of small partitions as I stream data into this S3 Parquet file?

Thanks

What I am thinking of doing is, each day, something like this:

val df = spark.read.parquet("s3://bucket/20161211.parquet")
df.coalesce(4).write.parquet("s3://bucket/20161211_4parition.parquet")

where I kind of repartition the DataFrame into 4 partitions and save them back...

It works, but I feel that doing this every day is not an elegant solution...

Recommended Answer

That's actually pretty close to what you want to do; each partition will get written out as an individual file in Spark. However, coalesce is a bit confusing since it can (effectively) apply upstream of where the coalesce is called. The warning from the Scala doc is:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can pass shuffle = true. This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).
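
To put that advice in context, here is a minimal sketch (not from the original answer) of how the streaming write from the question could coalesce with shuffle = true before writing, so the decoding upstream still runs in parallel across all partitions. It reuses rdd1 and columnNames from the code above; the target of 4 output files and the append save mode are illustrative assumptions.

rdd1.window(Seconds(120), Seconds(120)).foreachRDD { rdd =>
        val spark = SparkSession.builder().getOrCreate()
        import spark.implicits._
        // coalesce on the RDD with shuffle = true adds a shuffle step, so the
        // upstream decoding still executes across all original partitions
        val df = rdd.coalesce(4, shuffle = true).toDF(columnNames: _*)
        // append mode (assumed here) lets successive windows add files to the same path
        df.write.mode("append").parquet("s3://bucket/20161211.parquet")
}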

With Datasets it's a bit easier to persist and count to force a wide evaluation, since the default coalesce function doesn't take repartition as an input flag (although you could construct an instance of Repartition manually).
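
As an illustration of that persist-and-count idea, here is a minimal sketch (my own, not the answerer's code) applied to the DataFrame built inside the foreachRDD block above; the partition count of 4 and the append save mode are arbitrary examples.

// materialize the DataFrame at its full parallelism first...
val df = rdd.toDF(columnNames: _*)
df.persist()
df.count()   // forces a wide evaluation while the data is still widely partitioned

// ...so the coalesce now only narrows the write, not the upstream work
df.coalesce(4).write.mode("append").parquet("s3://bucket/20161211.parquet")
df.unpersist()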

Another option is to have a second periodic batch job (or even a second streaming job) that cleans up/merges the results, but this can be a bit complicated as it introduces a second moving part to keep track of.
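
For what that second job could look like, here is a minimal standalone sketch under the same assumptions as the question (one Parquet directory per day under s3://bucket/); the object name, the "_compacted" output suffix, and the target of 4 partitions are illustrative.

import org.apache.spark.sql.SparkSession

object DailyCompaction {
  def main(args: Array[String]): Unit = {
    val day = args(0)  // e.g. "20161211"
    val spark = SparkSession.builder().appName("daily-parquet-compaction").getOrCreate()

    // read the fragmented output of the streaming job...
    val df = spark.read.parquet(s"s3://bucket/$day.parquet")

    // ...and rewrite it as a handful of larger files under a separate prefix
    df.coalesce(4).write.parquet(s"s3://bucket/${day}_compacted.parquet")

    spark.stop()
  }
}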

That concludes this article on Spark Streaming appending to S3 as Parquet and producing too many small partitions; hopefully the recommended answer above is helpful.
