设置:我有一个函数preprocess(data, predicate)和一个谓词列表,可能如下所示:

preds = [lambda x: x < 1,
         lambda x: x < 2,
         lambda x: x < 3,
         lambda x: x < 42]

编辑:我可能应该更精确一些,因为我认为1,2,3,42显然是可以识别的例子,但它似乎太含蓄了。实际上,我正在做一些nlp,data是单词列表,一个谓词看起来像lambda w: (w.lower() not in stopwords.words('english') and re.search("[a-z]", w.lower()))。我想测试不同的谓词,以评估哪些表现最好。
这是我真正想做的。对每个谓词并行调用preprocess
编辑:因为这是一个预处理步骤,所以我需要的是返回的,以继续工作。
我希望我能做但遗憾的是不能:
pool = Pool(processes=4)
pool.map(lambda p: preprocess(data, p), preds)

据我所知,这是因为传递给pool.map的所有内容都必须是可pickle的。在this问题中,有两个建议的解决方案,第一个(可接受的答案)似乎是不切实际的,而secound似乎在python 2.7中不起作用,我正在使用python2.7,尽管在注释中建议它通过pythonic metaphor起作用。
我的问题是pool.map是否是正确的方法,如果是,如何做?或者我应该尝试另一种方法吗?
我知道有很多问题是关于我的,尽管我花了一些时间搜索,但我没有找到答案。另外,如果我的代码风格很笨拙,请随意指出。我看到有人觉得preprocess很奇怪,我可能应该使用functools.partial。
提前谢谢。

最佳答案

在这个简单的例子中,您可以简单地修改preprocess函数来接受threshold属性。类似:

def preprocess(data, threshold):
    def predicate(x):
        return x < threshold
    return old_preprocess(data, predicate)

现在在您的preds列表中,您可以简单地放置可选择的整数:
preds = [1,2,3,42]
pool = Pool(processes=4)
pool.map(preprocess, zip(data, preds))

您可以使用operator模块扩展它以选择操作员:
def preprocess(data, pred):
    threshold, op = pred
    def predicate(x):
        return op(x, threshold)
    return old_preprocess(data, predicate)

import operator as op
preds = [(1, op.lt), (2, op.gt), (3, op.ge), (42, op.lt)]
pool = Pool(processes=4)
pool.map(preprocess, zip(data, preds))

用任意谓词扩展它会变得更加困难。可能最简单的方法是使用marshal模块,它能够将函数的代码转换为bytes对象并返回。
类似:
real_preds = [marshal.dumps(pred.__code__) for pred in preds]

然后preprocess应该重新构建谓词函数:
import types

def preprocess(data, pred):
    pred = types.FunctionType(marshal.loads(pred), globals())

这是最后一个建议的最低限度:
>>> from multiprocessing import Pool
>>> import marshal
>>> import types
>>> def preprocess(pred):
...     pred = types.FunctionType(marshal.loads(pred), globals())
...     return pred(2)
...
>>> preds = [lambda x: x < 1,
...          lambda x: x <2,
...          lambda x: x < 3,
...          lambda x: x < 42]
>>> real_preds = [marshal.dumps(pred.__code__) for pred in preds]
>>> pool = Pool(processes=4)
>>> pool.map(preprocess, real_preds)
[False, False, True, True]

注意pool.map的参数必须是可选择的。这意味着您不能使用lambda作为Pool.map的第一个参数:
>>> pool.map(lambda x: preprocess(x), real_preds)
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.3/threading.py", line 639, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.3/threading.py", line 596, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 351, in _handle_tasks
    put(task)
  File "/usr/lib/python3.3/multiprocessing/connection.py", line 206, in send
    ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed

关于“isPool.map是不是正确的工具?”我认为这在很大程度上取决于数据的大小。使用多重处理增加了相当多的开销,所以即使你“让它工作”,也有很大的机会不值得。特别是,在您编辑的问题中,您为谓词设置了一个更“真实”的场景:
lambda w: (w.lower() not in stopwords.words('english') and re.search("[a-z]", w.lower()))

我相信这个谓词没有足够的时间让它使用Pool.map。显然,这取决于w的大小和要映射的元素的数量。
使用这个谓词进行非常快速的测试,我发现当Pool.map的长度大约为35000个字符时,使用w开始变得更快。如果w小于1000,那么使用Pool比平原map慢15倍(用256个字符串来检查)。如果字符串是60000,则Pool要快一点)。
注意,如果w很长,那么使用def而不是lambda是值得的,并且避免了w.lower()的双重计算。如果要使用普通map或要使用Pool.map

10-06 05:19
查看更多