已知pathos.multiprocessing在Python中优于multiprocessing库,因为前者使用dill而不是pickle并且可以序列化更多的函数和其他内容。

但是,当使用pool.map()pathos结果以行方式写入文件时,会遇到一些麻烦。如果ProcessPool中的所有进程将结果逐行写入单个文件中,则它们将相互干扰,同时写入一些行并破坏工作。通过使用普通的multiprocessing包,我能够使进程写入其自己的单独文件,并以当前进程ID命名,如下所示:

example_data = range(100)
def process_point(point):
    output = "output-%d.gz" % mpp.current_process().pid
    with gzip.open(output, "a+") as fout:
        fout.write('%d\n' % point**2)


然后,此代码运行良好:

import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)


但是这段代码没有:

from pathos import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)


并抛出AttributeError

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-10-a6fb174ec9a5> in <module>()
----> 1 pool.map(process_point, example_data)

/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in map(self, func, iterable, chunksize)
    128         '''
    129         assert self._state == RUN
--> 130         return self.mapAsync(func, iterable, chunksize).get()
    131
    132     def imap(self, func, iterable, chunksize=1):

/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in get(self, timeout)
    371             return self._value
    372         else:
--> 373             raise self._value
    374
    375     def _set(self, i, obj):

AttributeError: 'module' object has no attribute 'current_process'


current_process()中没有pathos,我找不到任何类似的内容。有任何想法吗?

最佳答案

我是pathos作者。虽然您的答案适用于这种情况,但最好使用multiprocessing中的pathos分支,该分支位于相当钝的位置:pathos.helpers.mp

这为您提供了与multiprocessing的一对一映射,但具有更好的序列化。因此,您将使用pathos.helpers.mp.current_process

抱歉,这既没有记录,也没有明显……我应该至少改善这两个问题之一。

关于python - 与pathos.multiprocessing并行安全地写入文件,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32568514/

10-10 07:26