Suppose I have a very large text file consisting of many lines that I would like to reverse, and I don't care about the final order. The input file contains Cyrillic symbols. I use multiprocessing to process it on several cores.

I wrote a program like this:

# task.py

import multiprocessing as mp


POOL_NUMBER = 2


lock_read = mp.Lock()
lock_write = mp.Lock()

fi = open('input.txt', 'r')
fo = open('output.txt', 'w')

def handle(line):
    # In the future I want to do
    # some more complicated operations over the line
    return line.strip()[::-1]  # Reversing

def target():
    while True:
        try:
            with lock_read:
                line = next(fi)
        except StopIteration:
            break

        line = handle(line)

        with lock_write:
            print(line, file=fo)

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
    p.start()
for p in pool:
    p.join()

fi.close()
fo.close()

The program fails with the following error:
Process Process-2:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte

On the other hand, if I set POOL_NUMBER = 1, everything works fine. But that defeats the purpose if I want to gain overall performance.

Why does this error happen, and how can I fix it?

I am using Python 3.5.2.

I generated the data with the following script:
# gen_file.py

from random import randint


LENGTH = 100
SIZE = 100000


def gen_word(length):
    return ''.join(
        chr(randint(ord('а'), ord('я')))
        for _ in range(length)
    )


if __name__ == "__main__":
    with open('input.txt', 'w') as f:
        for _ in range(SIZE):
            print(gen_word(LENGTH), file=f)

Best Answer

The problem here is that reading the file from multiple processes doesn't work the way you expect: you can't share the open file object between processes. Each worker inherits its own copy of the buffered reader on top of the same underlying file, so their reads interleave and a read can begin in the middle of a multi-byte UTF-8 character, which is exactly what the UnicodeDecodeError is complaining about.

You could make a global current_line variable and re-read the file each time to process the current line, but that is not ideal.

Here is a different approach, using a process pool and the map method: I iterate over the file and, for each line, hand it off to the target method:

from multiprocessing import Pool
import time
import os

POOL_NUMBER = 8

def target(line):
    # Really need some processing here
    for _ in range(2**10):
        pass
    return line[::-1]


pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0)  # Just to make sure we start with a brand new, empty file
with open('input.txt', 'r') as fi:
    t0 = time.time()
    processed_lines = pool.map(target, fi.readlines())
    print('Total time', time.time() - t0)

    with open('output.txt', 'w') as fo:
        for processed_line in processed_lines:
            fo.write(processed_line)

With 8 processes on my machine: Total time 1.3367934226989746
And with 1 process: Total time 4.324501991271973
This works best if your target function is CPU-bound. A different approach would be to split the file into POOL_NUMBER chunks and have each process write its processed chunk of data (with a lock!) to the output file.
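
A minimal sketch of that chunk idea, assuming the file is split by byte offsets and each worker appends to the output file under a shared lock (the worker/handle helpers and the offset arithmetic below are my own, not code from this answer):

import multiprocessing as mp
import os

POOL_NUMBER = 8
write_lock = mp.Lock()  # protects writes to the shared output file

def handle(line):
    return line.strip()[::-1]

def worker(start, end):
    # Each worker opens its own handles and only processes lines that *start* inside [start, end)
    with open('input.txt', 'rb') as fi, open('output.txt', 'a', encoding='utf-8') as fo:
        if start > 0:
            fi.seek(start - 1)
            fi.readline()  # skip forward to the beginning of the next full line
        while fi.tell() < end:
            raw = fi.readline()
            if not raw:
                break
            processed = handle(raw.decode('utf-8'))
            with write_lock:
                fo.write(processed + '\n')
                fo.flush()  # flush while holding the lock so lines don't interleave

if __name__ == '__main__':
    size = os.path.getsize('input.txt')
    open('output.txt', 'w').close()  # start from an empty output file
    bounds = [size * i // POOL_NUMBER for i in range(POOL_NUMBER + 1)]
    workers = [mp.Process(target=worker, args=(bounds[i], bounds[i + 1]))
               for i in range(POOL_NUMBER)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()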

Yet another approach is to create a master process that does all the writing for the rest of the processes; here is an example.
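
A rough sketch of that master-writer layout, assuming one multiprocessing.Queue feeds the workers and another feeds a single writer process (the names and sentinel values are my own assumptions):

import multiprocessing as mp

POOL_NUMBER = 8

def handle(line):
    return line.strip()[::-1]

def writer(queue):
    # The only process that touches the output file, so no write lock is needed
    with open('output.txt', 'w', encoding='utf-8') as fo:
        while True:
            item = queue.get()
            if item is None:  # sentinel: all workers are done
                break
            print(item, file=fo)

def worker(in_queue, out_queue):
    while True:
        line = in_queue.get()
        if line is None:  # sentinel: no more input
            break
        out_queue.put(handle(line))

if __name__ == '__main__':
    in_queue = mp.Queue(maxsize=1000)   # bounded, so we never hold the whole file in memory
    out_queue = mp.Queue(maxsize=1000)

    writer_proc = mp.Process(target=writer, args=(out_queue,))
    writer_proc.start()
    workers = [mp.Process(target=worker, args=(in_queue, out_queue))
               for _ in range(POOL_NUMBER)]
    for w in workers:
        w.start()

    with open('input.txt', 'r', encoding='utf-8') as fi:
        for line in fi:  # only the main process ever reads the input file
            in_queue.put(line)
    for _ in workers:
        in_queue.put(None)
    for w in workers:
        w.join()

    out_queue.put(None)
    writer_proc.join()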

EDIT

After your comment, I figured you can't fit the file into memory.
In that case, you can just iterate over the file object, which reads it into memory line by line. But then we need to modify the code a bit:
from multiprocessing import Pool
import time
import os

POOL_NUMBER = 8
CHUNK_SIZE = 50000

def target(line):
    # This is not a measurable task, since most of the time will be spent writing the data;
    # if you have a CPU-bound task, this code will make more sense
    return line[::-1]


pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0)  # Just to make sure we start with a brand new, empty file
processed_lines = []

with open('input.txt', 'r') as fi:  # reading the file line by line
    t0 = time.time()
    for line in fi:
        # Keep a reference to this task, but don't block waiting for its result
        processed_lines.append(pool.apply_async(target, (line,)))

        if len(processed_lines) == CHUNK_SIZE:
            # Append the finished chunk to the output file, then clear the list so the
            # garbage collector can reclaim the memory; if we don't, we will run out of it!
            with open('output.txt', 'a') as fo:
                for processed_line in processed_lines:
                    fo.write(processed_line.get())
            processed_lines = []

    if processed_lines:  # flush whatever is left over from the last partial chunk
        with open('output.txt', 'a') as fo:
            for processed_line in processed_lines:
                fo.write(processed_line.get())
    print('Total time', time.time() - t0)

Keep in mind that you can use the CHUNK_SIZE variable to control how much memory is used. For me it was at most about 5000 per process, roughly 10K in total.
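
If you would rather avoid the manual chunk bookkeeping, a hedged alternative (not part of this answer) is Pool.imap, which pulls lines from the file as the workers consume them instead of loading everything with readlines(); the chunksize value below is arbitrary:

from multiprocessing import Pool
import time

POOL_NUMBER = 8

def target(line):
    return line[::-1]

if __name__ == '__main__':
    t0 = time.time()
    with Pool(processes=POOL_NUMBER) as pool, \
            open('input.txt', 'r') as fi, \
            open('output.txt', 'w') as fo:
        # imap feeds lines to the workers lazily and yields results in order,
        # so we can stream them straight into the output file
        for processed_line in pool.imap(target, fi, chunksize=1000):
            fo.write(processed_line)
    print('Total time', time.time() - t0)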

P.S

I think it would be best to split the big file into smaller files; that way you avoid the read/write locking on the single file, and it also becomes scalable to process (even on other machines!).
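
A rough sketch of that splitting step (the split_file name, the part-file naming scheme, and the lines-per-part default are my own choices):

def split_file(path, lines_per_part=100000):
    """Split `path` into numbered part files of at most `lines_per_part` lines each."""
    part_paths = []
    out = None
    with open(path, 'r', encoding='utf-8') as src:
        for i, line in enumerate(src):
            if i % lines_per_part == 0:
                if out is not None:
                    out.close()
                part_path = '{}.part{}'.format(path, len(part_paths))
                part_paths.append(part_path)
                out = open(part_path, 'w', encoding='utf-8')
            out.write(line)
    if out is not None:
        out.close()
    return part_paths

# e.g. split_file('input.txt') -> ['input.txt.part0', 'input.txt.part1', ...]

Each part file can then be handed to a separate process, or even copied to another machine, and the processed parts concatenated at the end.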
