我正在编写一个脚本,该脚本与multiprocessing同时处理一些mmap.Process并更新存储在mmap中并用互斥锁锁定的结果列表。

我写到结果列表的函数如下所示

def update_result(result_mmap, new_value, new_value_index, sema):
    sema.acquire()
    result_mmap.seek(0)
    old_result = result_mmap.readline().split("\t")
    old_result[new_value_index] = new_value
    new_result = "\t".join(map(str, old_result))
    result_mmap.resize(len(new_result))
    result_mmap.seek(0)
    result_mmap.write(new_result)
    sema.release()


这有时会起作用,但有时,根据进程的执行顺序,似乎result_mmap的大小无法正确调整。我不确定从这里看哪里-我知道比赛条件存在,但我不知道为什么。

编辑:这是调用update_result的函数:

def apply_function(mmapped_files, function, result_mmap, result_index, sema):
    for mf in mmapped_files:
        accumulator = int(mf.readline())
        while True:
            line = mf.readline()
            if line is None or line == '':
                break
            num = int(line)
            accumulator = function(num, accumulator)
        update_result(result_mmap, result_index, inc, sema)

最佳答案

也许我错了,但是您确定信号量确实在进程之间起作用(它是系统互斥体吗?)。因为如果不是,进程将不会共享相同的内存空间。您可能要考虑使用的是线程库,以便线程使用相同的信号量。

07-26 03:29