SQLAlchemy, serializable transaction isolation and retries in an idiomatic Python way

Question

PostgreSQL and the SQL standard define a Serializable transaction isolation level. If you isolate transactions to this level, conflicting concurrent transactions are aborted and need to be retried.

I am familiar with the concept of transaction retries from the Plone / Zope world, where the entire HTTP request can be replayed if there is a transaction conflict. How could similar functionality be achieved with SQLAlchemy (and potentially with zope.sqlalchemy)? I tried to read the documentation of zope.sqlalchemy and the Zope transaction manager, but this is not obvious to me.

Specifically, I want something like this:

  # Try to do the stuff, if it fails because of transaction conflict do again until retry count is exceeded
  with transaction.manager(retries=3):
        do_stuff()

  # If we couldn't get the transaction through even after 3 attempts, fail with a horrible exception


Recommended answer

So, after poking around for about two weeks and finding no off-the-shelf solution, I came up with my own.

Here is a ConflictResolver class which provides a managed_transaction function decorator. You can use the decorator to mark functions as retryable: if a database conflict error occurs while running the function, the function is run again, in the hope that the transaction which caused the conflict has finished by then.
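
Before the full listing, here is a stripped-down sketch of just the retry control flow, using only the standard library. It is an illustration under assumptions, not the real class: `FakeConflict`, `GaveUp`, `retry_on_conflict` and `transfer` are hypothetical names, and the decorated function receives the attempt number instead of a fresh SQLAlchemy session.

```python
class FakeConflict(Exception):
    """Stand-in for a driver-level serialization failure (hypothetical)."""


class GaveUp(Exception):
    """Raised when every attempt ended in a conflict (hypothetical)."""


def retry_on_conflict(retries):
    """Build a decorator that re-runs ``func`` whenever FakeConflict is raised."""
    def decorator(func):
        def wrapper():
            last_error = None
            # One initial attempt plus up to ``retries`` replays.
            for attempt in range(retries + 1):
                try:
                    return func(attempt)
                except FakeConflict as e:
                    # Remember the conflict and let the loop try again.
                    last_error = e
            raise GaveUp("gave up after {} retries".format(retries)) from last_error
        return wrapper
    return decorator


calls = []


@retry_on_conflict(retries=3)
def transfer(attempt):
    calls.append(attempt)
    if attempt < 2:
        # Pretend a concurrent transaction touched the same rows.
        raise FakeConflict("could not serialize access")
    return "committed"


result = transfer()
```

The first two calls raise the simulated conflict and the third one succeeds, so the function body runs three times in total. Any exception other than the conflict type propagates immediately, which is the same split the real decorator makes.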

The source code is reproduced below.

The unit tests covering it are here: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master

Python 3.4+ only.

"""Serializable SQL transaction conflict resolution as a function decorator."""

import warnings
import logging
from collections import Counter

from sqlalchemy.orm.exc import ConcurrentModificationError
from sqlalchemy.exc import OperationalError


UNSUPPORTED_DATABASE = "Seems like we might not know how to support serializable transactions for this database. We don't know it or it is untested. Thus, the reliability of the service may suffer. See transaction documentation for the details."

#: Tuples of (Exception class, test function). Behavior and definitions copied from _retryable_errors in zope.sqlalchemy
DATABASE_COFLICT_ERRORS = []

try:
    import psycopg2.extensions
except ImportError:
    pass
else:
    DATABASE_COFLICT_ERRORS.append((psycopg2.extensions.TransactionRollbackError, None))

# ORA-08177: can't serialize access for this transaction
try:
    import cx_Oracle
except ImportError:
    pass
else:
    DATABASE_COFLICT_ERRORS.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177))

if not DATABASE_COFLICT_ERRORS:
    # TODO: Do this when cryptoassets app engine is configured
    warnings.warn(UNSUPPORTED_DATABASE, UserWarning, stacklevel=2)

#: XXX: We need to confirm whether this is the right way for MySQL and SQLite
DATABASE_COFLICT_ERRORS.append((ConcurrentModificationError, None))


logger = logging.getLogger(__name__)


class CannotResolveDatabaseConflict(Exception):
    """The managed_transaction decorator has given up trying to resolve the conflict.

    We have exceeded the threshold for database conflicts. Probably long-running transactions or overload are blocking our rows in the database, so that this transaction would never succeed in an error-free manner. Thus, we need to tell the service user that unfortunately the operation cannot be completed this time.
    """


class ConflictResolver:

    def __init__(self, session_factory, retries):
        """

        :param session_factory: `callback()` which will give us a new SQLAlchemy session object for each transaction and retry

        :param retries: The number of times we try to re-run the transaction in the case of a transaction conflict.
        """
        self.retries = retries

        self.session_factory = session_factory

        # Simple bean-counting diagnostics of how well we are doing
        self.stats = Counter(success=0, retries=0, errors=0, unresolved=0)

    @classmethod
    def is_retryable_exception(cls, e):
        """Does the exception look like a database conflict error?

        Check for database driver specific cases.

        :param e: Python Exception instance
        """

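        if not isinstance(e, OperationalError):
            # Not an SQLAlchemy OperationalError, so not a conflict we know how to detect
            return False

        # The original DBAPI driver exception that SQLAlchemy wrapped
        orig = e.orig

        for err, func in DATABASE_COFLICT_ERRORS:
            # Exception type matches, now compare its values
            if isinstance(orig, err):
                if func:
                    # Test functions inspect the driver exception, not the SQLAlchemy wrapper
                    return func(orig)
                else:
                    return True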

        return False

    def managed_transaction(self, func):
        """SQL Serializable transaction isolation-level conflict resolution.

        When the SQL transaction isolation level is at its highest level (Serializable), the SQL database itself cannot alone resolve conflicting concurrent transactions. Thus, the SQL driver raises an exception to signal this condition.

        The ``managed_transaction`` decorator will retry running everything inside the function when that happens.

        Usage::

            # Create new session for SQLAlchemy engine
            def create_session():
                Session = sessionmaker()
                Session.configure(bind=engine)
                return Session()

            conflict_resolver = ConflictResolver(create_session, retries=3)

            # Create a decorated function which can try to re-run itself in the case of conflict
            @conflict_resolver.managed_transaction
            def myfunc(session):

                # Both threads modify the same wallet simultaneously
                w = session.query(BitcoinWallet).get(1)
                w.balance += 1

            # Execute the conflict sensitive code inside a managed transaction
            myfunc()

        The rules:

        - You must not swallow all exceptions within ``managed_transaction``. Example of how to handle exceptions::

            # Create a decorated function which can try to re-run itself in the case of conflict
            @conflict_resolver.managed_transaction
            def myfunc(session):

                try:
                    my_code()
                except Exception as e:
                    if ConflictResolver.is_retryable_exception(e):
                        # This must be passed to the function decorator, so it can attempt retry
                        raise
                    # Otherwise the exception is all yours

        - Use read-only database sessions if you know you do not need to modify the database and you need weaker transaction guarantees e.g. for displaying the total balance.

        - Never do external actions, like sending emails, inside ``managed_transaction``. If the database transaction is replayed, the code is run twice and you end up sending the same email twice.

        - Managed transaction section should be as small and fast as possible

        - Avoid long-running transactions by splitting up big transaction to smaller worker batches

        This implementation heavily draws inspiration from the following sources

        - http://stackoverflow.com/q/27351433/315168

        - https://gist.github.com/khayrov/6291557
        """

        def decorated_func():

            # Number of retries left, as configured on the resolver
            attempts = self.retries

            while attempts >= 0:

                session = self.session_factory()
                try:
                    result = func(session)
                    session.commit()
                    self.stats["success"] += 1
                    return result

                except Exception as e:
                    if self.is_retryable_exception(e):
                        session.close()
                        self.stats["retries"] += 1
                        attempts -= 1
                        if attempts < 0:
                            self.stats["unresolved"] += 1
                            raise CannotResolveDatabaseConflict("Could not replay the transaction {} even after {} attempts".format(func, self.retries)) from e
                        continue
                    else:
                        session.rollback()
                        self.stats["errors"] += 1
                        # All other exceptions should fall through
                        raise

        return decorated_func
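
The question asked for a `with`-style syntax. A plain `with` block cannot re-run its own body, so one common workaround is to loop over attempt objects and use each one as a context manager. The sketch below illustrates that pattern with the standard library only; `ConflictError`, `Attempt` and `attempts` are hypothetical names, not part of SQLAlchemy or of the class above. As far as I know, the Zope `transaction` package exposes a similar helper as `transaction.manager.attempts()`, so check its documentation before rolling your own.

```python
class ConflictError(Exception):
    """Stand-in for a serialization failure (hypothetical)."""


class Attempt:
    """Context manager for one try; swallows conflicts unless it is the last try."""

    def __init__(self, state, is_last):
        self.state = state
        self.is_last = is_last

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # The body finished cleanly, so no further attempts are needed.
            self.state["done"] = True
            return False
        # Suppress a conflict so the for-loop can move on to the next attempt.
        # Other exceptions, and a conflict on the last try, propagate.
        return issubclass(exc_type, ConflictError) and not self.is_last


def attempts(retries):
    """Yield up to ``retries + 1`` Attempt objects, stopping after a success."""
    state = {"done": False}
    for i in range(retries + 1):
        if state["done"]:
            return
        yield Attempt(state, is_last=(i == retries))


log = []
for attempt in attempts(retries=3):
    with attempt:
        log.append("try")
        if len(log) < 3:
            # Simulate a conflict on the first two tries.
            raise ConflictError("could not serialize access")
        log.append("committed")
```

In a real version, `__enter__` would open a fresh session and `__exit__` would commit or roll it back; the same rule about not performing external side effects inside the replayed body applies.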
