This article describes how to split a large file into multiple smaller chunks based on the values of a particular column, so that the file can be processed with multiple processes or threads. It should be a useful reference for anyone facing the same problem.

Problem description

I have written a Python program for a biological process: https://codereview.stackexchange.com/questions/186396/solve-the-phase-state-between-two-haplotype-blocks-using-markov-transition-proba .

If you look into that program you can see that it spends a lot of time computing data from two consecutive lines (or keys, vals) at a time. I am not putting the whole code here, but for simplicity I have created a mock file and a mock program (given below) that behave similarly at the simplest level. In this mock program I am computing, say, the len(vals) column and writing it back to an output file.

Since the computation is CPU/GPU bound while doing for (k1, v1) and (k2, v2) .... in the original program (above link), I want to multiprocess/multithread the data analysis by: 1) reading the whole data into memory in the fastest possible way, 2) dividing the data into chunks by the unique chr field, 3) doing the computation, and 4) writing it back to a file. So, how would I do it?
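
For step 2, one possible way to split the parsed rows into chunks by the unique chr field is itertools.groupby. This is only a sketch, and it assumes the rows are already sorted by chr, as they are in the sample data:

from itertools import groupby
from operator import itemgetter

# A few parsed rows in the same shape as the mock data (chr, pos, idx, vals).
rows = [
    ['2', '23', '4', 'abcd'],
    ['2', '25', '7', 'atg'],
    ['3', '37', '2', 'mnost'],
    ['3', '39', '3', 'pqr'],
    ['4', '25', '3', 'pqrp'],
]

# groupby only merges *consecutive* rows with the same key, so the
# input must already be sorted by the chr column (index 0).
chunks = {chr_val: list(grp) for chr_val, grp in groupby(rows, key=itemgetter(0))}

print(sorted(chunks))    # ['2', '3', '4']
print(len(chunks['2']))  # 2

Each value of chunks is then one chr-block that could be handed to its own worker process.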

In the given mock file the computation is too simple to be GPU/CPU bound, but I just want to know how I can do it if need be.

Note: I had too many people asking what I am trying to achieve - I am trying to multiprocess/multithread the given problem. If I put my whole original big program here, nobody is going to look at it. So, let's work this out with the small file and the small Python program below.

Below are my code and data:

my_data = '''chr\tpos\tidx\tvals
2\t23\t4\tabcd
2\t25\t7\tatg
2\t29\t8\tct
2\t35\t1\txylfz
3\t37\t2\tmnost
3\t39\t3\tpqr
3\t41\t6\trtuv
3\t45\t5\tlfghef
3\t39\t3\tpqr
3\t41\t6\trtu
3\t45\t5\tlfggg
4\t25\t3\tpqrp
4\t32\t6\trtu
4\t38\t5\tlfgh
4\t51\t3\tpqr
4\t57\t6\trtus
'''


def manipulate_lines(vals):
    # Compute the length of the vals column for one parsed line.
    vals_len = len(vals[3])
    return write_to_file(vals[0:3], vals_len)

def write_to_file(a, b):
    print(a, b)
    # Append one result line; the context manager ensures the file is closed.
    with open('write_multiprocessData.txt', 'a') as to_file:
        to_file.write('\t'.join(['\t'.join(a), str(b), '\n']))

def main():
    # Write the header, truncating any previous output.
    with open('write_multiprocessData.txt', 'w') as to_file:
        to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n']))

    data = my_data.rstrip('\n').split('\n')

    for lines in data:
        if lines.startswith('chr'):  # skip the header line
            continue
        manipulate_lines(lines.split('\t'))


if __name__ == '__main__':
    main()

Recommended answer

An issue to handle when using multiple processes to process data is preserving order. Python has a rather nice way of handling this: multiprocessing.Pool, which can be used to map the processing over the input data and takes care of returning the results in order.

However, the processing may still finish out of order, so to use it properly only the computation, and no IO access, should run in the subprocesses. Therefore, to use this in your case, a small rewrite of your code is needed so that all IO operations happen in the main process:

from multiprocessing import Pool
from time import sleep
from random import randint

my_data = '''chr\tpos\tidx\tvals
2\t23\t4\tabcd
2\t25\t7\tatg
2\t29\t8\tct
2\t35\t1\txylfz
3\t37\t2\tmnost
3\t39\t3\tpqr
3\t41\t6\trtuv
3\t45\t5\tlfghef
3\t39\t3\tpqr
3\t41\t6\trtu
3\t45\t5\tlfggg
4\t25\t3\tpqrp
4\t32\t6\trtu
4\t38\t5\tlfgh
4\t51\t3\tpqr
4\t57\t6\trtus
'''

def manipulate_lines(vals):
    # Random delay to demonstrate that result order is still preserved.
    sleep(randint(0, 2))
    vals_len = len(vals[3])
    return vals[0:3], vals_len

def write_to_file(a, b):
    print(a, b)
    # Append one result line; the context manager ensures the file is closed.
    with open('write_multiprocessData.txt', 'a') as to_file:
        to_file.write('\t'.join(['\t'.join(a), str(b), '\n']))

def line_generator(data):
    # Yield parsed lines one at a time, skipping the header.
    for line in data:
        if line.startswith('chr'):
            continue
        yield line.split('\t')

def main():
    # Write the header, truncating any previous output.
    with open('write_multiprocessData.txt', 'w') as to_file:
        to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n']))

    data = my_data.rstrip('\n').split('\n')

    lines = line_generator(data)
    # map distributes the work over at most 5 subprocesses and
    # returns the results in input order.
    with Pool(5) as p:
        results = p.map(manipulate_lines, lines)

    for result in results:
        write_to_file(*result)

if __name__ == '__main__':
    main()

This program does not split the list by its different chr values; instead it processes entries one by one, directly from the list, in at most 5 (the argument to Pool) subprocesses.

To show that the data is still in the expected order, I added a random sleep delay to the manipulate_lines function. This shows the concept but may not give a correct view of the speedup, since a sleeping process allows another one to run in parallel, whereas a compute-heavy process will use the CPU for all of its run time.

As can be seen, the writing to file can only be done once the map call returns, which ensures that all subprocesses have terminated and returned their results. There is quite some overhead in this communication behind the scenes, so for it to be beneficial, the compute part must be substantially longer than the write phase, and it must not generate too much data to write to file.
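
If holding all results until map returns becomes a memory or latency problem, Pool.imap is a possible alternative: it still yields results in input order, but lazily, so writing can begin before every worker has finished. A small self-contained sketch (the helper and data here are stand-ins, not part of the original program):

from multiprocessing import Pool

def line_length(line):
    # Stand-in for the real per-line computation.
    return len(line)

if __name__ == '__main__':
    data = ['abcd', 'atg', 'ct', 'xylfz']
    with Pool(2) as p:
        # imap returns an iterator; each result becomes available in
        # input order as soon as it is ready, instead of all at once.
        # chunksize batches several inputs per task to cut overhead.
        results = list(p.imap(line_length, data, chunksize=2))
    print(results)  # [4, 3, 2, 5]

In the answer's code, the for result in results loop could consume p.imap(...) directly, writing each line as it arrives while keeping the output order.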

In addition, I have also broken the for loop out into a generator. This makes input to the multiprocessing.Pool available on request. Another way would be to pre-process the data list and then pass that list directly to the Pool. I find the generator solution nicer, though, and it has smaller peak memory consumption.

Also, a comment on multithreading vs multiprocessing: as long as you do compute-heavy operations, you should use multiprocessing, which allows the work to run on multiple CPU cores in parallel. In addition, in CPython - the most used Python implementation - threads hit another issue: the global interpreter lock (GIL). This means that only one thread can execute Python bytecode at a time, since the interpreter blocks access for all other threads. (There are some exceptions, e.g. when using modules written in C, like numpy; in these cases the GIL can be released while doing numpy calculations, but in general this is not the case.) Thus, threads are mainly for situations where your program is stuck waiting for slow, out-of-order IO (sockets, terminal input, etc.).
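
Conversely, for IO-bound work a thread pool avoids the pickling and process-startup overhead entirely. A minimal sketch using concurrent.futures, with a hypothetical blocking task standing in for real IO (Executor.map, like Pool.map, preserves input order):

from concurrent.futures import ThreadPoolExecutor
import time

def fake_io_task(name):
    # Stand-in for a blocking call such as a socket read or disk IO;
    # while one thread sleeps here, the GIL is released and the
    # other threads keep running.
    time.sleep(0.1)
    return name.upper()

with ThreadPoolExecutor(max_workers=4) as ex:
    # Executor.map preserves the input order of the results.
    results = list(ex.map(fake_io_task, ['chr2', 'chr3', 'chr4']))

print(results)  # ['CHR2', 'CHR3', 'CHR4']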
