在“一次写入,多次读取”工作流程中,我经常使用FastExport实用程序解析从Teradata转储的大型文本文件(20GB-60GB),并使用Pandas将其加载到Pytables中。我正在使用多处理程序对文本文件进行分块,然后将它们分发到不同的进程中,以根据每行5MM左右的行数拆分一个.H5文件,以支持并行写入。并行写入多个hdf5文件大约需要12分钟,而对于25MM行x 64列写入单个hdf5文件则需要2 22分钟。

%timeit -n 1 write_single_hdf_multiprocess()
1 loops, best of 3: 22min 42s per loop

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 12min 12s per loop

对于编写按行分割的多个h5文件的情况,我最终将拥有多个结构相同的文件,希望将它们合并到一个h5file根目录/数据目录/表中

为了测试组合功能,下面是代码片段:

import tables as tb
import pandas as pd

tb.setBloscMaxThreads(15)
store =pd.HDFStore('temp15.h5',complib='blosc')

filenames=['part_1.h5','part_2.h5','part_3.h5','part_4.h5','part_5.h5']

for f in filenames:
    s=pd.HDFStore(f)
    df=s.select('data')
    store.append(key='data',value=df,format='t',chunksize=200000)

store.close()

这是%timeit结果:
1 loops, best of 3: 8min 22s per loop

从根本上讲,这大部分时间都是我通过并行编写多个h5文件获得的。我有一个两部分的问题:
  • 有没有一种方法可以更有效地合并(追加)具有相同表格式的h5文件?(类似SQL Union的功能)。
  • 如果不是
  • ,则在从所有列的位置选择大多数查询时,将行拆分是合理的事情吗?我正在考虑编写一个map/combine函数,该函数将在表的所有部分中查找以便从哪里查询。 Pandas this函数执行此操作以基于列进行拆分。


  • 根据Jeff的建议进行更新:

    关于删除合并前文件写入过程中的索引编制和压缩的大量建议。删除索引,压缩并将每个合并前文件的最大行数设置为1MM行后:
    %timeit -n 1 write_multiple_hdf_multiprocess()
    1 loops, best of 3: 9min 37s per loop
    

    这比以前快了2分钟多一点,几乎和我解析数据的速度一样快。将数据列设置为所需的字段后(本例中为3):
    for f in filenames:
        s=pd.HDFStore(f)
        df=s.select('data')
        dc=df.columns[1:4]
        store.append(key='data',value=df,format='t',data_columns=dc)
    

    这比以前慢了大约2分钟:1 loops, best of 3: 10min 23s per loop。从上面的代码中删除压缩后,我得到1 loops, best of 3: 8min 48s per loop(几乎与第一次尝试压缩和没有数据列索引相同)。为了让您大致了解压缩的工作原理,未压缩的存储约为13.5GB,而使用blosc的压缩版本约为3.7GB。

    总而言之,我的过程使用18 minutes 15 seconds创建了一个合并的未压缩hdf5文件。与单个文件写入(压缩)相比,此速度约为4 minutes 7 seconds更快。

    这将我带到问题的第二部分,如果我不合并文件并使用预合并文件以map/combine方式处理,那是解决这个问题的合理方法吗?我应该如何考虑实现这一点?

    为了进行全面披露,我使用的是Pandas版本的0.12.0,Pytables版本的3.0.0,我的数据处理工作流程如下(伪代码):
    def generate_chunks_from_text_file(reader,chunksize=50000):
        """ generator that yields processed text chunks """
    
        for i, line in enumerate(reader.readlines()):
            ----process data and yield chunk -----
    
    
    def data_reader(reader,queue):
        """ read data from file and put it into a queue for multiprocessing """
    
        for chunk in self.generate_chunks_from_text_file(reader):
            queue.put(chunk) # put data in the queue for the writer
    
    def data_processor(queue,filename,dtype,min_size):
        """" subprocess that reads the next value in the queue and writes hdf store. """
    
        store=pd.HDFStore(filename)
    
        while True:
    
            results = queue.get()
            array=np.array(results,dtype=dt) # convert to numpy array
            df = pd.DataFrame(array) #covert to pandas array
    
            store.append(key='data', value=df, format='t', min_itemsize=dict(min_size), data_columns=[],index=False)
        store.close()
            ----when queue exhausts - break-----
    

    最佳答案

    我做了一个非常类似的split-process-combine方法,使用多个进程创建中间文件,然后使用单个进程合并生成的文件。以下是一些获得更好性能的提示:

  • 通过传递index=False来关闭在编写文件时的索引编制,有关文档,请参见here。我相信PyTables会逐步更新索引,在这种情况下,索引是完全不必要的(因为您以后将合并它们)。仅索引最终文件。这样可以大大加快写作速度。
  • 您可以考虑根据查询的内容来更改默认的索引方案/级别(假设您遵循以下建议,不要创建太多数据列)。
  • 同样,在写入预合并的文件时不要创建压缩文件,而是在写入索引文件后(处于未压缩状态)创建它,因此这是您的最后一步。参见文档here。此外,使用--chunkshape=auto传递ptrepack非常重要,因为blosc将重新计算PyTables的块大小(例如,在单个块中读取/写入多少数据),因为它将考虑整个表。
  • RE压缩,此处的YMMV可能有所不同,具体取决于数据实际压缩的程度以及您正在执行的查询类型。我发现某些类型的数据可以更快地完全不进行压缩,尽管从理论上讲应该更好。您只需要进行实验(尽管我总是使用creating)。 Blosc仅具有一个压缩级别(对于级别1-9开启或对于级别0关闭)。因此,更改此选项不会更改任何内容。
  • 我基本上按索引顺序合并文件,方法是将预合并文件的子集读入内存(一个常数,只使用一定数量的内存),然后将它们一个接一个地追加到最终文件中。 (不是100%肯定会有所作为,但看起来效果很好)。
  • 您会发现,您的大部分时间都用data_columns=a_small_subset_of_columns索引了。
  • 此外,仅索引您实际需要的列!确保在写入每个文件时指定ojit_code。
  • 我发现写很多小文件更好,然后合并创建一个大文件,而不是写一些大文件,但是这里是YMMV。 (例如,假设100个100MB的预合并文件产生10GB的文件,而不是5个2GB的文件)。尽管这可能是我的处理管道的功能,因为我倾向于瓶颈在处理上,而不是实际的写作。
  • 我没有用过,但是听到了关于使用SSD(固态驱动器)的奇妙事情,即使这种事情相对来说很小。您可以使用一个获得一个数量级的加速(压缩可能会更改此结果)。
  • 关于python - Pytables/Pandas : Combining (reading?)多个HDF5存储按行划分,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/22750228/

    10-15 00:22