我正在编写一个脚本,该脚本与multiprocessing同时处理一些mmap.Process并更新存储在mmap中并用互斥锁锁定的结果列表。
我写到结果列表的函数如下所示
def update_result(result_mmap, new_value, new_value_index, sema):
sema.acquire()
result_mmap.seek(0)
old_result = result_mmap.readline().split("\t")
old_result[new_value_index] = new_value
new_result = "\t".join(map(str, old_result))
result_mmap.resize(len(new_result))
result_mmap.seek(0)
result_mmap.write(new_result)
sema.release()
这有时会起作用,但有时,根据进程的执行顺序,似乎result_mmap的大小无法正确调整。我不确定从这里看哪里-我知道比赛条件存在,但我不知道为什么。
编辑:这是调用
update_result
的函数:def apply_function(mmapped_files, function, result_mmap, result_index, sema):
for mf in mmapped_files:
accumulator = int(mf.readline())
while True:
line = mf.readline()
if line is None or line == '':
break
num = int(line)
accumulator = function(num, accumulator)
update_result(result_mmap, result_index, inc, sema)
最佳答案
也许我错了,但是您确定信号量确实在进程之间起作用(它是系统互斥体吗?)。因为如果不是,进程将不会共享相同的内存空间。您可能要考虑使用的是线程库,以便线程使用相同的信号量。