我正在使用SQLAlchemy和多处理。我还使用了scoped_session,它避免了共享相同的会话,但是我发现了一个错误及其解决方案,但我不知道为什么会发生。

您可以在下面看到我的代码:

db.py

engine = create_engine(connection_string)

Session = sessionmaker(bind=engine)
DBSession = scoped_session(Session)


script.py

from multiprocessing import Pool, current_process
from db import DBSession

def process_feed(test):
    session = DBSession()
    print(current_process().name, session)

def run():
    session = DBSession()
    pool = Pool()
    print(current_process().name, session)
    pool.map_async(process_feed, [1, 2]).get()

if __name__ == "__main__":
    run()


当我运行script.py时,输出为:

MainProcess <sqlalchemy.orm.session.Session object at 0xb707b14c>
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb707b14c>
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb707b14c>


请注意,会话对象在主进程及其工作进程(子进程)中是相同的0xb707b14c

但是如果我更改前两行的顺序run():

def run():
    pool = Pool() # <--- Now pool is instanced in the first line
    session = DBSession()  # <--- Now session is instanced in the second line
    print(current_process().name, session)
    pool.map_async(process_feed, [1, 2]).get()


然后我再次运行script.py的输出是:

MainProcess <sqlalchemy.orm.session.Session object at 0xb66907cc>
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb669046c>
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb66905ec>


现在,会话实例有所不同。

最佳答案

要了解为什么会发生这种情况,您需要了解scoped_sessionPool的实际作用。 scoped_session保留会话注册表,以便进行以下操作


第一次调用DBSession时,它将在注册表中为您创建一个Session对象
随后,如果满足必要条件(即同一线程,会话尚未关闭),则它不会创建新的Session对象,而是将您先前创建的Session对象返回给您


创建Pool时,它将在__init__方法中创建工作程序。 (请注意,在__init__中启动工作进程没有任何基础。同样有效的实现可以等到首次需要工作程序之后再启动它们,这在您的示例中将表现出不同的行为。)这种情况发生(在Unix上),父进程会为每个工作进程派生自己,这涉及操作系统将当前正在运行的进程的内存复制到新进程中,因此您将在完全相同的位置上获得完全相同的对象。

将这两者放在一起,在第一个示例中,您将在分叉之前创建一个Session,在创建Pool时将其复制到所有工作进程中,从而得到相同的标识,而在第二个示例中,您将延迟在工作进程启动之后创建Session对象,从而导致不同的标识。

重要的是要注意,尽管Session对象共享相同的id,但它们不是同一对象,在某种意义上,如果您在父进程中对Session进行了任何更改,它们将不会反映在子进程。由于分叉,它们恰好都共享相同的内存地址。但是,诸如连接之类的OS级资源是共享的,因此,如果在session之前在Pool()上运行查询,则将在连接池中为您创建一个连接,然后将其分支到子进程中。然后,如果您尝试在子进程中执行查询,则会遇到奇怪的错误,因为您的进程在相同的确切连接上相互破坏!

对于Windows,上面没有什么意义,因为Windows没有fork()

10-07 13:02
查看更多