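TCC defines transactions at the business level, so their granularity is entirely under the application's control. At its core it is still a compensation approach: a transaction runs in three phases, try, confirm, and cancel, and the logic of each phase is implemented in business code.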

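A business activity manager keeps the business activity consistent. It registers the operations that take part in the activity, invokes the confirm operation of every TCC operation when the activity commits, and invokes the cancel operation of every TCC operation when the activity is cancelled.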
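The register/confirm-all/cancel-all behaviour described above can be sketched as follows. This is a minimal illustration, not the tcc-transaction implementation: `TccOperation`, `ActivityManager`, and `RecordingOperation` are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical TCC operation: confirm and cancel are supplied by business code.
interface TccOperation {
    void confirm();
    void cancel();
}

// Hypothetical activity manager: registers operations, then confirms or cancels all of them.
class ActivityManager {
    private final List<TccOperation> operations = new ArrayList<>();

    void register(TccOperation operation) {
        operations.add(operation);
    }

    // Called when the business activity commits.
    void commit() {
        for (TccOperation operation : operations) {
            operation.confirm();
        }
    }

    // Called when the business activity is cancelled.
    void cancel() {
        for (TccOperation operation : operations) {
            operation.cancel();
        }
    }
}

// Test double that records which phase was invoked, in order.
class RecordingOperation implements TccOperation {
    private final String name;
    private final List<String> log;

    RecordingOperation(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void confirm() {
        log.add(name + ":confirm");
    }

    @Override
    public void cancel() {
        log.add(name + ":cancel");
    }
}
```

A real manager would also persist the registered operations so that a recovery job can finish the confirm or cancel phase after a crash; the sketch leaves that out.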

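The difference from 2PC: there is no separate prepare phase. The try operation both operates on the resource and serves as the prepare step, and it is free to choose how coarsely or finely business resources are locked.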
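One common way to give try this double role is to reserve the resource at the business level instead of holding a database lock. The sketch below assumes a hypothetical `Account` class with an available balance and a frozen amount; none of these names come from the original article.

```java
// Hypothetical account whose try phase freezes funds instead of holding a lock.
class Account {
    private long available;
    private long frozen;

    Account(long available) {
        this.available = available;
    }

    // try: validate and reserve the amount in a single step.
    boolean tryDebit(long amount) {
        if (amount <= 0 || available < amount) {
            return false;
        }
        available -= amount;
        frozen += amount;
        return true;
    }

    // confirm: consume the reservation made by tryDebit.
    void confirmDebit(long amount) {
        frozen -= amount;
    }

    // cancel: release the reservation back to the available balance.
    void cancelDebit(long amount) {
        frozen -= amount;
        available += amount;
    }

    long available() {
        return available;
    }

    long frozen() {
        return frozen;
    }
}
```

Only the frozen amount is unavailable to others between try and confirm/cancel, which is the business-chosen locking granularity the paragraph refers to.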

For flexible transactions, see this introduction from Alipay: https://www.zhihu.com/question/31813039

Alipay divides flexible transactions into several types: two-phase, compensating, asynchronous-assurance, and best-effort notification.

Two-phase: XA/JTA/JTS.

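Compensating: TCC. Consider a long-running transaction in which two servers take part: server A initiates the transaction and server B participates, but B takes a long time to process. Under the ACID principles, preserving isolation and consistency means that the resources used by the transaction on server A stay locked, and no other application may see intermediate results, until the whole transaction commits or rolls back. A's resources are therefore locked for a long time, and the system's availability becomes unacceptable. Compensating transactions were created for this situation. If A's part runs successfully, A commits first. If B also succeeds, B commits and the whole transaction is done. If B fails, B rolls back its own work; since A has already committed, a compensating operation must run the inverse of what A committed and restore A to its state before the transaction. This sacrifices some isolation and consistency but improves the availability of the transaction as a whole.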
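The "A commits first, undo A if B fails" flow can be sketched in a few lines. This is an illustrative in-memory example under simplifying assumptions (a single balance held in an array, B modelled as a `Runnable`); `CompensationDemo` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class CompensationDemo {
    // Runs step A (a debit), then step B. If B throws, A is undone by its compensation.
    // Returns true when both steps succeeded.
    static boolean run(long[] balanceA, long amount, Runnable stepB) {
        Deque<Runnable> compensations = new ArrayDeque<>();

        // Step A commits immediately; its inverse operation is remembered.
        balanceA[0] -= amount;
        compensations.push(() -> balanceA[0] += amount);

        try {
            stepB.run();
            return true;
        } catch (RuntimeException stepBFailure) {
            // B failed after A committed: run the compensations in reverse order.
            while (!compensations.isEmpty()) {
                compensations.pop().run();
            }
            return false;
        }
    }
}
```

Between A's commit and the compensation, other readers can observe the debited balance, which is the loss of isolation the paragraph describes.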

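Asynchronous-assurance: turn some synchronous, blocking transactional operations into asynchronous ones to avoid contention for database transactions.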

Best-effort: message notification for trades.

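1. Within the same local transaction as its business processing, the active party records the message data. After committing, it sends the message to the passive party and deletes the record once the send succeeds. A message compensation system periodically finds messages that were not sent successfully and resends them.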

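2. Before committing its business transaction, the business service asks the real-time message service to send a message; the message service only records the message data and does not actually send it (before the commit, the message is stored as pending). After the business transaction commits, the business service confirms the send with the message service (confirm the send after a commit, cancel it after a rollback). A message status confirmation system periodically finds messages that were neither confirmed nor rolled back and queries the business system for their status; the business system decides from the message ID or message content whether the message is valid.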
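Both schemes above can be sketched with small in-memory models. The classes `Outbox` (scheme 1) and `MessageService` (scheme 2) are hypothetical stand-ins: a real system would keep the pending messages in a database table and run the resend and status-check logic from a scheduled job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Scheme 1: stand-in for a message table written in the same local transaction.
class Outbox {
    private final Map<String, String> pending = new LinkedHashMap<>();

    // Called inside the business transaction: record the message next to the business data.
    void record(String id, String payload) {
        pending.put(id, payload);
    }

    // Sends every pending message and deletes those the sender reports as delivered.
    // Running this periodically plays the role of the compensation job.
    int flush(Predicate<String> sender) {
        List<String> delivered = new ArrayList<>();
        for (Map.Entry<String, String> entry : pending.entrySet()) {
            if (sender.test(entry.getValue())) {
                delivered.add(entry.getKey());
            }
        }
        for (String id : delivered) {
            pending.remove(id);
        }
        return delivered.size();
    }

    int pendingCount() {
        return pending.size();
    }
}

// Scheme 2: the message service stores a message first and sends it only once confirmed.
class MessageService {
    enum State { PENDING, SENT, CANCELLED }

    private final Map<String, State> messages = new HashMap<>();

    void prepare(String id) { messages.put(id, State.PENDING); }   // before the business commit
    void confirm(String id) { messages.put(id, State.SENT); }      // after the business commit
    void cancel(String id)  { messages.put(id, State.CANCELLED); } // after a rollback

    State stateOf(String id) { return messages.get(id); }

    // Status-check job: ask the business system about every message still pending.
    void checkPending(Predicate<String> businessCommitted) {
        for (Map.Entry<String, State> entry : messages.entrySet()) {
            if (entry.getValue() == State.PENDING) {
                entry.setValue(businessCommitted.test(entry.getKey()) ? State.SENT : State.CANCELLED);
            }
        }
    }
}
```

In both sketches the receiver may see a message more than once after a retry, so the passive party is assumed to handle messages idempotently.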

Excerpts from the comments under that article:

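Flexible means not relying on the database's own transactions. Typically, based on the characteristics of the business, in settings such as sharded databases and tables, unitized deployment of business services, or flows that span different business scenarios, data consistency between services is achieved through business-layer 2PC, after-the-fact correction, message queues, and similar means. It exploits how much inconsistency the business can tolerate during a transaction to find a way to make the transaction eventually consistent, seeking a balance between technical capability and business requirements.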

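Take a transfer from A to B, where A's balance decreases and B's increases. A rigid transaction requires both actions to happen at the same time. A flexible transaction decreases A first and then increases B. In a distributed environment it is hard to guarantee that the decrease and the increase happen simultaneously, but as long as B is eventually credited within an acceptable time after A is debited, the result is eventually consistent.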

Transaction interceptor -> transaction manager -> transaction repository; transaction recovery job

From a blog post:

TCC-Transaction uses two interceptors on the @Compensable AOP aspect (the participant's try method), which makes the calls to the participant's confirm/cancel methods transparent:

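The compensable transaction interceptor starts and propagates the transaction in the try phase and commits or rolls it back in the confirm/cancel phase. When a remote participant service is called, the transaction is passed to the remote participant in serialized form.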

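The resource coordinator interceptor adds participants to the transaction in the try phase, and creates the transaction context when one does not exist.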

How is an annotated method intercepted so that transaction management takes over? Take @Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true) as the example.
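Before reading the library source, it may help to see how such an annotation is consumed at runtime. The sketch below uses a simplified stand-in annotation, `SimpleCompensable`, rather than the library's own `@Compensable`, and reads its attributes by plain reflection; `PaymentService` and `AnnotationReader` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Simplified stand-in for the library's @Compensable, with only the attributes used here.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SimpleCompensable {
    String confirmMethod() default "";
    String cancelMethod() default "";
    boolean asyncConfirm() default false;
}

class PaymentService {
    // The annotated method is the try phase; the attributes name the other two phases.
    @SimpleCompensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)
    public void makePayment(String orderId) { }

    public void confirmMakePayment(String orderId) { }

    public void cancelMakePayment(String orderId) { }
}

class AnnotationReader {
    // Returns "confirmMethod/cancelMethod" for an annotated method, or null if not annotated.
    static String describe(Class<?> type, String methodName, Class<?>... parameterTypes) {
        try {
            Method method = type.getMethod(methodName, parameterTypes);
            SimpleCompensable annotation = method.getAnnotation(SimpleCompensable.class);
            return annotation == null ? null : annotation.confirmMethod() + "/" + annotation.cancelMethod();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

An AOP interceptor does the same lookup on the intercepted method and then decides what to do with the confirm and cancel names, as the listings that follow show.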

First, define an annotation

package org.mengyun.tcctransaction.api;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/**
* Created by changmingxie on 10/25/15.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Compensable {

public Propagation propagation() default Propagation.REQUIRED;

public String confirmMethod() default "";

public String cancelMethod() default "";

public Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;

public boolean asyncConfirm() default false;

public boolean asyncCancel() default false;

class NullableTransactionContextEditor implements TransactionContextEditor {

@Override
public TransactionContext get(Object target, Method method, Object[] args) {
return null;
}

@Override
public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
}
}

class DefaultTransactionContextEditor implements TransactionContextEditor {
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
return (TransactionContext) args[position];
}
return null;
}

@Override
public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
args[position] = transactionContext;
}
}

public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) {
int position = -1;
for (int i = 0; i < parameterTypes.length; i++) {
if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) {
position = i;
break;
}
}
return position;
}

public static TransactionContext getTransactionContextFromArgs(Object[] args) {
TransactionContext transactionContext = null;
for (Object arg : args) {
if (arg != null && org.mengyun.tcctransaction.api.TransactionContext.class.isAssignableFrom(arg.getClass())) {
transactionContext = (org.mengyun.tcctransaction.api.TransactionContext) arg;
}
}
return transactionContext;
}
}
}
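The default context editor in the listing above finds the transaction context by looking for a parameter of the context type and then reads or writes that slot of the argument array. A reduced, self-contained sketch of that idea follows; `Ctx` is a hypothetical stand-in for `TransactionContext`, and `ContextArgs` is a name introduced for this example.

```java
// Hypothetical stand-in for TransactionContext.
class Ctx {
    final String xid;

    Ctx(String xid) {
        this.xid = xid;
    }
}

class ContextArgs {
    // Index of the first parameter declared as Ctx, or -1 when there is none.
    static int position(Class<?>[] parameterTypes) {
        for (int i = 0; i < parameterTypes.length; i++) {
            if (parameterTypes[i].equals(Ctx.class)) {
                return i;
            }
        }
        return -1;
    }

    // Reads the context from the argument array, or returns null when absent.
    static Ctx get(Class<?>[] parameterTypes, Object[] args) {
        int position = position(parameterTypes);
        return position >= 0 ? (Ctx) args[position] : null;
    }

    // Writes the context into the argument array when the method declares a Ctx parameter.
    static void set(Ctx ctx, Class<?>[] parameterTypes, Object[] args) {
        int position = position(parameterTypes);
        if (position >= 0) {
            args[position] = ctx;
        }
    }
}
```

Because the slot is filled in place, a caller can pass null for the context parameter and let the interceptor inject the real value before the method runs.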

Define the interceptor aspect

package org.mengyun.tcctransaction.interceptor;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;

/**
* Created by changmingxie on 10/30/15.
*/
@Aspect
public abstract class CompensableTransactionAspect {

private CompensableTransactionInterceptor compensableTransactionInterceptor;

public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor = compensableTransactionInterceptor;
}

@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {
}

@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
public abstract int getOrder();
}

The concrete implementation

package org.mengyun.tcctransaction.spring;

import org.aspectj.lang.annotation.Aspect;
import org.mengyun.tcctransaction.TransactionManager;
import org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect;
import org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor;
import org.mengyun.tcctransaction.support.TransactionConfigurator;
import org.springframework.core.Ordered;

/**
* Created by changmingxie on 10/30/15.
*/
@Aspect
public class ConfigurableTransactionAspect extends CompensableTransactionAspect implements Ordered {
private TransactionConfigurator transactionConfigurator;
public void init() {
TransactionManager transactionManager = transactionConfigurator.getTransactionManager();
CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor();
compensableTransactionInterceptor.setTransactionManager(transactionManager);
compensableTransactionInterceptor.setDelayCancelExceptions(transactionConfigurator.getRecoverConfig().getDelayCancelExceptions());
this.setCompensableTransactionInterceptor(compensableTransactionInterceptor);
}

@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}

public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
this.transactionConfigurator = transactionConfigurator;
}
}

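Define the compensable interceptor: it obtains the annotated method, derives the corresponding transaction operation, and calls into the transaction manager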

package org.mengyun.tcctransaction.interceptor;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.log4j.Logger;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.mengyun.tcctransaction.NoExistedTransactionException;
import org.mengyun.tcctransaction.SystemException;
import org.mengyun.tcctransaction.Transaction;
import org.mengyun.tcctransaction.TransactionManager;
import org.mengyun.tcctransaction.api.Compensable;
import org.mengyun.tcctransaction.api.Propagation;
import org.mengyun.tcctransaction.api.TransactionContext;
import org.mengyun.tcctransaction.api.TransactionStatus;
import org.mengyun.tcctransaction.common.MethodType;
import org.mengyun.tcctransaction.support.FactoryBuilder;
import org.mengyun.tcctransaction.utils.CompensableMethodUtils;
import org.mengyun.tcctransaction.utils.ReflectionUtils;
import org.mengyun.tcctransaction.utils.TransactionUtils;

import java.lang.reflect.Method;
import java.util.Set;

/**
* Created by changmingxie on 10/30/15.
*/
public class CompensableTransactionInterceptor {

static final Logger logger = Logger.getLogger(CompensableTransactionInterceptor.class.getSimpleName());

private TransactionManager transactionManager;

private Set<Class<? extends Exception>> delayCancelExceptions;

public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

public void setDelayCancelExceptions(Set<Class<? extends Exception>> delayCancelExceptions) {
this.delayCancelExceptions = delayCancelExceptions;
}

public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
Compensable compensable = method.getAnnotation(Compensable.class);
Propagation propagation = compensable.propagation();
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
boolean asyncConfirm = compensable.asyncConfirm();
boolean asyncCancel = compensable.asyncCancel();
boolean isTransactionActive = transactionManager.isTransactionActive();
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
}
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
switch (methodType) {
case ROOT:
return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
case PROVIDER:
return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
default:
return pjp.proceed();
}
}

private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Object returnValue = null;
Transaction transaction = null;
try {
transaction = transactionManager.begin();
try {
returnValue = pjp.proceed();
} catch (Throwable tryingException) {
if (!isDelayCancelException(tryingException)) {
logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
transactionManager.rollback(asyncCancel);
}
throw tryingException;
}
transactionManager.commit(asyncConfirm);
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}

private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Transaction transaction = null;
try {
switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING:
transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed();
case CONFIRMING:
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException exception) {
//the transaction has already been committed, ignore it.
}
break;
case CANCELLING:
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has already been rolled back, ignore it.
}
break;
}
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
return ReflectionUtils.getNullValue(method.getReturnType());
}

private boolean isDelayCancelException(Throwable throwable) {
if (delayCancelExceptions != null) {
for (Class delayCancelException : delayCancelExceptions) {
Throwable rootCause = ExceptionUtils.getRootCause(throwable);
if (delayCancelException.isAssignableFrom(throwable.getClass())
|| (rootCause != null && delayCancelException.isAssignableFrom(rootCause.getClass()))) {
return true;
}
}
}
return false;
}
}
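The delay-cancel check at the end of the interceptor decides whether a failed try should be rolled back immediately or left for the recovery job, for example when a timeout means the remote try may still be running. A self-contained sketch of that check, using only the JDK instead of commons-lang `ExceptionUtils`, might look like the following; `DelayCancel` is a hypothetical name, and the root-cause walk assumes the cause chain has no cycle.

```java
import java.util.Set;

class DelayCancel {
    // Walks the cause chain to its end; returns the throwable itself when it has no cause.
    static Throwable rootCause(Throwable throwable) {
        Throwable current = throwable;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // True when the thrown exception, or its root cause, is one of the configured types.
    static boolean isDelayCancel(Throwable thrown, Set<Class<? extends Exception>> delayTypes) {
        if (delayTypes == null) {
            return false;
        }
        Throwable root = rootCause(thrown);
        for (Class<? extends Exception> type : delayTypes) {
            if (type.isAssignableFrom(thrown.getClass()) || type.isAssignableFrom(root.getClass())) {
                return true;
            }
        }
        return false;
    }
}
```

Checking the root cause matters because RPC frameworks usually wrap the low-level timeout in their own exception type.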

The resource interceptor

package org.mengyun.tcctransaction.interceptor;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;

/**
* Created by changmingxie on 11/8/15.
*/
@Aspect
public abstract class ResourceCoordinatorAspect {
private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {
}

@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}

public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
}

public abstract int getOrder();
}

package org.mengyun.tcctransaction.spring;

import org.aspectj.lang.annotation.Aspect;
import org.mengyun.tcctransaction.interceptor.ResourceCoordinatorAspect;
import org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor;
import org.mengyun.tcctransaction.support.TransactionConfigurator;
import org.springframework.core.Ordered;

/**
* Created by changmingxie on 11/8/15.
*/
@Aspect
public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered {
private TransactionConfigurator transactionConfigurator;
public void init() {
ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();
resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager());
this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor);
}

@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1;
}

public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
this.transactionConfigurator = transactionConfigurator;
}
}

package org.mengyun.tcctransaction.interceptor;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.mengyun.tcctransaction.InvocationContext;
import org.mengyun.tcctransaction.Participant;
import org.mengyun.tcctransaction.Transaction;
import org.mengyun.tcctransaction.TransactionManager;
import org.mengyun.tcctransaction.api.Compensable;
import org.mengyun.tcctransaction.api.TransactionContext;
import org.mengyun.tcctransaction.api.TransactionStatus;
import org.mengyun.tcctransaction.api.TransactionXid;
import org.mengyun.tcctransaction.support.FactoryBuilder;
import org.mengyun.tcctransaction.utils.CompensableMethodUtils;
import org.mengyun.tcctransaction.utils.ReflectionUtils;

import java.lang.reflect.Method;

/**
* Created by changmingxie on 11/8/15.
*/
public class ResourceCoordinatorInterceptor {
private TransactionManager transactionManager;
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null) {
switch (transaction.getStatus()) {
case TRYING:
enlistParticipant(pjp);
break;
case CONFIRMING:
break;
case CANCELLING:
break;
}
}
return pjp.proceed(pjp.getArgs());
}

private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
if (method == null) {
throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
}
Compensable compensable = method.getAnnotation(Compensable.class);

String confirmMethodName = compensable.confirmMethod();
String cancelMethodName = compensable.cancelMethod();

Transaction transaction = transactionManager.getCurrentTransaction();
TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());

if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
}

Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());

InvocationContext confirmInvocation = new InvocationContext(targetClass,
confirmMethodName,
method.getParameterTypes(), pjp.getArgs());

InvocationContext cancelInvocation = new InvocationContext(targetClass,
cancelMethodName,
method.getParameterTypes(), pjp.getArgs());

Participant participant =
new Participant(
xid,
confirmInvocation,
cancelInvocation,
compensable.transactionContextEditor());
transactionManager.enlistParticipant(participant);
}
}

One transaction object has multiple participants

A transaction contains multiple participants, and the operations live inside the participants

package org.mengyun.tcctransaction;

import org.mengyun.tcctransaction.api.TransactionContext;
import org.mengyun.tcctransaction.api.TransactionStatus;
import org.mengyun.tcctransaction.api.TransactionXid;
import org.mengyun.tcctransaction.common.TransactionType;

import javax.transaction.xa.Xid;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Created by changmingxie on 10/26/15.
*/
public class Transaction implements Serializable {

private static final long serialVersionUID = 7291423944314337931L;

private TransactionXid xid;
private TransactionStatus status;
private TransactionType transactionType;
private volatile int retriedCount = 0;
private Date createTime = new Date();
private Date lastUpdateTime = new Date();
private long version = 1;
private List<Participant> participants = new ArrayList<Participant>();
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();

public Transaction() {
}

public Transaction(TransactionContext transactionContext) {
this.xid = transactionContext.getXid();
this.status = TransactionStatus.TRYING;
this.transactionType = TransactionType.BRANCH;
}

public Transaction(TransactionType transactionType) {
this.xid = new TransactionXid();
this.status = TransactionStatus.TRYING;
this.transactionType = transactionType;
}

public void enlistParticipant(Participant participant) {
participants.add(participant);
}

public Xid getXid() {
return xid.clone();
}

public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}

public void rollback() {
for (Participant participant : participants) {
participant.rollback();
}
}
.....
}

Each participant object holds the invocation contexts for confirm/cancel and a terminator that executes them

package org.mengyun.tcctransaction;

import org.mengyun.tcctransaction.api.TransactionContext;
import org.mengyun.tcctransaction.api.TransactionContextEditor;
import org.mengyun.tcctransaction.api.TransactionStatus;
import org.mengyun.tcctransaction.api.TransactionXid;

import java.io.Serializable;

/**
* Created by changmingxie on 10/27/15.
*/
public class Participant implements Serializable {

private static final long serialVersionUID = 4127729421281425247L;

private TransactionXid xid;
private InvocationContext confirmInvocationContext;
private InvocationContext cancelInvocationContext;
private Terminator terminator = new Terminator();
Class<? extends TransactionContextEditor> transactionContextEditorClass;

public Participant() {
}

public Participant(TransactionXid xid, InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
this.xid = xid;
this.confirmInvocationContext = confirmInvocationContext;
this.cancelInvocationContext = cancelInvocationContext;
this.transactionContextEditorClass = transactionContextEditorClass;
}

public Participant(InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
this.confirmInvocationContext = confirmInvocationContext;
this.cancelInvocationContext = cancelInvocationContext;
this.transactionContextEditorClass = transactionContextEditorClass;
}

public void setXid(TransactionXid xid) {
this.xid = xid;
}

public void rollback() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}

public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}

}

The invocation context holds the class, method, parameter types, and arguments needed for execution

package org.mengyun.tcctransaction;

import java.io.Serializable;

/**
* Created by changmingxie on 11/9/15.
*/
public class InvocationContext implements Serializable {

private static final long serialVersionUID = -7969140711432461165L;

private Class targetClass;
private String methodName;
private Class[] parameterTypes;
private Object[] args;
}

The terminator, which performs the call via reflection

package org.mengyun.tcctransaction;

import org.mengyun.tcctransaction.api.TransactionContext;
import org.mengyun.tcctransaction.api.TransactionContextEditor;
import org.mengyun.tcctransaction.support.FactoryBuilder;
import org.mengyun.tcctransaction.utils.StringUtils;

import java.io.Serializable;
import java.lang.reflect.Method;

/**
* Created by changmingxie on 10/30/15.
*/
public class Terminator implements Serializable {
private static final long serialVersionUID = -164958655471605778L;
public Terminator() {
}
public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
try {
Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
Method method = null;
method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
return method.invoke(target, invocationContext.getArgs());
} catch (Exception e) {
throw new SystemException(e);
}
}
return null;
}
}
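Stripped of the factory and context-editor plumbing, the terminator boils down to "look up a method by name and parameter types, then invoke it with the saved arguments". A minimal sketch using only `java.lang.reflect` follows; `Invocation`, `ReflectiveTerminator`, and `Greeter` are hypothetical names, and the target instance is passed in directly rather than obtained from a factory.

```java
import java.lang.reflect.Method;

// Hypothetical, reduced version of an invocation context.
class Invocation {
    final Class<?> targetClass;
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    Invocation(Class<?> targetClass, String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.targetClass = targetClass;
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
}

class ReflectiveTerminator {
    // Invokes the named public method on the target; returns null when no method name is set.
    static Object invoke(Object target, Invocation invocation) {
        if (invocation.methodName == null || invocation.methodName.isEmpty()) {
            return null;
        }
        try {
            Method method = invocation.targetClass.getMethod(invocation.methodName, invocation.parameterTypes);
            return method.invoke(target, invocation.args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Sample target with a confirm-style method.
class Greeter {
    public String confirmGreet(String name) {
        return "confirmed " + name;
    }
}
```

Because the confirm and cancel methods receive the same parameter types and arguments as the try method, only the method name differs between the two saved invocations.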

Before looking at the transaction manager, look at a few of the supporting implementations.

To create a transaction, a unique ID must first be generated. In this project the transaction ID comes from UUID.randomUUID() and is then converted to bytes:

    private static byte[] uuidToByteArray(UUID uuid) {
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
return bb.array();
}
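To check that the conversion above is lossless, it can be paired with the inverse operation. The following self-contained sketch wraps both directions in a hypothetical `UuidBytes` helper; `fromBytes` is not part of the original listing.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

class UuidBytes {
    // Writes the two 64-bit halves of the UUID in big-endian order (ByteBuffer's default).
    static byte[] toBytes(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return buffer.array();
    }

    // Inverse of toBytes: reads the most significant half first, then the least significant.
    static UUID fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long mostSignificant = buffer.getLong();
        long leastSignificant = buffer.getLong();
        return new UUID(mostSignificant, leastSignificant);
    }
}
```

The 16-byte form is convenient as a global transaction ID because it fits the byte-array fields of an XA-style `Xid`.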

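Next, the storage of transaction objects. It is essentially a template-method implementation: the interface contains the CRUD operations, the cacheable template defines the basic methods and how they use the cache (the cache itself is a basic one), and each storage medium has its own concrete read/write implementation, for example the /TCC directory in ZooKeeper, the TCC: key prefix in Redis, /tcc in the file system, and a table for JDBC.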

CacheBuilder.newBuilder().expireAfterAccess(expireDuration, TimeUnit.SECONDS).maximumSize(1000).build();
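The template-method layout described above can be sketched without any external library. In this illustration the Guava cache is swapped for an access-ordered `LinkedHashMap` with a size bound, so time-based expiry is not modelled; `CachingRepository` and `InMemoryRepository` are hypothetical names, not the library's classes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Template: the caching logic lives here, storage specifics live in subclasses.
abstract class CachingRepository<K, V> {
    private final Map<K, V> cache;

    CachingRepository(final int maxSize) {
        // Access-ordered map that evicts the least recently used entry beyond maxSize.
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    final void create(K key, V value) {
        doCreate(key, value);
        cache.put(key, value);
    }

    final V find(K key) {
        V value = cache.get(key);
        if (value == null) {
            value = doFind(key);          // cache miss: fall through to the store
            if (value != null) {
                cache.put(key, value);
            }
        }
        return value;
    }

    final void delete(K key) {
        doDelete(key);
        cache.remove(key);
    }

    protected abstract void doCreate(K key, V value);
    protected abstract V doFind(K key);
    protected abstract void doDelete(K key);
}

// One concrete medium; ZooKeeper, Redis, file, or JDBC variants would differ only in the do* methods.
class InMemoryRepository extends CachingRepository<String, String> {
    private final Map<String, String> store = new HashMap<>();
    int reads = 0;

    InMemoryRepository() {
        super(1000);
    }

    @Override
    protected void doCreate(String key, String value) { store.put(key, value); }

    @Override
    protected String doFind(String key) { reads++; return store.get(key); }

    @Override
    protected void doDelete(String key) { store.remove(key); }
}
```

The sketch is not thread-safe; a shared repository would need synchronization or a concurrent cache.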

TransactionManager handles starting, registering, and managing the whole transaction

package org.mengyun.tcctransaction;

import org.apache.log4j.Logger;
import org.mengyun.tcctransaction.api.TransactionContext;
import org.mengyun.tcctransaction.api.TransactionStatus;
import org.mengyun.tcctransaction.common.TransactionType;

import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;

/**
* Created by changmingxie on 10/26/15.
*/
public class TransactionManager {

static final Logger logger = Logger.getLogger(TransactionManager.class.getSimpleName());

private TransactionRepository transactionRepository;

private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();

private ExecutorService executorService;

public void setTransactionRepository(TransactionRepository transactionRepository) {
this.transactionRepository = transactionRepository;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}

public TransactionManager() {
}

public Transaction begin() {
Transaction transaction = new Transaction(TransactionType.ROOT);
transactionRepository.create(transaction);
registerTransaction(transaction);
return transaction;
}

public Transaction propagationNewBegin(TransactionContext transactionContext) {
Transaction transaction = new Transaction(transactionContext);
transactionRepository.create(transaction);
registerTransaction(transaction);
return transaction;
}

public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
if (transaction != null) {
transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
registerTransaction(transaction);
return transaction;
} else {
throw new NoExistedTransactionException();
}
}

public void commit(boolean asyncCommit) {
final Transaction transaction = getCurrentTransaction();
transaction.changeStatus(TransactionStatus.CONFIRMING);
transactionRepository.update(transaction);
if (asyncCommit) {
try {
Long statTime = System.currentTimeMillis();
executorService.submit(new Runnable() {
@Override
public void run() {
commitTransaction(transaction);
}
});
logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
} catch (Throwable commitException) {
logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
} else {
commitTransaction(transaction);
}
}

public void rollback(boolean asyncRollback) {
final Transaction transaction = getCurrentTransaction();
transaction.changeStatus(TransactionStatus.CANCELLING);
transactionRepository.update(transaction);
if (asyncRollback) {
try {
executorService.submit(new Runnable() {
@Override
public void run() {
rollbackTransaction(transaction);
}
});
} catch (Throwable rollbackException) {
logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
throw new CancellingException(rollbackException);
}
} else {
rollbackTransaction(transaction);
}
}

private void commitTransaction(Transaction transaction) {
try {
transaction.commit();
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
}

private void rollbackTransaction(Transaction transaction) {
try {
transaction.rollback();
transactionRepository.delete(transaction);
} catch (Throwable rollbackException) {
logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
throw new CancellingException(rollbackException);
}
}

public Transaction getCurrentTransaction() {
if (isTransactionActive()) {
return CURRENT.get().peek();
}
return null;
}

public boolean isTransactionActive() {
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty();
}

private void registerTransaction(Transaction transaction) {
if (CURRENT.get() == null) {
CURRENT.set(new LinkedList<Transaction>());
}
CURRENT.get().push(transaction);
}

public void cleanAfterCompletion(Transaction transaction) {
if (isTransactionActive() && transaction != null) {
Transaction currentTransaction = getCurrentTransaction();
if (currentTransaction == transaction) {
CURRENT.get().pop();
} else {
throw new SystemException("Illegal transaction when clean after completion");
}
}
}

public void enlistParticipant(Participant participant) {
Transaction transaction = this.getCurrentTransaction();
transaction.enlistParticipant(participant);
transactionRepository.update(transaction);
}
}
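The manager tracks the current transaction with a per-thread stack, so nested compensable calls on the same thread each see their own transaction on top. That bookkeeping can be isolated into a small sketch; `TransactionStack` is a hypothetical name and transactions are represented by plain strings here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class TransactionStack {
    // One stack per thread; the top element is the current transaction.
    private static final ThreadLocal<Deque<String>> CURRENT = new ThreadLocal<Deque<String>>();

    static void register(String transaction) {
        if (CURRENT.get() == null) {
            CURRENT.set(new ArrayDeque<String>());
        }
        CURRENT.get().push(transaction);
    }

    // Returns the innermost active transaction on this thread, or null when there is none.
    static String current() {
        Deque<String> stack = CURRENT.get();
        return (stack == null || stack.isEmpty()) ? null : stack.peek();
    }

    // Pops the transaction, but only if it is the one on top of the stack.
    static void cleanAfterCompletion(String transaction) {
        Deque<String> stack = CURRENT.get();
        if (stack == null || stack.isEmpty() || transaction == null) {
            return;
        }
        if (!stack.peek().equals(transaction)) {
            throw new IllegalStateException("Illegal transaction when clean after completion");
        }
        stack.pop();
    }
}
```

Because the state is thread-local, work handed to an executor for asynchronous confirm or cancel does not see this stack, which is why the manager passes the transaction object into the submitted task explicitly.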
05-11 04:32