分布式事务
事务简介
- 事务是用来保证一组数据操作的完整性和一致性
- 事务必须满足ACID的四大特性(待补全)
- 事务具有四种隔离级别(待补全)
- 事务具有七种传播行为(待补全)
什么是分布式事务
分布式事务就是将多个节点的事务看成一个整体处理。
分布式事务由事务参与者、资源服务器、事务管理器等组成,常见例子有,支付、下订单等。
实现思路
两段式事务
请求阶段:协调者向参与者询问是否可以进行事务提交操作,然后开始等待参与者的响应。
提交阶段:在该阶段,协调者将基于第一个阶段的投票结果进行决策:提交或取消。当且仅当所有的参与者同意提交,事务协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者回滚事务。
缺点:1)当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态;2)当协调者出错,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作;3)假如在第二阶段中,假如协调者发出commit消息后宕机,接收到这条消息的参与者宕机,此时则无法判断事务状态,无法确定是否已被提交;
三段式事务
事务询问 -> 执行事务预提交 -> 进行事务提交或者事务回滚
降低了参与者的阻塞范围,但引入了新问题:在参与者接收到precommit后,网络出现问题,参与者和协调者无法通行,在这种情况下,参与者依然会执行事务的提交。
基于XA的分布式事务
缺点:1)性能较差;2)很多nosql不支持XA协议;
基于消息的最终一致性方案
缺点:属于强一致性事务,会存在资源浪费
TCC编程式补偿性事务
TCC事务是柔性事务,在try阶段要对资源做预留,在confirm或cancel阶段释放资源,与基于消息事务对比,TCC的时效性更好。
TCC模型是把锁的粒度完全交给业务处理,它分为三个阶段:
- Try阶段主要是对业务系统做检测及资源预留;
- 如果try阶段所有业务资源都预留成功,则执行confirm,否则执行cancel;
- confirm:不做任务业务检查,仅使用预留的资源执行业务操作,失败会重试;
- cancel:取消执行业务操作,释放预留的资源,失败会重试;
举例
以简单的电商系统为例,小明在淘宝上花100元买了一本书,获赠10个积分,产生如下操作:
- 订单系统创建商品订单;
- 支付系统接受小明的支付;
- 库存系统扣减产品库存;
- 会员系统给小明账户增加会员积分;
这几个动作需要作为一个事务执行,要同时成功或者同时撤销。如果采用TCC事务模式,那么各个系统需要改造为如下状态:
1)订单系统
try:创建一个订单,状态显示为“待支付”;
confirm:更新订单的状态为“已完成”;
cancel:更新订单的状态为“已取消”;
2)支付系统
try:假设小明账户中有1000元,冻结小明账户中的100元,此时小明看到的余额依然是1000元;
confirm:将账户余额变为900元,并清除冻结记录;
concel:清除冻结记录;
3)库存系统
try:假设库存中还生10本书,冻结其中的一本书,现实库存依然有10本书;
confirm:将剩余库存更新为9本书,并清除冻结记录;
cancel:清除冻结记录;
4)会员系统
try:假设小明原因积分为3000,给小明账户预增加10积分,账户显示的积分依然是3000分;
confirm:将账户积分更新为3010,并清除预增加记录;
cancel:清除预增加记录;
缺点:TCC 事务模型对业务方侵入较大,需要业务方把功能的实现上由一个接口拆分为三个,开发成本较高。
同时 TCC 事务为了解决异步网络中的通信失败或超时带来的异常情况,要求业务方在设计实现上要遵循三个策略:
允许空回滚:原因是异常发生在阶段 1 时,部分参与方没有收到 try 请求从而触发整个事务的 cancel 操作,try 失败或者没有执行 try 操作的参与方收到 cancel 请求时,要进行空回滚操作;
保持幂等性:原因是异常发生在阶段 2 时,比如网络超时,则会重复调用参与方的 confirm/cancel 方法,因此需要这两个方法实现上保证幂等性;
防止资源悬挂:原因网络异常导致两个阶段无法保证严格的顺序执行,出现参与方侧 try 请求比 cancel 请求更晚到达的情况,cancel 会执行空回滚而确保事务的正确性,但是此时 try 方法也不可以再被执行;
分布式事务框架
TCC-Transaction分析
仓库:https://github.com/changmingxie/tcc-transaction
使用方法
- 在需要提供分布式事务支持的接口方法上添加
@Compensable
;
- 在对应的接口实现方法上也添加 @Compensable,并添加注解参数
confirmMethod
, cancelMethod
和 transactionContextEditor
;
- 实现对应的
confirmMethod
和 cancelMethod
(必须和 try 方法在同一个类中);
注意:
- 在分布式事务框架中,不要轻易在业务层捕获所有异常,只有在抛出异常的情况下,分布式事务框架才知道该业务是执行失败的,继而执行
cancelMethod
;
- 使用 TCC-Transaction 时,confirm 和 cancel 的幂等性问题需要人为代码保证;
- TCC 的数据库应该和业务数据库分开,以保证分布式事务的正常进行;
源码分析
tcc的事务并不是数据库的事务,而是应用层的事务,Transaction如下:
public class Transaction implements Serializable { private static final long serialVersionUID = 7291423944314337931L; private TransactionXid xid; private TransactionStatus status; private TransactionType transactionType; private volatile int retriedCount = 0; private Date createTime = new Date(); private Date lastUpdateTime = new Date(); private long version = 1; private List<Participant> participants = new ArrayList<Participant>(); private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>(); ... }
|
CompensableTransactionAspect是一个AOP切面类,@Pointcut
将 @Compensable
注解标记为切入点,其签名为compensableService()
。@Around
表示在compensableService()
之前和之后调用 interceptCompensableMethod()
。
@Aspect public abstract class CompensableTransactionAspect { private CompensableTransactionInterceptor compensableTransactionInterceptor; public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) { this.compensableTransactionInterceptor = compensableTransactionInterceptor; } @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void compensableService() { } @Around("compensableService()") public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { return compensableTransactionInterceptor.interceptCompensableMethod(pjp); } public abstract int getOrder(); }
|
CompensableTransactionInterceptor是事务拦截器,具有以下作用:
- 将事务区分为ROOT事务和PROVIDER分支事务;
- 不断地修改数据库内的状态(初始化事务、修改事务状态);
- 修改和清除事务管理区中的事务队列;
- 并没有执行目标对象方法,pjp.proceed() 其实是交给了下一个拦截器 ResourceCoordinatorInterceptor;
public class CompensableMethodContext { ProceedingJoinPoint pjp = null; Method method = null; Compensable compensable = null;
Propagation propagation = null; TransactionContext transactionContext = null;
public CompensableMethodContext(ProceedingJoinPoint pjp) { this.pjp = pjp; this.method = getCompensableMethod(); this.compensable = method.getAnnotation(Compensable.class); this.propagation = compensable.propagation(); this.transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()); } ... } public class CompensableTransactionInterceptor { public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp); boolean isTransactionActive = transactionManager.isTransactionActive(); if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) { throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName()); } switch (compensableMethodContext.getMethodRole(isTransactionActive)) { case ROOT: return rootMethodProceed(compensableMethodContext); case PROVIDER: return providerMethodProceed(compensableMethodContext); default: return pjp.proceed(); } }
private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Object returnValue = null; Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>(); allDelayCancelExceptions.addAll(this.delayCancelExceptions); allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions())); try {
transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity()); try { returnValue = compensableMethodContext.proceed(); } catch (Throwable tryingException) { if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) { logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException); transactionManager.rollback(asyncCancel); } throw tryingException; } transactionManager.commit(asyncConfirm); } finally { transactionManager.cleanAfterCompletion(transaction); } return returnValue; }
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); try { switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) { case TRYING: transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext()); return compensableMethodContext.proceed(); case CONFIRMING: try { transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); transactionManager.commit(asyncConfirm); } catch (NoExistedTransactionException excepton) { } break; case CANCELLING: try { transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); transactionManager.rollback(asyncCancel); } catch (NoExistedTransactionException exception) { } break; } } finally { transactionManager.cleanAfterCompletion(transaction); } Method method = compensableMethodContext.getMethod(); return ReflectionUtils.getNullValue(method.getReturnType()); }
private boolean isDelayCancelException(Throwable throwable, Set<Class<? extends Exception>> delayCancelExceptions) { if (delayCancelExceptions != null) { for (Class delayCancelException : delayCancelExceptions) { Throwable rootCause = ExceptionUtils.getRootCause(throwable); if (delayCancelException.isAssignableFrom(throwable.getClass()) || (rootCause != null && delayCancelException.isAssignableFrom(rootCause.getClass()))) { return true; } } } return false; } }
|
TransactionManager
public class TransactionManager { private TransactionRepository transactionRepository; private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>(); private ExecutorService executorService; ...
public Transaction begin(Object uniqueIdentify) { Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT); transactionRepository.create(transaction); registerTransaction(transaction); return transaction; } public Transaction propagationNewBegin(TransactionContext transactionContext) { Transaction transaction = new Transaction(transactionContext); transactionRepository.create(transaction); registerTransaction(transaction); return transaction; }
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException { Transaction transaction = transactionRepository.findByXid(transactionContext.getXid()); if (transaction != null) { transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus())); registerTransaction(transaction); return transaction; } else { throw new NoExistedTransactionException(); } } public void commit(boolean asyncCommit) { final Transaction transaction = getCurrentTransaction(); transaction.changeStatus(TransactionStatus.CONFIRMING); transactionRepository.update(transaction); if (asyncCommit) { try { Long statTime = System.currentTimeMillis(); executorService.submit(new Runnable() { @Override public void run() { commitTransaction(transaction); } }); logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime)); } catch (Throwable commitException) { logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException); throw new ConfirmingException(commitException); } } else { commitTransaction(transaction); } }
private void commitTransaction(Transaction transaction) { try { transaction.commit(); transactionRepository.delete(transaction); } catch (Throwable commitException) { logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException); throw new ConfirmingException(commitException); } }
public void rollback(boolean asyncRollback) { final Transaction transaction = getCurrentTransaction(); transaction.changeStatus(TransactionStatus.CANCELLING); transactionRepository.update(transaction); if (asyncRollback) { try { executorService.submit(new Runnable() { @Override public void run() { rollbackTransaction(transaction); } }); } catch (Throwable rollbackException) { logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException); throw new CancellingException(rollbackException); } } else { rollbackTransaction(transaction); } } private void registerTransaction(Transaction transaction) { if (CURRENT.get() == null) { CURRENT.set(new LinkedList<Transaction>()); } CURRENT.get().push(transaction); }
public void cleanAfterCompletion(Transaction transaction) { if (isTransactionActive() && transaction != null) { Transaction currentTransaction = getCurrentTransaction(); if (currentTransaction == transaction) { CURRENT.get().pop(); if (CURRENT.get().size() == 0) { CURRENT.remove(); } } else { throw new SystemException("Illegal transaction when clean after completion"); } } } ... }
|
ResourceCoordinatorAspect:主要是为了设置事务的参与者
@Aspect public abstract class ResourceCoordinatorAspect { private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor; @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void transactionContextCall() { } @Around("transactionContextCall()") public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp); } public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) { this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor; } public abstract int getOrder(); }
|
ResourceCoordinatorInterceptor:主要处理 try 阶段的事情,在 try 阶段,就将所有的“资源”封装完成并交给事务管理器。然后事务管理器修改数据库状态。
“资源”指“事务资源”,即事务的参与者:confirm上下文,cancel上下文,分支事务信息。
public class ResourceCoordinatorInterceptor { private TransactionManager transactionManager; public void setTransactionManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; }
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { Transaction transaction = transactionManager.getCurrentTransaction(); if (transaction != null) { switch (transaction.getStatus()) { case TRYING: enlistParticipant(pjp); break; case CONFIRMING: break; case CANCELLING: break; } } return pjp.proceed(pjp.getArgs()); }
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException { Method method = CompensableMethodUtils.getCompensableMethod(pjp); if (method == null) { throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName())); } Compensable compensable = method.getAnnotation(Compensable.class); String confirmMethodName = compensable.confirmMethod(); String cancelMethodName = compensable.cancelMethod(); Transaction transaction = transactionManager.getCurrentTransaction(); TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) { FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs()); } Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes()); InvocationContext confirmInvocation = new InvocationContext(targetClass, confirmMethodName, method.getParameterTypes(), pjp.getArgs()); InvocationContext cancelInvocation = new InvocationContext(targetClass, cancelMethodName, method.getParameterTypes(), pjp.getArgs()); Participant participant = new Participant( xid, confirmInvocation, cancelInvocation, compensable.transactionContextEditor()); transactionManager.enlistParticipant(participant); } }
|
此时经过两个拦截器后,才调用到目标对象方法,即对应try逻辑的被切方法。