
Problem Description


I am trying to learn multiprocessing with Python. I wrote a simple script that should feed each process with 1000 lines from a txt input file. My main function reads a line, splits it, and then performs some very simple operations on the elements in the string. Eventually, the results should be written to an output file.


When I run it, 4 processes are correctly spawned, but only one process is actually running, with minimal CPU usage. As a result, the code is very slow and defeats the purpose of using multiprocessing in the first place. I don't think I have a global-list problem like in this question (python multiprocessing apply_async only uses one process), and I don't think my function is too trivial as in this case (Python multiprocessing.Pool() doesn't use 100% of each CPU).


I can't understand what I'm doing wrong; any help or suggestion is appreciated. Here's the basic code:

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x, "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x, "0\n"]))

    return returnlist

def grouper(n, iterable, padvalue=None):
    # Collect data into fixed-length chunks, padding the last chunk with padvalue
    return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=padvalue)

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")
    my_data = my_data.read().split("\n")

    p = multiprocessing.Pool(4)

    for chunk in grouper(1000, my_data):
        results = p.map(myfunction, chunk)
        for r in results:
            with open(r"my_output_file", "ab") as outfile:
                outfile.write(r)
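One quick way to check whether work is actually being spread across the pool, rather than handled by a single worker as described above, is to log which worker each task runs on. A minimal, self-contained diagnostic sketch, with function and variable names invented for illustration:

import multiprocessing

def which_worker(item):
    # Diagnostic only: return the name of the pool worker that ran this task.
    return multiprocessing.current_process().name

if __name__ == "__main__":
    pool = multiprocessing.Pool(4)
    names = set(pool.map(which_worker, range(1000)))
    pool.close()
    pool.join()
    print(names)  # if tasks are spread across the pool, several worker names appear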


EDIT: I modified my code following the suggestions (deleting the redundant data pre-processing). However, the problem still seems to be there.

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x, "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x, "0\n"]))

    return returnlist

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")

    p = multiprocessing.Pool(4)

    results = p.map(myfunction, my_data, chunksize=1000)
    for r in results:
        with open(r"my_output_file", "ab") as outfile:
            outfile.write(r)

Recommended Answer


Based on your snippet of code, I guess I would do something like this to chunk the file into 8 parts and then have the computation done by 4 workers (why 8 chunks and 4 workers? Just an arbitrary choice I made for the example):

from multiprocessing import Pool
import itertools

def myfunction(lines):
    returnlist = []
    for line in lines:
        list_of_elem = line.split(",")
        elem_id = list_of_elem[1]
        elem_to_check = list_of_elem[5]
        ids = list_of_elem[2].split("|")

        for x in itertools.permutations(ids, 2):
            # x is a tuple of two ids, so unpack it into the joined fields
            returnlist.append(",".join(
                [elem_id, *x, "1\n" if x[1] == elem_to_check else "0\n"]))

    return returnlist

def chunk(it, size):
    # Repeatedly slice `size` items off the iterator; the two-argument
    # form of iter() stops once islice() returns an empty tuple.
    it = iter(it)
    return iter(lambda: tuple(itertools.islice(it, size)), ())

if __name__ == "__main__":
    with open(r"my_input_file_to_be_processed.txt", "r") as f:
        my_data = f.read().split("\n")

    prep = [strings for strings in chunk(my_data, round(len(my_data) / 8))]
    with Pool(4) as p:
        res = p.map(myfunction, prep)

    # Flatten the per-chunk lists into one result list
    result = [line for chunk_result in res for line in chunk_result]
    print(result)  # ... or do something with the result
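Since the original goal was to write the results to an output file rather than print them, a minimal follow-up might look like this (assuming the same output path as in the question):

with open(r"my_output_file", "w") as outfile:
    # Each entry in result already ends with "\n", so writelines() needs no separator
    outfile.writelines(result)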


Edit: This assumes you are confident that all lines are formatted in the same way and will cause no errors.


According to your comments, it might be useful to find out what the problem is in your function or in the content of your file, either by testing it without multiprocessing, or by using try/except in a fairly broad/ugly way so that you can be almost sure an output will be produced (either the exception or the result):

def myfunction(lines):
    returnlist = []
    for line in lines:
        try:
            list_of_elem = line.split(",")
            elem_id = list_of_elem[1]
            elem_to_check = list_of_elem[5]
            ids = list_of_elem[2].split("|")

            for x in itertools.permutations(ids, 2):
                # Same tuple unpacking as above
                returnlist.append(",".join(
                    [elem_id, *x, "1\n" if x[1] == elem_to_check else "0\n"]))
        except Exception as err:
            returnlist.append('I encountered error {} on line {}'.format(err, line))

    return returnlist
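For the "test it without multiprocessing" route, a minimal sketch could reuse chunk and my_data from the earlier snippet and run everything in the main process, so any exception surfaces immediately with a full traceback:

# Sequential run: no Pool involved, errors appear directly in the main process
for lines in chunk(my_data, round(len(my_data) / 8)):
    for out_line in myfunction(lines):
        print(out_line, end="")  # result lines already carry their own "\n"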
