• https://cloud.tencent.com/developer/article/1497631

    一、如何整合进Spring中的

    默认大家对Spring都比较了解了,这里只说结果。都知道接口是不能被实例化的,那么接口是如何成为Bean的呢?

    1.1 如何知道哪些是Mybatis的接口呢?

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Import(MapperScannerRegistrar.class)
    @Repeatable(MapperScans.class)
    public @interface MapperScan 
    {}

    public class MapperScannerRegistrar implements ImportBeanDefinitionRegistrarResourceLoaderAware {}

    MapperScannerRegistrar 会在配置类解析时候拿到MapperScan注解信息,并解析里面的参数。生成一个 MapperScannerConfigurer 信息。 从源码中能看到Mybatis的很多配置信息,都会被注入到MapperScannerConfigurer中。

    public class MapperScannerConfigurer
        implements BeanDefinitionRegistryPostProcessorInitializingBeanApplicationContextAwareBeanNameAware 
    {}

    实现自BeanDefinitionRegistryPostProcessor会前置,拿到MapperScan中的basePackage,最终通过ClassPathMapperScanner扫描并添加到 BeanDefinitionRegistry中。

    到这里这种方式就能知道哪些是Mybatis中的Mapper接口了。

    还有第二种方式当发现Spring容器中没有MapperScannerConfigurer。会自动注入一个

    会直接指定哪些类被Mapper修饰,就将他生成Bean。 第09篇:Spring声明式事务的实现方式-LMLPHP

    好了,到这里就知道如何来确定那些接口是要生成Mybatis接口的了。下面看下个问题。

    1.2 Mapper接口是如何变成Spring Bean的?

    接口是不能被实例化的,但是在Spring中如何想让接口实例化就可以使用 FactoryBean + 动态代理的方式,实现接口类的实例化。

    二、Spring在哪里声明的SqlSession的实现逻辑?

    通过Mybatis的学习知道SqlSession一共有2个包装类。SqlSessionManager和SqlSessionTemplate。那么SqlSession是在哪里指定用哪个的呢? 答案就在 MapperFactoryBean

    public class MapperFactoryBean<Textends SqlSessionDaoSupport implements FactoryBean<T{
      private SqlSessionTemplate sqlSessionTemplate;
       
         public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {
           if (this.sqlSessionTemplate == null || sqlSessionFactory != this.sqlSessionTemplate.getSqlSessionFactory()) {
             this.sqlSessionTemplate = createSqlSessionTemplate(sqlSessionFactory);
           }
         }
       
         @SuppressWarnings("WeakerAccess")
         protected SqlSessionTemplate createSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
           return new SqlSessionTemplate(sqlSessionFactory);
         }
    }

    三、Spring中声明式事务的实现方式是怎样的

    看了Mybatis中事务这一章节,知道如果使用了SqlSessionTemplate,那么事务的权限就外包给了Spring。那么Spring中事务怎么处理的呢? 终于进入正题了。Spring中提供两种事务的能力。

    3.1 声明式事务

    使用 Transactional 修饰方法,其主要实现是使用切面实现。

    这里我们先来思考下,通过前面的学习知道事务的最底层实现是jdbc驱动来实现的。

    那么切面中要想实现,就必须保证切面中的线程执行的数据库操作,一定是同一个SqlSession这样才能在方法正常执行时候做commit,异常时候做rollback操作。

    那我们看下他是如何保证切面中的数据库操作一定是同一个SqlSession的吧。这部分逻辑就在 SqlSessionTemplate 中。

    3.2 编程式事务

    编程是事务需要实现者自己来管理事务的,Spring提供的扩展接口类是 CallbackPreferringPlatformTransactionManager。如果发现容器中默认的事务管理类是这个 就直接调动全局的这个事务管理方法。如果不是就自己来处理。这种设计的好处是,事务管理器既可以做关系型数据库的事务管理,也可以满足一些特定场景的事务控制(eg: 给Kafka的逻辑做一个事务管理)。

    四、Spring中如何处理嵌套事务的?

    什么是嵌套事务,举一个伪代码的例子。下面 saveUser 代码中有2个Mapper。但是有几个SqlSession呢?

    UserMapper userMapper;

    RegistroyMapper registoryMapper;

    @Transactional(rollbackFor = {Throwable.classRuntimeException.classExecutionException.class})
    public void save(User user)
    {
       userMapper.save(user);
    }

    @Transactional(rollbackFor = {Throwable.classRuntimeException.classExecutionException.class})
    public void saveUser(String userName,Strign password)
    {
       User user = registoryMapper.regis(userName,password);
       save(user);
    }

    通过上面的学习我们了解到如果是Spring来管理的事务是一个线程对应一个SqlSession。所以说上面伪代码中的两个Mapper 其实是用的同一个SqlSession,这样才能保证是在同一个事务中。核心代码逻辑就在这里 SqlSessionUtils#getSqlSession。 从Spring中的事务管理器中获取 SqlSession。是否使用同一个事务,外包给Spring容器去托管。这就给Spring提供了很多可以发挥的空间。 比如说传播机制等。

    public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
          PersistenceExceptionTranslator exceptionTranslator)
     
    {

        notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
        notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);

        SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);

        SqlSession session = sessionHolder(executorType, holder);
        if (session != null) {
          return session;
        }

        LOGGER.debug(() -> "Creating a new SqlSession");
        session = sessionFactory.openSession(executorType);

        registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);

        return session;
      }

    五、Spring中事务的传播方式是如何实现的?

    思考传播机制如何实现

    首先我们先思考下传播机制是如何实现的,因为我们知道 要保证是同一个事务,那么一定是同一个SqlSession,这样才能保证是同一个事务。 而如果要新开事务,就要先将当前线程绑定的SqlSession等事务信息,给挂起,那么是如何进行挂起的呢? SqlSession又是如何跟线程绑定的呢?

    5.1 SqlSession是如何跟线程绑定的呢?

    通过TransactionSynchronizationManager中的ThreadLocal跟线程绑定(new NamedThreadLocal<>("Transactional resources"))。注意: 如果主线程下创建子线程是不能绑定上的。

    private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
          PersistenceExceptionTranslator exceptionTranslator, SqlSession session)
     
    {
            SqlSessionHolder holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
            TransactionSynchronizationManager.bindResource(sessionFactory, holder);
            TransactionSynchronizationManager
                .registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
            holder.setSynchronizedWithTransaction(true);
            holder.requested();
      }

    5.2 事务是如何嵌套的?

    答案就在 TransactionAspectSupport#TransactionInfo 中。一个事务注解对应一个TransactionInfo,如果出现嵌套 就会生成一个事务链。如下图一样。

    当里层的事务处理完成后会执行清理动作,同时在将第一个的事务在进行恢复跟线程绑定。

            private void restoreThreadLocalStatus() {
       // Use stack to restore old transaction TransactionInfo.
       // Will be null if none was set.
       transactionInfoHolder.set(this.oldTransactionInfo);
      }

    5.3 事务是如何挂起的?

    前面知道每一个 @Transaction 注解会对应一个 TransactionAspectSupport#TransactionInfo。而事务挂起后,会先跟线程进行解绑。 然后挂起的事务 SuspendedResourcesHolder 会被添加在 TransactionStatus 中。

    挂起的数据保存在哪里

    protected final class TransactionInfo {
            // 事务管理器
      @Nullable
      private final PlatformTransactionManager transactionManager;
            // 事务信息
      @Nullable
      private final TransactionAttribute transactionAttribute;
            // 切面点
      private final String joinpointIdentification;
      // DefaultTransactionStatus
      @Nullable
      private TransactionStatus transactionStatus; 
      @Nullable
      private TransactionInfo oldTransactionInfo;
    }

    public class DefaultTransactionStatus extends AbstractTransactionStatus {
     @Nullable
     private final Object transaction;
     private final boolean newTransaction;
     private final boolean newSynchronization;
     private final boolean readOnly;
     private final boolean debug;
     @Nullable
     private final Object suspendedResources;
    }   

    如何进行挂起的

    TransactionSynchronization 事务同步器,为了解决事务的传播方式

    SqlSessionSynchronization 也是跟当前线程绑定的

     // 挂起时候,将SqlSessionHolder与当前线程进行解绑
     @Override
     public void suspend() {
       if (this.holderActive) {
         LOGGER.debug(() -> "Transaction synchronization suspending SqlSession [" + this.holder.getSqlSession() + "]");
         TransactionSynchronizationManager.unbindResource(this.sessionFactory);
       }
     }

     /**
      * 恢复时候重新跟当前线程绑定
      */

     @Override
     public void resume() {
       if (this.holderActive) {
         LOGGER.debug(() -> "Transaction synchronization resuming SqlSession [" + this.holder.getSqlSession() + "]");
         TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
       }
     }

    5.4 传播方式具体实现

    下面这段代码就是事务注解的切面处理类,Spring事务的所有逻辑和扩展支持都在这里。

    首先我们先看整体的逻辑

    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
       final InvocationCallback invocation)
     throws Throwable 
    {

      // If the transaction attribute is null, the method is non-transactional.
      TransactionAttributeSource tas = getTransactionAttributeSource();
      // 获取被事务注解标记的事务信息
      final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
      // 根据事务注解上指定的事务管理器名称,去系统中获取,如果没有就拿系统中默认的事务管理器
      final PlatformTransactionManager tm = determineTransactionManager(txAttr);
      // 切面拦截点: com.alibaba.purchase.domain.replenish.impl.ReplenishDomainWriteServiceImpl.mockSave
      final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
            // 这里只看关系型数据的的事务逻辑。CallbackPreferringPlatformTransactionManager是具有回调性质的事务管理器,多用于处理自定的事务
      if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
       // Standard transaction demarcation with getTransaction and commit/rollback calls.
       // 获取事务的信息,包含传播方式
       TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
       Object retVal = null;
       try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
       }
       catch (Throwable ex) {
        // target invocation exception
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
       }
       finally {
        cleanupTransactionInfo(txInfo);
       }
       commitTransactionAfterReturning(txInfo);
       return retVal;
      }

    这里只看传播机制吧。AbstractPlatformTransactionManager#handleExistingTransaction

         /**
      * Create a TransactionStatus for an existing transaction.
      */

     private TransactionStatus handleExistingTransaction(
       TransactionDefinition definition, Object transaction, boolean debugEnabled)

       throws TransactionException 
    {
            // TransactionDefinition.PROPAGATION_NEVER(总是非事务地执行,如果存在一个活动事务,则抛出异常)就直接阻断报错
      if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
       throw new IllegalTransactionStateException(
         "Existing transaction found for transaction marked with propagation 'never'");
      }
            // TransactionDefinition.PROPAGATION_NOT_SUPPORTED 总是非事务地执行,并挂起任何存在的事务
      if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
       if (debugEnabled) {
        logger.debug("Suspending current transaction");
       }
       Object suspendedResources = suspend(transaction);
       boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
       // 数据暂存在TransactionSynchronizationManager#synchronizations同步器中
       return prepareTransactionStatus(
         definition, nullfalse, newSynchronization, debugEnabled, suspendedResources);
      }
            // 总是开启一个新的事务。如果一个事务已经存在,则将这个存在的事务挂起。
      if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
       if (debugEnabled) {
        logger.debug("Suspending current transaction, creating new transaction with name [" +
          definition.getName() + "]");
       }
       SuspendedResourcesHolder suspendedResources = suspend(transaction);
       try {
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
          definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
       }
       catch (RuntimeException | Error beginEx) {
        resumeAfterBeginException(transaction, suspendedResources, beginEx);
        throw beginEx;
       }
      }
            // 如果有事务存在,则运行在一个嵌套的事务中. 如果没有活动事务则按TransactionDefinition.PROPAGATION_REQUIRED 属性执行
      if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
       if (!isNestedTransactionAllowed()) {
        throw new NestedTransactionNotSupportedException(
          "Transaction manager does not allow nested transactions by default - " +
          "specify 'nestedTransactionAllowed' property with value 'true'");
       }
       if (debugEnabled) {
        logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
       }
       if (useSavepointForNestedTransaction()) {
        // Create savepoint within existing Spring-managed transaction,
        // through the SavepointManager API implemented by TransactionStatus.
        // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
        DefaultTransactionStatus status =
          prepareTransactionStatus(definition, transaction, falsefalse, debugEnabled, null);
        // 使用当前事务,并增加当前事务的一次引用。  
        status.createAndHoldSavepoint();
        return status;
       }
       else {
        // Nested transaction through nested begin and commit/rollback calls.
        // Usually only for JTA: Spring synchronization might get activated here
        // in case of a pre-existing JTA transaction.
        // 没有新建一个事务
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
          definition, transaction, true, newSynchronization, debugEnabled, null);
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
       }
      }

      // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
      if (debugEnabled) {
       logger.debug("Participating in existing transaction");
      }
      if (isValidateExistingTransaction()) {
       if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
        if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
         Constants isoConstants = DefaultTransactionDefinition.constants;
         throw new IllegalTransactionStateException("Participating transaction with definition [" +
           definition + "] specifies isolation level which is incompatible with existing transaction: " +
           (currentIsolationLevel != null ?
             isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
             "(unknown)"));
        }
       }
       if (!definition.isReadOnly()) {
        if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
         throw new IllegalTransactionStateException("Participating transaction with definition [" +
           definition + "] is not marked as read-only but existing transaction is");
        }
       }
      }
      // 
      boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
      return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
     }

    5.5 嵌套事务如何知道是否要提交

    当两个Mapper中使用的是同一个SqlSession,那么会不会第二个事务在执行后,就直接commit了呢,此时第一个事务有一次commit。导致异常呢?

    解决方案在这里 DefaultTransactionStatus

    第二个事务状态中

    而下面代码中会做校验,只需要同步时候才会提交事务。

    protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
      if (status.isNewSynchronization()) {
       if (status.isDebug()) {
        logger.trace("Triggering beforeCommit synchronization");
       }
       TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
      }
    }

    第一个事务状态中

    5.6 这样设计是否线程安全

    线程安全只有在多线程环境下才会出现。那么这里一定会有多线程问题。而事务是跟线程进行绑定的,所以这里虽然有多线程但是不会有线程安全问题。

    但是这里我们看源码线程绑定时候使用的ThreadLocal,所以你在线程中创建子线程或者是线程中使用线程池,这里的事务都不会共享的。

    本文由 mdnice 多平台发布

    07-14 11:38