假设我有一个非常大的文本文件,其中包含许多我想撤消的行。而且我不在乎最后的命令。输入文件包含西里尔符号。我使用multiprocessing
在多个内核上进行处理。
我写了这样的程序:
# task.py
import multiprocessing as mp
POOL_NUMBER = 2
lock_read = mp.Lock()
lock_write = mp.Lock()
fi = open('input.txt', 'r')
fo = open('output.txt', 'w')
def handle(line):
# In the future I want to do
# some more complicated operations over the line
return line.strip()[::-1] # Reversing
def target():
while True:
try:
with lock_read:
line = next(fi)
except StopIteration:
break
line = handle(line)
with lock_write:
print(line, file=fo)
pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
p.start()
for p in pool:
p.join()
fi.close()
fo.close()
该程序失败,并出现以下错误:
Process Process-2:
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "task.py", line 22, in target
line = next(fi)
File "/usr/lib/python3.5/codecs.py", line 321, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "task.py", line 22, in target
line = next(fi)
File "/usr/lib/python3.5/codecs.py", line 321, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte
另一方面,如果我设置
POOL_NUMBER = 1
,一切正常。但是,如果我想获得整体性能,那是没有意义的。为什么会发生该错误?我该如何解决?
我使用
Python 3.5.2
。我使用以下脚本生成了数据:
# gen_file.py
from random import randint
LENGTH = 100
SIZE = 100000
def gen_word(length):
return ''.join(
chr(randint(ord('а'), ord('я')))
for _ in range(length)
)
if __name__ == "__main__":
with open('input.txt', 'w') as f:
for _ in range(SIZE):
print(gen_word(LENGTH), file=f)
最佳答案
这里的问题是从多个进程读取文件无法正常工作,您无法在进程之间共享open
对象。
您可以创建一个全局current_line
变量,并且每次读取文件并处理当前行时,都不理想。
这是一种不同的方法,使用进程池和map
方法,我遍历文件,并针对每一行将目标方法加入队列:
from multiprocessing import Lock
from multiprocessing import Pool
import time
import os
POOL_NUMBER = 8
def target(line):
# Really need some processing here
for _ in range(2**10):
pass
return line[::-1]
pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0) # Just to make sure we have plan new file
with open('input.txt', 'r') as fi:
t0 = time.time()
processed_lines = pool.map(target, fi.readlines())
print('Total time', time.time() - t0)
with open('output.txt', 'w') as fo:
for processed_line in processed_lines:
fo.writelines(processed_line)
在我的机器上有8个进程:
Total time 1.3367934226989746
并通过1个过程:
Total time 4.324501991271973
如果您的目标函数受CPU限制,则效果最佳。另一种方法是将文件拆分为
POOL_NUMBER
块,并使每个进程将已处理的数据块(带锁!)写入输出文件。另一种方法是创建一个主进程,该主进程负责其余进程的写入工作,here是一个示例。
编辑
发表评论后,我认为您无法将文件放入内存中。
为此,您可以仅遍历将逐行读入内存的文件对象。但是比我们需要修改一些代码:
POOL_NUMBER = 8
CHUNK_SIZE = 50000
def target(line):
# This is not a measurable task, since most of the time wil spent on writing the data
# if you have a CPU bound task, this code will make sense
return line[::-1]
pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0) # Just to make sure we have plan new file
processed_lines = []
with open('input.txt', 'r') as fi:
t0 = time.time()
for line in fi:
processed_lines.append(pool.apply_async(target, (line,))) # Keep a refernce to this task, but don't
if len(processed_lines) == CHUNK_SIZE:
with open('output.txt', 'w') as fo: # reading the file line by line
for processed_line in processed_lines:
fo.writelines(processed_line.get())
processed_lines = [] # truncate the result list, and let the garbage collector collect the unused memory, if we don't clear the list we will ran out of memory!
print('Total time', time.time() - t0)
请记住,您可以使用
CHUNK_SIZE
变量来控制使用的内存量。对我来说,每个过程最多5000个,大约1万个。P.S
我认为最好将大文件拆分为较小的文件,这样您就可以解决文件的读/写锁定,并使其可扩展以进行处理(即使在其他计算机上也是如此!)