本文介绍了将Spark数据帧写入单个实木复合地板文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试做一些非常简单的事情,并且遇到了一些非常愚蠢的挣扎.我认为这一定与对火花在做什么产生了根本性的误解有关.我将不胜感激任何帮助或解释.

I am trying to do something very simple and I'm having some very stupid struggles. I think it must have to do with a fundamental misunderstanding of what spark is doing. I would greatly appreciate any help or explanation.

我有一个很大的表(〜3 TB,约300MM行,25k分区),另存为s3中的实木复合地板,我想将其的很小一部分作为一个实木复合地板文件提供给某人.不幸的是,这需要永远完成,我不明白为什么.我尝试了以下方法:

I have a very large (~3 TB, ~300MM rows, 25k partitions) table, saved as parquet in s3, and I would like to give someone a tiny sample of it as a single parquet file. Unfortunately, this is taking forever to finish and I don't understand why. I have tried the following:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

然后当它不起作用时,我尝试了一下,我认为应该是一样的,但是我不确定. (我添加了print以进行调试.)

and then when that didn't work I tried this, which I thought should be the same, but I wasn't sure. (I added the print's in an effort to debug.)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

当我观看Yarn UI时, write的打印语句都使用25k映射器. count花费了3分钟,show花费了25分钟,而write花费了约40分钟,尽管最终 did 写入了我要查找的单个文件表.

When I watch the Yarn UI, both print statements and the write are using 25k mappers. The count took 3 mins, the show took 25 mins, and the write took ~40 mins, although it finally did write the single file table I was looking for.

在我看来,第一行应该占据前500行并将它们合并到一个分区,然后其他行应该非常快地发生(在单个映射器/缩减器上).有人可以在这里看到我在做什么吗?有人告诉我也许我应该使用sample而不是limit,但是据我了解,limit应该更快.是吗?

It seems to me like the first line should take the top 500 rows and coalesce them to a single partition, and then the other lines should happen extremely fast (on a single mapper/reducer). Can anyone see what I'm doing wrong here? I've been told maybe I should use sample instead of limit but as I understand it limit should be much faster. Is that right?

提前感谢您的任何想法!

Thanks in advance for any thoughts!

推荐答案

我将首先处理print函数问题,因为这是理解spark的基础.然后limitsample.然后repartitioncoalesce.

I’ll approach the print functions issue first, as it’s something fundamental to understanding spark. Then limit vs sample. Then repartition vs coalesce.

print函数以这种方式花费很长时间的原因是因为coalesce是惰性转换. spark中的大多数转换都是惰性的,并且只有在调用 action 时才能进行评估.

The reasons the print functions take so long in this manner is because coalesce is a lazy transformation. Most transformations in spark are lazy and do not get evaluated until an action gets called.

动作是可以做的事情,并且(大多)不要返回结果.类似于countshow.它们返回一个数字和一些数据,而coalesce返回一个具有1个分区的数据帧(有点,见下文).

Actions are things that do stuff and (mostly) dont return a new dataframe as a result. Like count, show. They return a number, and some data, whereas coalesce returns a dataframe with 1 partition (sort of, see below).

发生的事情是,您每次在tiny数据帧上调用操作时,都会重新运行sql查询和coalesce调用.这就是为什么他们每次通话都使用25k映射器的原因.

What is happening is that you are rerunning the sql query and the coalesce call each time you call an action on the tiny dataframe. That’s why they are using the 25k mappers for each call.

为节省时间,请在第一行中添加.cache()方法(无论如何对于您的print代码).

To save time, add the .cache() method to the first line (for your print code anyway).

然后,数据帧转换实际上是在第一行上执行的,结果一直保存在spark节点的内存中.

Then the data frame transformations are actually executed on your first line and the result persisted in memory on your spark nodes.

这不会对第一行的初始查询时间产生任何影响,但至少您不会再运行该查询两次,因为结果已被缓存,然后操作便可以使用该缓存的结果.

This won’t have any impact on the initial query time for the first line, but at least you’re not running that query 2 more times because the result has been cached, and the actions can then use that cached result.

要从内存中删除它,请使用.unpersist()方法.

To remove it from memory, use the .unpersist() method.

现在为您要执行的实际查询...

这实际上取决于您的数据如何分区.就像是,它是否被划分在特定的字段上等等?

It really depends on how your data is partitioned. As in, is it partitioned on specific fields etc...

您在问题中提到了它,但是sample可能是正确的方法.

You mentioned it in your question, but sample might the right way to go.

这是为什么?

limit必须在第一行中搜索500条.除非按行号(或某种递增ID)对数据进行分区,否则前500行可以存储在25k分区中的任何一个中.

limit has to search for 500 of the first rows. Unless your data is partitioned by row number (or some sort of incrementing id) then the first 500 rows could be stored in any of the the 25k partitions.

因此spark必须在所有参数中进行搜索,直到找到所有正确的值为止.不仅如此,它还必须执行一个额外的步骤来对数据进行排序以使其具有正确的顺序.

So spark has to go search through all of them until it finds all the correct values. Not only that, it has to perform an additional step of sorting the data to have the correct order.

sample只能获取500个随机值.由于涉及的数据没有顺序/排序,而且不必在特定的分区中搜索特定的行,因此操作起来容易得多.

sample just grabs 500 random values. Much easier to do as there’s no order/sorting of the data involved and it doesn’t have to search through specific partitions for specific rows.

limit可以更快,但也有其局限性.我通常只将其用于非常小的子集(如10/20行).

While limit can be faster, it also has its, erm, limits. I usually only use it for very small subsets like 10/20 rows.

现在可以进行分区了....

Now for partitioning....

我认为coalesce的问题是实际上会更改分区.现在我不确定,所以要放些盐.

The problem I think with coalesce is it virtually changes the partitioning. Now I’m not sure about this, so pinch of salt.

根据pyspark文档:

因此,您的500行实际上仍然位于您的25,000个物理分区上,这些分区被spark视为1个虚拟分区.

So your 500 rows will actually still sit across your 25k physical partitions that are considered by spark to be 1 virtual partition.

在这里使用.repartition(1).cache()造成随机播放(通常很糟糕)并保留在火花记忆中可能是一个好主意.因为当您write时,不是让25k映射器查看物理分区,而是应该让1个映射器查看火花存储器中的内容.然后write变得容易.您还需要处理一小部分,因此(希望)任何改组都应该是可管理的.

Causing a shuffle (usually bad) and persisting in spark memory with .repartition(1).cache() is possibly a good idea here. Because instead of having the 25k mappers looking at the physical partitions when you write, it should only result in 1 mapper looking at what is in spark memory. Then write becomes easy. You’re also dealing with a small subset, so any shuffling should (hopefully) be manageable.

显然,这通常是不好的做法,并且不会改变spark在执行原始sql查询时可能要运行25k映射器的事实.希望sample能够解决这个问题.

Obviously this is usually bad practice, and doesn’t change the fact spark will probably want to run 25k mappers when it performs the original sql query. Hopefully sample takes care of that.

修改以明确改组,repartitioncoalesce

edit to clarify shuffling, repartition and coalesce

您在4节点群集上的16个分区中有2个数据集.您想加入它们并作为新数据集写入16个分区中.

You have 2 datasets in 16 partitions on a 4 node cluster. You want to join them and write as a new dataset in 16 partitions.

数据1的行1可能在节点1上,数据2的行1可能在节点4上.

Row 1 for data 1 might be on node 1, and row 1 for data 2 on node 4.

为了将这些行连接在一起,spark必须物理移动一个或两个,然后写入新分区.

In order to join these rows together, spark has to physically move one, or both of them, then write to a new partition.

这是一种随机操作,可以在群集中物理移动数据.

That’s a shuffle, physically moving data around a cluster.

所有数据都被16分区并不重要,重要的是数据在群集中的位置.

It doesn’t matter that everything is partitioned by 16, what matters is where the data is sitting on he cluster.

data.repartition(4)将物理地将数据从每个节点的每4个分区集移动到每个节点1个分区.

data.repartition(4) will physically move data from each 4 sets of partitions per node into 1 partition per node.

Spark可能会将所有4个分区从节点1移到其他3个节点,并移到那些节点上的新单个分区中,反之亦然.

Spark might move all 4 partitions from node 1 over to the 3 other nodes, in a new single partition on those nodes, and vice versa.

我不希望这样做,但这是一个极端的例子,可以证明这一点.

I wouldn’t think it’d do this, but it’s an extreme case that demonstrates the point.

尽管coalesce(4)调用不会移动数据,但它更聪明.相反,它会识别每个节点已经有4个分区&总共4个节点...我将每个节点的所有4个分区称为一个分区,然后我将拥有4个分区!"

A coalesce(4) call though, doesn’t move the data, it’s much more clever. Instead, it recognises "I already have 4 partitions per node & 4 nodes in total... I’m just going to call all 4 of those partitions per node a single partition and then I’ll have 4 total partitions!"

因此它不需要移动任何数据,因为它只是将现有分区合并到一个合并的分区中.

So it doesn’t need to move any data because it just combines existing partitions into a joined partition.

这篇关于将Spark数据帧写入单个实木复合地板文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 15:16