本文介绍了重新分区Dask DataFrame以获取均匀分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含索引这不是唯一的( client_id )。重新分区和重置索引最终会导致分区非常不均匀-有些分区只包含几行,几千行。例如以下代码:

I have a Dask DataFrames that contains index which is not unique (client_id). Repartitioning and resetting index ends up with very uneven partitions - some contains only a few rows, some thousands. For instance the following code:

for p in range(ddd.npartitions):
    print(len(ddd.get_partition(p)))

打印出类似这样的内容:

prints out something like that:

55
17
5
41
51
1144
4391
75153
138970
197105
409466
415925
486076
306377
543998
395974
530056
374293
237
12
104
52
28


我的DataFrame是一键编码的,具有500多个列。较大的分区无法容纳在内存中。我想对DataFrame进行重新分区以使其具有甚至大小的分区。您知道执行此操作的有效方法吗?

My DataFrame is one-hot encoded and has over 500 columns. Larger partitions don't fit in memory. I wanted to repartition the DataFrame to have partitions even in size. Do you know an efficient way to do this?

编辑1

简单复制:

df = pd.DataFrame({'x':np.arange(0,10000),'y':np.arange(0,10000)})
df2 = pd.DataFrame({'x':np.append(np.arange(0,4995),np.arange(5000,10000,1000)),'y2':np.arange(0,10000,2)})
dd_df = dd.from_pandas(df, npartitions=10).set_index('x')
dd_df2= dd.from_pandas(df2, npartitions=5).set_index('x')
new_ddf=dd_df.merge(dd_df2, how='right')
#new_ddf = new_ddf.reset_index().set_index('x')
#new_ddf = new_ddf.repartition(npartitions=2)
new_ddf.divisions
for p in range(new_ddf.npartitions):
    print(len(new_ddf.get_partition(p)))

请注意最后一个分区(单个元素):

Note the last partitions (one single element):

1000
1000
1000
1000
995
1
1
1
1
1


即使我们取消注释co修补线,分区的大小仍然不均匀。

Even when we uncomment the commented lines, partitions remain uneven in the size.

编辑II:Walkoround

可以通过以下代码实现简单的wlakoround。
是否有更精致的方法(更多是Dask方式)?

Simple wlakoround can be achieved by the following code.Is there a more elgant way to do this (more in a Dask way)?

def repartition(ddf, npartitions=None):
    MAX_PART_SIZE = 100*1024

    if npartitions is None:
        npartitions = ddf.npartitions

    one_row_size = sum([dt.itemsize for dt in ddf.dtypes])
    length = len(ddf)

    requested_part_size = length/npartitions*one_row_size
    if requested_part_size <= MAX_PART_SIZE:
        np = npartitions
    else:
        np = length*one_row_size/MAX_PART_SIZE

    chunksize = int(length/np)


    vc = ddf.index.value_counts().to_frame(name='count').compute().sort_index()

    vsum = 0
    divisions = [ddf.divisions[0]]
    for i,v in vc.iterrows():
        vsum+=v['count']
        if vsum > chunksize:
            divisions.append(i)
            vsum = 0
    divisions.append(ddf.divisions[-1])


    return ddf.repartition(divisions=divisions, force=True)


推荐答案

您没错, .repartition 不会解决问题,因为它不处理任何用于计算除法的逻辑,而只是尝试尽可能地合并现有分区。这是我针对相同问题提出的解决方案:

You're correct that .repartition won't do the trick since it doesn't handle any of the logic for computing divisions and just tries to combine the existing partitions wherever possible. Here's a solution I came up with for the same problem:

def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)

内部函数 sorted_division_locations 可以满足您的要求,但仅适用于类似于列表的实际对象,而不适用于懒惰的 dask.dataframe.Index 。这样可以避免在有很多重复项的情况下提取完整索引,而只是获取计数并从中进行本地重建。

The internal function sorted_division_locations does what you want already, but it only works on an actual list-like, not a lazy dask.dataframe.Index. This avoids pulling the full index in case there are many duplicates and instead just gets the counts and reconstructs locally from that.

如果您的数据帧太大,甚至连索引都赢了内存不足,那么您就需要做一些更聪明的事情。

If your dataframe is so large that even the index won't fit in memory then you'd need to do something even more clever.

这篇关于重新分区Dask DataFrame以获取均匀分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 18:07