This article looks at how to call multiprocessing from within a Python class method; it should be a useful reference for anyone dealing with the same problem.

Problem description

Initially, I have a class that stores some processed values and reuses them in its other methods.

The problem is that when I tried to split the class method across multiple processes to speed things up, Python spawned the processes, but it didn't seem to work (Task Manager showed only one process running) and the result was never delivered.

I did a couple of searches and found that pathos.multiprocessing can do this instead, but I wonder whether the standard library can solve the problem?

from multiprocessing import Pool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)

a.run(list(range(10)))

Answer

Your code fails because it cannot pickle the instance method (self.cal), which is what Python attempts to do when you spawn multiple processes by mapping them to multiprocessing.Pool (well, there is a way to do it, but it's far too convoluted and not terribly useful anyway). Since there is no shared memory access, it has to 'pack' the data and send it to the spawned process for unpacking. The same would happen if you tried to pickle the a instance.

The only shared-memory access available in the multiprocessing package is the little-known multiprocessing.pool.ThreadPool, so if you really want to do this:

from multiprocessing.pool import ThreadPool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = ThreadPool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

But this will not give you parallelization, because it essentially maps onto your regular threads, which do have access to the shared memory. Instead, you should pass class/static methods (if you need them called), accompanied by the data you want them to work with (in your case, self.vl). If you need to share that data across processes, you'll have to use some shared-memory abstraction, such as multiprocessing.Value, applying a mutex along the way, of course.
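As a minimal sketch of that last point (the helper names init_worker and work, and the numbers, are illustrative, not from the question): a shared counter via multiprocessing.Value, handed to the pool workers through an initializer and guarded by the lock it carries built in:

```python
from multiprocessing import Pool, Value

def init_worker(shared):
    # store the shared counter in a module-level global for this worker
    global counter
    counter = shared

def work(n):
    # Value carries its own lock; use it to avoid lost updates
    with counter.get_lock():
        counter.value += n
    return n * 2

if __name__ == "__main__":
    counter = Value('i', 0)  # 'i' = C int, initial value 0
    with Pool(processes=2, initializer=init_worker, initargs=(counter,)) as pool:
        results = pool.map(work, range(5))
    print(results)        # [0, 2, 4, 6, 8]
    print(counter.value)  # 10 (0+1+2+3+4)
```

Passing the Value through initializer/initargs (rather than relying on a module-level global) keeps the example working under both the fork and spawn start methods.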

Update

I said you could do it (and there are modules that more or less do exactly this; check pathos.multiprocessing, for example), but I don't think it's worth the trouble. When you reach a point where you have to trick your system into doing what you want, chances are you're either using the wrong system or you should rethink your design. But for the sake of completeness, here is one way to do what you want in a multiprocessing setting:

import sys
from multiprocessing import Pool

def parallel_call(params):  # a helper for calling 'remote' instances
    cls = getattr(sys.modules[__name__], params[0])  # get our class type
    instance = cls.__new__(cls)  # create a new instance without invoking __init__
    instance.__dict__ = params[1]  # apply the passed state to the new instance
    method = getattr(instance, params[2])  # get the requested method
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    return method(*args)  # expand arguments, call our method and return the result

class A(object):

    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(parallel_call, self.prepare_call("cal", dt))
        t.close()
        return rs

    def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
        for arg in args:
            yield [self.__class__.__name__, self.__dict__, name, arg]

if __name__ == "__main__":  # important protection for cross-platform use
    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

I think it's fairly self-explanatory, but in short it passes your class's name, its current state, the method to be called, and the arguments to invoke it with to the parallel_call function, which is called for each item mapped through the Pool. Python automatically pickles and unpickles all of this data, so all parallel_call needs to do is reconstruct the original object, find the requested method on it, and call it with the provided argument(s).

This way we pass only data, without trying to pass live objects, so Python doesn't complain (well, in this case, try adding a reference to an instance method to your class parameters and see what happens) and everything works just fine.

If you want to go heavy on the 'magic' you can make it look exactly like your own code (create your own Pool handler, pick up names from the functions and send those names to the actual processes, etc.), but this should serve well enough for your example.

However, before you get your hopes up, keep in mind that this will work only when sharing a 'static' instance (one that doesn't change its initial state once you start invoking it in a multiprocessing context). If the A.cal method were to change the internal state of the vl property, it would affect only the instance where the change occurs (unless it changes in the main instance that calls the Pool between calls). If you want to share the state as well, you can upgrade parallel_call to pick up instance.__dict__ after the call and return it together with the method's result; on the calling side you'd then have to update the local __dict__ with the returned data to change the original state. And even that is not enough: you'd actually have to create a shared dict and handle all the mutex plumbing to have it accessed concurrently by all the processes (you can use multiprocessing.Manager for that).
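A hedged sketch of that shared-state variant (the helper bump and the key name "hits" are illustrative): a Manager-backed dict plus a Manager lock, both of which are proxy objects that can be pickled and shipped to workers as plain map arguments:

```python
from multiprocessing import Manager, Pool

def bump(task):
    # each call increments a key in the Manager-backed dict under the lock
    shared, lock, key = task
    with lock:
        shared[key] = shared.get(key, 0) + 1
        return shared[key]

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        lock = manager.Lock()
        with Pool(processes=4) as pool:
            pool.map(bump, [(shared, lock, "hits")] * 10)
        print(shared["hits"])  # 10
```

Every read and write on the proxy is a round-trip to the Manager's server process, so this is convenient but much slower than multiprocessing.Value for simple counters.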

So, as I said, more trouble than it's worth...

