问题描述
>>> import concurrent.futures
>>> from collections import namedtuple
>>> #1. Initialise namedtuple here
>>> # tm = namedtuple("tm", ["pk"])
>>> class T:
... #2. Initialise named tuple here
... #tm = namedtuple("tm", ["pk"])
... def __init__(self):
... #3: Initialise named tuple here
... tm = namedtuple("tm", ["pk"])
... self.x = {'key': [tm('value')]}
... def test1(self):
... with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
... results = executor.map(self.test, ["key"])
... return results
... def test(self, s):
... print(self.x[s])
...
>>> t = T().test1()
这里卡住了.
^CTraceback (most recent call last):
File "<stdin>", line 1, in <module>
Process ForkProcess-1:
File "<stdin>", line 10, in test1
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 623, in __exit__
self.shutdown(wait=True)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 681, in shutdown
self._queue_management_thread.join()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 1044, in join
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
self._wait_for_tstate_lock()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 1060, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
如果我在类外部(在#1中)初始化命名的元组,则可以正常工作.有人可以让我知道如果我按照#2或#3进行初始化是什么问题吗?
If I initialise the named tuple outside of the class (in #1), in that case, this works fine. Could someone please let me know what is the issue if I initialise as per #2 or #3 ?
推荐答案
您无需更改初始化namedtuple的位置.您正在更改创建命名元组 class 的位置.
You're not changing where you initialize the namedtuple. You're changing where you create the namedtuple class.
当您创建名为"x"的namedtuple类时,在模块"y"中使用 collections.namedtuple
,将其 __ module __
设置为'y'
,并将其 __ qualname __
设置为'x'
.酸洗和解酸依赖于此类实际在这些属性指示的 y.x
位置中可用,但在示例2和3的情况下则不是.
When you create a namedtuple class named "x" in module "y" with collections.namedtuple
, its __module__
is set to 'y'
and its __qualname__
is set to 'x'
. Pickling and unpickling relies on this class actually being available in the y.x
location indicated by these attributes, but in cases 2 and 3 of your example, it's not.
Python无法腌制namedtuple,这会中断与工作人员的进程间通信.在工作进程中执行 self.test
依赖于对 self.test
进行酸洗并在工作进程中解开其副本,如果 self.x
是无法腌制的类的实例.
Python can't pickle the namedtuple, which breaks inter-process communication with the workers. Executing self.test
in a worker process relies on pickling self.test
and unpickling a copy of it in the worker process, and that can't happen if self.x
is an instance of a class that can't be pickled.
这篇关于在少数情况下,将collections.namedtuple与ProcessPoolExecutor一起使用会陷入困境的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!