


I'm reading a large number (100s to 1000s) of parquet files into a single dask dataframe (single machine, all local). I realized that

import dask.dataframe as dd

files = ['file1.parq', 'file2.parq', ...]
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()

is much slower than

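import dask.dataframe as dd
from dask import delayed
from fastparquet import ParquetFile

@delayed  # each call becomes a lazy task instead of reading the file immediately
def load_chunk(pth):
    # Read one whole parquet file into a pandas DataFrame
    return ParquetFile(pth).to_pandas()

# One partition per file; no per-file statistics are gathered up front
ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()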

For my particular application, the second approach (from_delayed) takes 6 seconds to complete, the first approach takes 39 seconds. In the dd.read_parquet case there seems to be a lot of overhead before the workers even start to do something, and there are quite a few transfer-... operations scattered across the task stream plot. I'd like to understand what's going on here. What could be the reason that the read_parquet approach is so much slower? What does it do differently than just reading the files and putting them in chunks?



What you are seeing is the client reading the min/max statistics of the data's columns from every file, in order to establish a good index (dask's divisions) for the dataframe. An index can be very useful for avoiding reads of data files that are not needed for your particular job.
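A minimal, stdlib-only sketch of why those statistics are useful, assuming the per-file min/max values of the index column have already been read. The file names and numbers are made up for illustration, and `divisions_from_stats` / `files_for_range` are hypothetical helpers, not dask API. If the files are in order and their ranges do not overlap, the ranges give the divisions; a query on an index range then only needs the files whose range overlaps it.

```python
from typing import List, Optional, Sequence, Tuple

# (path, min, max) of the index column for one file. The values used below are
# hypothetical; in practice they come from each Parquet file's footer statistics.
FileStats = Tuple[str, int, int]


def divisions_from_stats(stats: Sequence[FileStats]) -> Optional[Tuple[int, ...]]:
    """Return divisions (each file's lower bound plus the last file's upper bound)
    if the files are already ordered and their ranges do not overlap, else None."""
    if not stats:
        return None
    for (_, _, prev_max), (_, next_min, _) in zip(stats, stats[1:]):
        if next_min <= prev_max:  # out of order or overlapping: no usable index
            return None
    return tuple(mn for _, mn, _ in stats) + (stats[-1][2],)


def files_for_range(stats: Sequence[FileStats], lo: int, hi: int) -> List[str]:
    """Paths whose [min, max] overlaps [lo, hi]; every other file can be skipped."""
    return [path for path, mn, mx in stats if mx >= lo and mn <= hi]


stats = [("part.0.parq", 0, 99), ("part.1.parq", 100, 199), ("part.2.parq", 200, 299)]
print(divisions_from_stats(stats))       # (0, 100, 200, 299)
print(files_for_range(stats, 150, 180))  # ['part.1.parq']
```

This is only an illustration of the idea, not how dask implements it internally. Without such statistics the divisions are simply unknown, and every file has to be treated as an opaque partition.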


This is often a good idea when each file holds a lot of data and the total number of files is small. In other cases, the same information might be contained in a special "_metadata" file, so there is no need to read from all the files first.


To prevent the scan of the files' footers, you should call

dd.read_parquet(..., gather_statistics=False)


This should be the default in the next version of dask.
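To make the per-file cost of the footer scan concrete, here is a stdlib-only sketch based on the Parquet file layout: each file ends with its Thrift-encoded metadata, a 4-byte little-endian length, and the magic bytes `PAR1`. The helpers `read_footer_bytes` and `scan_footers` are hypothetical, not fastparquet or dask API, and they stop at the raw bytes; a real engine would also decode them to get the min/max statistics. The demo file only mimics the tail layout and is not a valid Parquet file.

```python
import os
import struct
import tempfile
from typing import Dict, Iterable

MAGIC = b"PAR1"  # Parquet files start and end with these 4 bytes


def read_footer_bytes(path: str) -> bytes:
    """Return the raw (still Thrift-encoded) file metadata of one Parquet file.

    Layout of the file's tail: <metadata> <int32 little-endian length> b"PAR1"
    """
    with open(path, "rb") as f:
        f.seek(-8, os.SEEK_END)  # length field + trailing magic
        tail = f.read(8)
        if tail[4:] != MAGIC:
            raise ValueError(f"{path} does not end with the Parquet magic bytes")
        (meta_len,) = struct.unpack("<i", tail[:4])
        f.seek(-8 - meta_len, os.SEEK_END)  # jump back to the start of the metadata
        return f.read(meta_len)


def scan_footers(paths: Iterable[str]) -> Dict[str, bytes]:
    """One open and two small reads per file, so the cost grows with the file count."""
    return {p: read_footer_bytes(p) for p in paths}


# Demo with a fake file that only mimics the Parquet tail layout.
with tempfile.TemporaryDirectory() as tmp:
    fake = os.path.join(tmp, "part.0.parq")
    meta = b"fake-metadata"
    with open(fake, "wb") as f:
        f.write(MAGIC + b"column-data" + meta + struct.pack("<i", len(meta)) + MAGIC)
    footers = scan_footers([fake])
    print(footers[fake])  # b'fake-metadata'
```

With hundreds to thousands of files, this means as many opens and reads on the client before any task is scheduled, which is the scan that `gather_statistics=False` is meant to avoid.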


07-25 09:14