我有一个非常简单的 concurrent.futures.ProcessPoolExecutor
实现——类似于(使用 Python 3.6):
files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
list(executor.map(processor.process, files))
虽然
processor
是许多可用处理器类中的任何一个的实例,但它们都共享 process
方法,大致如下所示:def process(self, file):
log.debug(f"Processing source file {file.name}.")
with DBConnection(self.db_url) as session:
file = session.merge(file)
session.refresh(file)
self._set_file(file)
timer = perf_counter()
try:
self.records = self._get_records()
self._save_output()
except Exception as ex:
log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}")
self.error_time = time.time()
self.records = None
else:
process_duration = perf_counter() - timer
log.info(f'File {file.name} processed in {process_duration:.6f} seconds.')
file.process_duration = process_duration
session.commit()
_get_records
和 _save_output
方法的实现因类而异,但我的问题是处理错误。我故意测试它,以便这两种方法中的一种耗尽内存,但我希望上面的 except
块能够捕获它并移动下一个文件——这正是我运行代码时发生的情况单进程。如果我如上所述使用
ProcessPoolExecutor
,它会引发 BrokenProcessPool
异常并终止所有执行:Traceback (most recent call last):
File "/vagrant/myapp/myapp.py", line 94, in _process
list(executor.map(processor.process, files))
File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
for element in iterable:
File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
yield fs.pop().result()
File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
我当然可以在调用代码中捕获
BrokenProcessPool
,但我更愿意在内部处理错误并继续处理下一个文件。我还尝试使用标准的
multiprocessing.Pool
对象,如下所示:with multiprocessing.Pool() as pool:
pool.map(processor.process, files)
在这种情况下,行为甚至更奇怪:在开始处理前两个文件后,这会引发内存不足错误,它继续处理后面的文件,这些文件较小,因此可以完全处理。然而,
except
块显然永远不会被触发(没有日志消息,没有 error_time
),并且应用程序只是挂起,既没有完成也没有做任何事情,直到被手动杀死。我希望
try..except
块可以使每个进程自包含,在不影响主应用程序的情况下处理自己的错误。任何想法如何实现这一目标? 最佳答案
因此,经过大量调试(并归功于@RomanPerekhrest 建议检查 executor
对象),我找到了原因。如问题中所述,测试数据由多个文件组成,其中两个文件非常大(每个文件超过 100 万行 CSV)。这两个都导致我的测试机器(一个 2GB 虚拟机)窒息,但方式不同——第一个更大,导致常规的内存不足错误,将由 except
处理,第二个只是导致 sigkill
.无需过多探索,我怀疑较大的文件在读取时根本无法放入内存(在 _get_records
方法中完成),而较小的文件可以,但随后对其进行操作(在 _save_output
中完成) caused the overflow 并终止了进程.
我的解决方案是简单地捕获 BrokenProcessPool
异常并将问题告知用户;我还添加了一个在一个进程中运行处理任务的选项,在这种情况下,任何过大的文件都会被简单地标记为有错误:
files = get_files()
processor = get_processor_instance()
results = []
if args.nonconcurrent:
results = list(map(processor.process, files))
else:
with concurrent.futures.ProcessPoolExecutor() as executor:
try:
results = list(executor.map(processor.process, files))
except concurrent.futures.process.BrokenProcessPool as ex:
raise MyCustomProcessingError(
f"{ex} This might be caused by limited system resources. "
"Try increasing system memory or disable concurrent processing "
"using the --nonconcurrent option."
)
关于Python concurrent.futures : handling exceptions in child processes,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47237112/