本文介绍了如何使用 Python 多处理创建同步对象?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我不知道如何制作同步的 Python 对象.我有一个叫做 Observation 的类和一个叫做 Variable 的类,基本上看起来像(代码被简化以显示本质):

I am trouble figuring out how to make a synchronized Python object. I have a class called Observation and a class called Variable that basically looks like (code is simplified to show the essence):

class Observation:
    def __init__(self, date, time_unit, id, meta):
        self.date = date
        self.time_unit = time_unit
        self.id = id
        self.count = 0
        self.data = 0

    def add(self, value):
        if isinstance(value, list):
            if self.count == 0:
                self.data = []
            self.data.append(value)
        else:
            self.data += value
        self.count += 1


class Variable:
    def __init__(self, name, time_unit, lock):
        self.name = name
        self.lock = lock
        self.obs = {}
        self.time_unit = time_unit

    def get_observation(self, id, date, meta):
        self.lock.acquire()
        try:
            obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
            self.obs[id] = obs
        finally:
            self.lock.release()
        return obs

    def add(self, date, value, meta={}):
        self.lock.acquire()
        try:
            obs = self.get_observation(id, date, meta)
            obs.add(value)
            self.obs[id] = obs
        finally:
            self.lock.release()

这是我设置多处理部分的方式:插件 = 在别处定义的函数任务 = JoinableQueue()结果 = JoinableQueue()mgr = 经理()锁 = mgr.RLock()var = Variable('foobar', 'year', lock)

This is how I setup the multiprocessing part: plugin = function defined somewhere else tasks = JoinableQueue() result = JoinableQueue() mgr = Manager() lock = mgr.RLock() var = Variable('foobar', 'year', lock)

for person in persons:
    tasks.put(Task(plugin, var, person))

代码应该如何工作的示例:

Example of how the code is supposed to work:

我有一个名为 var 的变量实例,我想向 var 添加一个观察:

I have an instance of Variable called var and I want to add an observation to var:

today = datetime.datetime.today()
var.add(today, 1)

因此,Variable 的 add 函数查看是否已经存在该日期的观察,如果存在,则返回该观察,否则创建一个新的观察实例.通过调用 obs.add(value) 添加一个观察值而不是实际值.我主要关心的是我想确保不同的进程不会为同一日期创建多个 Observation 实例,这就是我锁定它的原因.

So, the add function of Variable looks whether there already exists an observation for that date, if it does then it returns that observation else it creates a new instance of Observation. Having found an observation than the actual value is added by the call obs.add(value). My main concern is that I want to make sure that different processes are not creating multiple instances of Observation for the same date, that's why I lock it.

Variable 的一个实例被创建并在使用多处理库的不同进程之间共享,并且是多个 Observation 实例的容器.上面的代码不起作用,我得到了错误:

One instance of Variable is created and is shared between different processes using the multiprocessing library and is the container for numerous instances of Observation. The above code does not work, I get the error:

运行时错误:锁定对象应该只通过进程间共享继承

但是,如果我在启动不同进程之前实例化一个 Lock 对象并将其提供给 Variable 的构造函数,那么似乎我会遇到竞争条件,因为所有进程似乎都在等待对方.

However, if I instantiate a Lock object before launching the different processes and supply it to the constructor of Variable then it seems that I get a race condition as all processes seem to be waiting for each other.

最终目的是不同进程可以更新对象Variable中的obs变量.我需要它是线程安全的,因为我不仅仅是在适当的地方修改字典,而是添加新元素并增加现有变量.obs 变量是一个包含大量 Observation 实例的字典.

The ultimate goal is that different processes can update the obs variable in the object Variable. I need this to be threadsafe because I am not just modifying the dictionary in place but adding new elements and incrementing existing variables. the obs variable is a dictionary that contains a bunch of instances of Observation.

如何在多个多处理进程之间共享一个 Variable 实例的情况下进行同步?非常感谢您的认知盈余!

How can I make this synchronized where I share one single instance of Variable between numerous multiprocessing processes? Thanks so much for your cognitive surplus!

更新 1:
* 我正在使用多处理锁,并且我已经更改了源代码以显示这一点.
* 我已更改标题以更准确地捕捉问题
* 我已经用同步替换了 theadsafe,我混淆了这两个术语.

UPDATE 1:
* I am using multiprocessing Locks and I have changed the source code to show this.
* I have changed the title to more accurately capture the problem
* I have replaced theadsafe with synchronization where I was confusing the two terms.

感谢 Dmitry Dvoinikov 指出我!

Thanks to Dmitry Dvoinikov for pointing me out!

我仍然不确定的一个问题是在哪里实例化 Lock?这应该发生在类内部还是在初始化多进程并将其作为参数之前?答案:应该在课外发生.

One question that I am still not sure about is where do I instantiate Lock? Should this happen inside the class or before initializing the multiprocesses and give it as an argument? ANSWER: Should happen outside the class.

更新 2:
* 通过将 Lock 的初始化移到类定义之外并使用管理器,我修复了Lock 对象只能通过继承在进程之间共享"错误.
* 最后一个问题,现在一切正常,除了当我将 Variable 实例放入队列时,它似乎没有更新,并且每次我从队列中获取它时,它都不包含我在前一次迭代中添加的观察.这是唯一让我困惑的事情:(

UPDATE 2:
* I fixed the 'Lock objects should only be shared between processes through inheritance' error by moving the initialization of the Lock outside the class definition and using a manager.
* Final question, now everything works except that it seems that when I put my Variable instance in the queue then it does not get updated, and everytime I get it from the queue it does not contain the observation I added in the previous iteration. This is the only thing that is confusing me :(

更新 3:
最终的解决方案是将 var.obs 字典设置为 mgr.dict() 的实例,然后使用自定义序列化程序.很高兴与同样为此苦苦挣扎的人分享代码.

UPDATE 3:
The final solution was to set the var.obs dictionary to an instance of mgr.dict() and then to have a custom serializer. Happy tho share the code with somebody who is struggling with this as well.

推荐答案

您不是在谈论线程安全,而是在谈论不同进程之间的同步,这是完全不同的事情.总之,开始

You are talking not about thread safety but about synchronization between separate processes and that's entirely different thing. Anyway, to start

不同进程可以更新对象Variable中的obs变量.

暗示 Variable 在共享内存中,您必须显式地将对象存储在那里,毫无疑问,本地实例对单独的进程变得可见.这里:

implies that Variable is in shared memory, and you have to explicitly store objects there, by no magic a local instance becomes visible to separate process. Here:

数据可以使用值或数组存储在共享内存映射中

然后,您的代码片段缺少重要的导入部分.无法判断您是否实例化了正确的 multiprocessing.Lock,而不是 multithreading.Lock.您的代码未显示您创建流程和传递数据的方式.

Then, your code snippet is missing crucial import section. No way to tell whether you instantiate the right multiprocessing.Lock, not multithreading.Lock. Your code doesn't show the way you create processes and pass data around.

因此,我建议您了解线程和进程之间的区别,您是否真的需要一个包含多个进程的应用程序的共享内存模型并检查规范.

Therefore, I'd suggest that you realize the difference between threads and processes, whether you truly need a shared memory model for an application which contains multiple processes and examine the spec.

这篇关于如何使用 Python 多处理创建同步对象?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-24 00:15