我有一个fun()
生成器的函数。我想检查生成器是否为空,并且由于我想节省尽可能多的运行时间,所以我
不要将其转换为列表,并检查其是否为空。
相反,我这样做:
def peek(iterable):
try:
first = next(iterable)
except StopIteration:
return None
return first, itertools.chain([first], iterable)
我使用这样的多重处理:
def call_generator_obj(ret_val):
next = peek(ret_val)
if next is not None:
return False
else:
return True
def main():
import multiprocessing as mp
pool = mp.Pool(processes=mp.cpu_count()-1)
# for loop over here
ret_val = fun(args, kwargs)
results.append(pool.apply(call_generator_obj, args=(ret_val,))
# the above line throws me the error
据我所知,腌制是将内存中的某些对象转换为字节流时进行的,而我在任何函数中都在执行类似的操作。
回溯:(在尖线之后)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 253, in apply
return self.apply_async(func, args, kwds).get()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 608, in get
raise self._value
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 385, in _handle_tasks
put(task)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle generator objects
最佳答案
据我所知,腌制是在将内存中的某些对象转换为字节流时进行的,我认为我在这里没有做任何类似的事情。
好吧,你正在这样做。
您不能在进程之间直接传递Python值。即使是最简单的变量,也会在该进程的内存空间中的某个位置保留一个指向结构的指针,而仅将该指针复制到另一个进程将给您一个段错误或垃圾,这取决于另一个进程中的同一内存空间是未映射还是映射到完全不同的东西。像发电机这样复杂的东西(基本上是活动堆栈框架)将更加不可能。multiprocessing
解决的方法是透明腌制您传递给它的所有内容。函数及其参数及其返回值都需要被腌制。
如果您想知道它是如何工作的:Pool
本质上是通过使Queue
处于父put
的任务(基本上是(func, args)
对)以及孩子的get
任务关闭。 Queue
本质上是通过调用pickle.dumps(value)
然后将结果写入管道或其他进程间通信机制来工作的。