版本:1.1.0b2 |发布日期:2016年7月1日

SQLAlchemy 1.1文档

事务和连接管理

管理交易

新构建的Session可以被认为处于“开始”状态。在这种状态下,Session尚未与任何可能与之关联的Engine对象建立任何连接或事务状态。

然后,Session接收请求以操作数据库连接。Typically, this means it is called upon to execute SQL statements using a particular Engine, which may be via Session.query(), Session.execute(), or within a flush operation of pending data, which occurs when such state exists and Session.commit() or Session.flush() is called.

当收到这些请求时,遇到的每个新的Engine都与由Session维护的正在进行的事务状态相关联。当第一个Engine被操作时,Session可以说已经离开了“开始”状态并且进入了“事务性”状态。对于遇到的每个Engine,都有一个Connection与它关联,它是通过Engine.contextual_connect()方法获取的。如果ConnectionSession直接关联(参见Joining a Session into an External Transaction (such as for test suites)这个),它被直接添加到事务状态。

对于每个ConnectionSession还维护一个Transaction对象,该对象通过调用Connection.begin()在每个Connection上,或者如果Session对象已经使用标志twophase=True建立,则TwoPhaseTransaction通过Connection.begin_twophase()获取。这些事务全部被提交或回滚,对应于Session.commit()Session.rollback()方法的调用。如果适用,提交操作还将在所有事务中调用TwoPhaseTransaction.prepare()方法。

When the transactional state is completed after a rollback or commit, the Session releases all Transaction and Connection resources, and goes back to the “begin” state, which will again invoke new Connection and Transaction objects as new requests to emit SQL statements are received.

下面的例子说明了这个生命周期:

engine = create_engine("...")
Session = sessionmaker(bind=engine)

# new session.   no connections are in use.
session = Session()
try:
    # first query.  a Connection is acquired
    # from the Engine, and a Transaction
    # started.
    item1 = session.query(Item).get(1)

    # second query.  the same Connection/Transaction
    # are used.
    item2 = session.query(Item).get(2)

    # pending changes are created.
    item1.foo = 'bar'
    item2.bar = 'foo'

    # commit.  The pending changes above
    # are flushed via flush(), the Transaction
    # is committed, the Connection object closed
    # and discarded, the underlying DBAPI connection
    # returned to the connection pool.
    session.commit()
except:
    # on rollback, the same closure of state
    # as that of commit proceeds.
    session.rollback()
    raise

使用SAVEPOINT

SAVEPOINT事务(如果由基础引擎支持)可以使用begin_nested()方法描述:

Session = sessionmaker()
session = Session()
session.add(u1)
session.add(u2)

session.begin_nested() # establish a savepoint
session.add(u3)
session.rollback()  # rolls back u3, keeps u1 and u2

session.commit() # commits u1 and u2

begin_nested() may be called any number of times, which will issue a new SAVEPOINT with a unique identifier for each call. 对于每个begin_nested()调用,都必须发出相应的rollback()commit()(但是请注意,如果返回值用作上下文管理器,即在with语句中,则此退回/提交由上下文管理器在退出上下文时发出,因此不应该显式添加。)

当调用begin_nested()时,无条件地发出flush()(不管autoflush设置如何)。这样当一个rollback()发生时,会话的完整状态就会过期,从而导致所有后续的属性/实例访问引用Session正好在begin_nested()被调用之前。

begin_nested(), in the same manner as the less often used begin() method, returns a transactional object which also works as a context manager. 它可以简洁地用于单个记录插入,以便捕获唯一的约束例外:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

自动提交模式

The example of Session transaction lifecycle illustrated at the start of Managing Transactions applies to a Session configured in the default mode of autocommit=False. Constructing a Session with autocommit=True produces a Session placed into “autocommit” mode, where each SQL statement invoked by a Session.query() or Session.execute() occurs using a new connection from the connection pool, discarding it after results have been iterated. Session.flush()操作仍然发生在单个事务的范围内,尽管在Session.flush()操作完成之后该事务被关闭。

警告

“autocommit” mode should not be considered for general use. 如果使用的话,它应该总是与Session.begin()Session.commit()的用法相结合,以确保事务的分界。

在划定的事务之外执行查询是传统的使用模式,并且在某些情况下会导致并发连接检出。

在没有划分的事务的情况下,Session无法就自动屏蔽何时发生以及何时应该发生自动过期做出适当的决定,因此这些功能应该被禁用autoflush =假, expire_on_commit = False

“自动提交”的现代用法是为了在“开始”状态发生时需要特别控制的框架集成。可以使用Session.begin()方法将autocommit=True配置的会话置于“begin”状态。After the cycle completes upon Session.commit() or Session.rollback(), connection and transaction resources are released and the Session goes back into “autocommit” mode, until Session.begin() is called again:

Session = sessionmaker(bind=engine, autocommit=True)
session = Session()
session.begin()
try:
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'
    session.commit()
except:
    session.rollback()
    raise

Session.begin()方法还返回一个事务标记,它与Python with语句兼容:

Session = sessionmaker(bind=engine, autocommit=True)
session = Session()
with session.begin():
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'

与自动提交一起使用Subtransactions

一个子事务指示Session.begin()方法与subtransactions=True标志的结合使用。这产生了一个非事务性的分隔结构,允许将调用嵌套到begin()commit()其目的是允许构建可以在事务内运行的代码,而不管任何启动事务的外部代码,以及在已经划定事务的块内。

subtransactions=True is generally only useful in conjunction with autocommit, and is equivalent to the pattern described at Nesting of Transaction Blocks, where any number of functions can call Connection.begin() and Transaction.commit() as though they are the initiator of the transaction, but in fact may be participating in an already ongoing transaction:

# method_a starts a transaction and calls method_b
def method_a(session):
    session.begin(subtransactions=True)
    try:
        method_b(session)
        session.commit()  # transaction is committed here
    except:
        session.rollback() # rolls back the transaction
        raise

# method_b also starts a transaction, but when
# called from method_a participates in the ongoing
# transaction.
def method_b(session):
    session.begin(subtransactions=True)
    try:
        session.add(SomeObject('bat', 'lala'))
        session.commit()  # transaction is not committed yet
    except:
        session.rollback() # rolls back the transaction, in this case
                           # the one that was initiated in method_a().
        raise

# create a Session and call method_a
session = Session(autocommit=True)
method_a(session)
session.close()

子进程使用Session.flush()进程来确保刷新操作发生在事务中,而不管自动提交。当自动提交被禁用时,它仍然有用,因为它强制Session进入“挂起回滚”状态,因为失败的刷新不能在中间操作中恢复,最终用户仍然维持“整个交易的范围“。

启用两阶段提交

对于支持两阶段操作的后端(当前MySQL和PostgreSQL),会话可以被指示使用两阶段提交语义。这将协调跨数据库的事务提交,以便在所有数据库中提交或回滚事务。您还可以prepare()与SQLAlchemy管理的事务进行交互的会话。要使用两阶段事务,请在会话中设置标志twophase=True

engine1 = create_engine('postgresql://db1')
engine2 = create_engine('postgresql://db2')

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User:engine1, Account:engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()

设置事务隔离级别

Isolation refers to the behavior of the transaction at the database level in relation to other transactions occurring concurrently. 有四种众所周知的隔离模式,通常Python DBAPI允许通过明确的API或通过特定于数据库的调用来基于每个连接来设置这些模式。

SQLAlchemy’s dialects support settable isolation modes on a per-Engine or per-Connection basis, using flags at both the create_engine() level as well as at the Connection.execution_options() level.

当使用ORM Session时,它作为引擎和连接的外观,但不直接暴露事务隔离。所以为了影响事务隔离级别,我们需要根据需要对EngineConnection进行操作。

设置隔离引擎范围

要全局设置具有特定隔离级别的Sessionsessionmaker,请使用create_engine.isolation_level参数:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql://scott:tiger@localhost/test",
    isolation_level='REPEATABLE_READ')

maker = sessionmaker(bind=eng)

session = maker()

设置单个会话的隔离

当我们创建新的Session时,无论是直接使用构造函数,还是调用sessionmaker生成的可调用对象,都可以通过bind直接的论点,覆盖已有的绑定。我们可以将它与Engine.execution_options()方法结合使用,以生成原始Engine的副本,以添加此选项:

session = maker(
    bind=engine.execution_options(isolation_level='SERIALIZABLE'))

对于Sessionsessionmaker配置了多个“绑定”的情况,我们可以重新指定binds参数,或者if我们只想替换特定的绑定,我们可以使用Session.bind_mapper()Session.bind_table()方法:

session = maker()
session.bind_mapper(
    User, user_engine.execution_options(isolation_level='SERIALIZABLE'))

我们也可以使用下面的个人交易方法。

设置单个事务的隔离

关于隔离级别的一个重要警告是,在事务已经开始的Connection上,无法安全地修改该设置。数据库无法更改正在进行的事务的隔离级别,某些DBAPI和SQLAlchemy方言在此区域中具有不一致的行为。有些可能会隐式地发出一个ROLLBACK,有些可能会隐含地发出一个COMMIT,有些可能会在下一个事务之前忽略这个设置。因此,如果在交易已经开始时设置了这个选项,SQLAlchemy会发出警告。Session对象并不为我们提供一个Connection用于交易尚未开始的交易。So here, we need to pass execution options to the Session at the start of a transaction by passing Session.connection.execution_options provided by the Session.connection() method:

from sqlalchemy.orm import Session

sess = Session(bind=engine)
sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

# work with session

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

上面,我们首先使用构造函数或sessionmaker生成一个Session然后我们通过调用Session.connection()来明确地设置一个事务的开始,它提供了在事务开始之前将被传递给连接的执行选项。If we are working with a Session that has multiple binds or some other custom scheme for Session.get_bind(), we can pass additional arguments to Session.connection() in order to affect how the bind is procured:

sess = my_sesssionmaker()

# set up a transaction for the bind associated with
# the User mapper
sess.connection(
    mapper=User,
    execution_options={'isolation_level': 'SERIALIZABLE'})

# work with session

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

Session.connection.execution_options参数仅在针对事务中特定绑定的第一次调用Session.connection()时才被接受。如果事务已经在目标连接上开始,则会发出警告:

>>> session = Session(eng)
>>> session.execute("select 1")
<sqlalchemy.engine.result.ResultProxy object at 0x1017a6c50>
>>> session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})
sqlalchemy/orm_session.py:310: SAWarning: Connection is already established
for the given bind; execution_options ignored

版本0.9.9新增:Session.connection.execution_options参数添加到Session.connection()

使用事件跟踪事务状态

有关会话事务状态更改的可用事件挂钩的概述,请参阅Transaction Events部分。

将会话加入外部事务(例如测试套件)

If a Connection is being used which is already in a transactional state (i.e. has a Transaction established), a Session can be made to participate within that transaction by just binding the Session to that Connection. 通常的基本原理是一个测试套件,允许ORM代码使用Session自由地工作,包括调用Session.commit()的能力,其中整个数据库交互被回滚:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine('postgresql://...')

class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()

        # bind an individual Session to the connection
        self.session = Session(bind=self.connection)

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()

在上面,我们发布了Session.commit()以及Transaction.rollback()这是利用Connection对象维护subtransactions的能力或嵌套的begin / commit-or-rollback对的例子,其中只有最外层的begin / commit对实际上提交事务,或者如果最外面的块回滚,则所有事情都回滚。

支持回滚测试

除了需要在测试本身范围内实际调用Session.rollback()的测试之外,上述配方适用于任何类型的数据库启用测试。上面的方法可以被扩展,使得Session总是运行在每个事务开始时建立的SAVEPOINT范围内的所有操作,以便测试还可以将“事务”回滚为同时还保留在一个更大的“交易”范围内,这个交易从来没有提交过,使用两个额外的事件:

from sqlalchemy import event


class SomeTest(TestCase):

    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = connection.begin()

        # bind an individual Session to the connection
        self.session = Session(bind=self.connection)

        # start the session in a SAVEPOINT...
        self.session.begin_nested()

        # then each time that SAVEPOINT ends, reopen it
        @event.listens_for(self.session, "after_transaction_end")
        def restart_savepoint(session, transaction):
            if transaction.nested and not transaction._parent.nested:

                # ensure that state is expired the way
                # session.commit() at the top level normally does
                # (optional step)
                session.expire_all()

                session.begin_nested()

    # ... the tearDown() method stays the same