本文介绍了Python 多处理安全地写入文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试解决一个涉及大量子问题的大型数值问题,并且我正在使用 Python 的多处理模块(特别是 Pool.map)将不同的独立子问题拆分到不同的核心上.每个子问题都涉及计算许多子子问题,如果它们尚未被任何进程计算过,我试图通过将它们存储到文件中来有效地记住这些结果,否则跳过计算并只从文件中读取结果.

I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not been computed by any process yet, otherwise skip the computation and just read the results from the file.

我遇到了文件的并发问题:不同的进程有时会检查是否已经计算了子问题(通过查找将存储结果的文件),如果没有,运行计算,然后尝试同时将结果写入同一个文件.我如何避免写这样的冲突?

I'm having concurrency issues with the files: different processes sometimes check to see if a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, then try to write the results to the same file at the same time. How do I avoid writing collisions like this?

推荐答案

@GP89 ​​提到了一个很好的解决方案.使用队列将写入任务发送到对文件具有唯一写入权限的专用进程.所有其他工作人员都具有只读访问权限.这将消除冲突.这是一个使用 apply_async 的示例,但它也适用于 map:

@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read only access. This will eliminate collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '
')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
   main()

这篇关于Python 多处理安全地写入文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 06:31