在“一次写入,多次读取”工作流程中,我经常使用FastExport实用程序解析从Teradata转储的大型文本文件(20GB-60GB),并使用Pandas将其加载到Pytables中。我正在使用多处理程序对文本文件进行分块,然后将它们分发到不同的进程中,以根据每行5MM左右的行数拆分一个.H5文件,以支持并行写入。并行写入多个hdf5文件大约需要12分钟,而对于25MM行x 64列写入单个hdf5文件则需要2 22分钟。
%timeit -n 1 write_single_hdf_multiprocess()
1 loops, best of 3: 22min 42s per loop
%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 12min 12s per loop
对于编写按行分割的多个h5文件的情况,我最终将拥有多个结构相同的文件,希望将它们合并到一个h5file根目录/数据目录/表中
为了测试组合功能,下面是代码片段:
import tables as tb
import pandas as pd
tb.setBloscMaxThreads(15)
store =pd.HDFStore('temp15.h5',complib='blosc')
filenames=['part_1.h5','part_2.h5','part_3.h5','part_4.h5','part_5.h5']
for f in filenames:
s=pd.HDFStore(f)
df=s.select('data')
store.append(key='data',value=df,format='t',chunksize=200000)
store.close()
这是%timeit结果:
1 loops, best of 3: 8min 22s per loop
从根本上讲,这大部分时间都是我通过并行编写多个h5文件获得的。我有一个两部分的问题:
根据Jeff的建议进行更新:
关于删除合并前文件写入过程中的索引编制和压缩的大量建议。删除索引,压缩并将每个合并前文件的最大行数设置为1MM行后:
%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 9min 37s per loop
这比以前快了2分钟多一点,几乎和我解析数据的速度一样快。将数据列设置为所需的字段后(本例中为3):
for f in filenames:
s=pd.HDFStore(f)
df=s.select('data')
dc=df.columns[1:4]
store.append(key='data',value=df,format='t',data_columns=dc)
这比以前慢了大约2分钟:
1 loops, best of 3: 10min 23s per loop
。从上面的代码中删除压缩后,我得到1 loops, best of 3: 8min 48s per loop
(几乎与第一次尝试压缩和没有数据列索引相同)。为了让您大致了解压缩的工作原理,未压缩的存储约为13.5GB,而使用blosc
的压缩版本约为3.7GB。总而言之,我的过程使用
18 minutes 15 seconds
创建了一个合并的未压缩hdf5文件。与单个文件写入(压缩)相比,此速度约为4 minutes 7 seconds
更快。这将我带到问题的第二部分,如果我不合并文件并使用预合并文件以map/combine方式处理,那是解决这个问题的合理方法吗?我应该如何考虑实现这一点?
为了进行全面披露,我使用的是Pandas版本的
0.12.0
,Pytables版本的3.0.0
,我的数据处理工作流程如下(伪代码):def generate_chunks_from_text_file(reader,chunksize=50000):
""" generator that yields processed text chunks """
for i, line in enumerate(reader.readlines()):
----process data and yield chunk -----
def data_reader(reader,queue):
""" read data from file and put it into a queue for multiprocessing """
for chunk in self.generate_chunks_from_text_file(reader):
queue.put(chunk) # put data in the queue for the writer
def data_processor(queue,filename,dtype,min_size):
"""" subprocess that reads the next value in the queue and writes hdf store. """
store=pd.HDFStore(filename)
while True:
results = queue.get()
array=np.array(results,dtype=dt) # convert to numpy array
df = pd.DataFrame(array) #covert to pandas array
store.append(key='data', value=df, format='t', min_itemsize=dict(min_size), data_columns=[],index=False)
store.close()
----when queue exhausts - break-----
最佳答案
我做了一个非常类似的split-process-combine方法,使用多个进程创建中间文件,然后使用单个进程合并生成的文件。以下是一些获得更好性能的提示:
index=False
来关闭在编写文件时的索引编制,有关文档,请参见here。我相信PyTables
会逐步更新索引,在这种情况下,索引是完全不必要的(因为您以后将合并它们)。仅索引最终文件。这样可以大大加快写作速度。 --chunkshape=auto
传递ptrepack
非常重要,因为blosc
将重新计算PyTables的块大小(例如,在单个块中读取/写入多少数据),因为它将考虑整个表。 creating
)。 Blosc仅具有一个压缩级别(对于级别1-9开启或对于级别0关闭)。因此,更改此选项不会更改任何内容。 data_columns=a_small_subset_of_columns
索引了。 关于python - Pytables/Pandas : Combining (reading?)多个HDF5存储按行划分,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/22750228/