在业务方法中加了@GlobalTransactional过后,开启全局事务,其实就是一个aop,这个已经说过了,现在要说的是在这个aop GlobalTransactionalInterceptor中真正的处理是先开启事务,业务逻辑的真正处理,最后根据是否抛出异常来决定是提交还是回滚,所以这里先分析回滚
io.seata.tm.api.TransactionalTemplate#rollbackTransaction
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
//远程调用TC进行分布式事务的回滚
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
io.seata.server.coordinator.DefaultCoordinator#doGlobalRollback
/**
* 分布式事务的回滚
* @param request the request
* @param response the response
* @param rpcContext the rpc context
* @throws TransactionException
*/
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
RpcContext rpcContext) throws TransactionException {
response.setGlobalStatus(core.rollback(request.getXid()));
}
io.seata.server.coordinator.DefaultCore#rollback
public GlobalStatus rollback(String xid) throws TransactionException {
//注册分支事务,先通过全局事xid找到全局事务和分支事务
//1.先通过全局事务XID找到去global_table查询到全局事务对象
//2.通过查询出来的xid对象去查询branch_talbe是否有分支事务,最后将这两个事务对象封装到GlobalSession中
//这里传入的true代表全局事务和分支事务都查询出来,全局事务是一条数据,分支事务可能会多条是一个list
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
//如果没有查询到全局事务对象,则直接返回完成
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
//设置本事务的active状态为false
globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
//如果目前的全局事务状态为Begin开始状态,那么修改全局事务的状态为回滚中,修改数据表global_table的status的值为Rollbacking
if (globalSession.getStatus() == GlobalStatus.Begin) {
//修改global_table的status的值为Rollbacking
globalSession.changeStatus(GlobalStatus.Rollbacking);
return true;
}
return false;
});
if (!shouldRollBack) {
return globalSession.getStatus();
}
//真正的回滚逻辑
doGlobalRollback(globalSession, false);
//返回回滚的逻辑
return globalSession.getStatus();
}
io.seata.server.coordinator.DefaultCore#doGlobalRollback
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start rollback event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
} else {
//globalSession.getReverseSortedBranches()得到的是所有分支
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
//如果分支事务是在一阶段失败的,调用removeBranch
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
continue;
}
try {
//branchRollback回滚分支事务,这里是server端,所以这里的分支事务的回滚是调用的远程进行回滚的
//而远程回滚就是使用的undo log日志表来回滚的
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
switch (branchStatus) {
//PhaseTwo_Rollbacked表示远程回滚成功
case PhaseTwo_Rollbacked:
//删除分支事务信息,就是删除branch_table和释放锁
//1.释放锁,删除lock_table中的行锁信息;
//2.删除分支事务,branch_table
globalSession.removeBranch(branchSession);
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
continue;
case PhaseTwo_RollbackFailed_Unretryable:
//远程回滚失败,修改全局事务状态为RollbackFailed
SessionHelper.endRollbackFailed(globalSession);
LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
if (!retrying) {
globalSession.queueToRetryRollback();
}
return false;
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex,
"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
if (!retrying) {
globalSession.queueToRetryRollback();
}
throw new TransactionException(ex);
}
}
// In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
// transaction registration when rolling back.
// 1. New branch transaction and rollback branch transaction have no data association
// 2. New branch transaction has data association with rollback branch transaction
// The second query can solve the first problem, and if it is the second problem, it may cause a rollback
// failure due to data changes.
GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
if (success) {
//如果分支事务回滚成功,删除全局事务
//1.先修改全局事务global_table的状态为Rollbacked
//2.删除全局事务global_table
SessionHelper.endRollbacked(globalSession);
// rollbacked event 发布事件
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
globalSession.getStatus()));
LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
}
return success;
}
简单来说:
1、获取所有的分支事务
2、远程回滚分支事务(远程回滚undo log日志);
3、删除本地分支事务branch_table和释放锁(lock_table);
4、删除全局事务。
客户端的netty处理器是在RMClient初始化的,RmClient是在spring自动装配注入的
com.alibaba.cloud.seata.GlobalTransactionAutoConfiguration#globalTransactionScanner
》io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet
》》io.seata.spring.annotation.GlobalTransactionScanner#initClient
》》》io.seata.rm.RMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
//设置handler处理器,DefaultRMHandler.get()是通过spi机制获取到所有的RM处理器
//spi:io.seata.rm.AbstractRMHandler
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
io.seata.rm.AbstractRMHandler#doBranchRollback
/**
* Do branch rollback.
* 分支事务回滚
* @param request the request
* @param response the response
* @throws TransactionException the transaction exception
*/
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
}
//这里就是调用本地的逻辑,简单来说就是获取undo日志表中获取前置镜像进行回滚,然后删除undo log日志
//里面的处理逻辑有点复杂,简单来说就是
//1.取出undo log日志;
//2.判断是否可以回滚,要进行对比,对比后置镜像是否和业务数据表的数据是一致的,不一致的不能回滚,证明被其他线程修改了数据;
//3.回滚日志,根据不同操作删除、插入、修改业务数据表的数据;
//4.最后要删除undo log日志。
//如果回滚成功,返回状态PhaseTwo_Rollbacked
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacked result: " + status);
}
}
io.seata.rm.datasource.DataSourceManager#branchRollback
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
//分支事务回滚,这里就很简单,就是从undo log日志表里面取出要回滚的记录,也就是前置镜像进行回滚,回滚完成要删除undo log日志
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
StackTraceLogger.info(LOGGER, te,
"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
事务的提交入口和回滚一样,也是在拦截器GlobalTransactionalInterceptor中的,和回滚的逻辑一样,只是远程调用TC进行提交事务的处理不一样
,但是事务的提交和回滚的逻辑差别太大了,因为提交事务采用了异步处理,首先TM端发起的事务提交,TC端收到过后只是修改了事务的状态为异步提交,然后就返回给TM端了,而TC端开启了一个异步调度任务,每1s循环一次,取出所有的异步提交的事务列表,然后循环开始提交,提交的逻辑也是发送给TM端(客户端),而客户端接受到过后又放入到一个异步队列中,在TM端又开了一个异步调度线程,一直从这个队列中取出TC端发过来的提交事务对象,然后开始进行异步提交,异步提交也是批量提交,每次删除1000条分支事务,逻辑也很简单,就是删除客户端的undo log日志了。
io.seata.tm.api.DefaultGlobalTransaction#commit、
public void commit() throws TransactionException {
//参与者无法进行提交,只有TM端才能进行提交
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
//调用transactionManager.commit(xid);进行提交,提交失败可以重试
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit
/**
* 全局事务的提交
* @param request the request
* @param response the response
* @param rpcContext the rpc context
* @throws TransactionException
*/
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
response.setGlobalStatus(core.commit(request.getXid()));
}
io.seata.server.coordinator.DefaultCore#commit
public GlobalStatus commit(String xid) throws TransactionException {
/**
* 根据xid找到全局事务对象和所有的分支事务list
* 全局事务从global_table中查询
* 分支事务从branch_list查询
*/
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
//设置全局事务状态active为false
//释放所有的分支事务锁,简单来说就是删除lock_table的行锁信息
globalSession.closeAndClean();
//如果当前的事务状态为开始,则开始提交事务,这里的提交事务其实不是真正的提交事务,提交事务是需要删除分支事务、全局事务和远程删除
//客户端业务的undo log日志记录的,但是这里的提交事务只是把全局事务global_table中的事务状态修改了2次
//第一次修改为begin(1),第二次修改为AsyncCommitting(8) 异步提交状态(?为什么这里要修改2次)
if (globalSession.getStatus() == GlobalStatus.Begin) {
//如果支持异步提交事务的话(AT模式默认是异步提交),那么异步提交这里只是将全局事务的状态修改为异步提交状态8
//后面有一个线程在真正的异步提交,这个线程是DefaultCoordinator中的一个异步调度线程ScheduledThreadPoolExecutor处理的,是在init方法中初始化的
if (globalSession.canBeCommittedAsync()) {
//修改两次全局事务的状态
globalSession.asyncCommit();
return false;
} else {
//如果不支持异步提交,这里修改为提交状态Committing(2)
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
//如果不是异步提交的事务,向远程提交一个事务提交的请求,远程接受到过后放入到队列异步提交
boolean success = doGlobalCommit(globalSession, false);
if (success && !globalSession.getBranchSessions().isEmpty()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
io.seata.server.session.GlobalSession#asyncCommit
public void asyncCommit() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
//修改全局事务状态为begin(1)
SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
//修改全局事务状态为AsyncCommitting(8)
this.changeStatus(GlobalStatus.AsyncCommitting);
}
init方法是在Server的mian方法中调用的
public void init() {
//开启一些调度任务,这些调度任务循环执行的,处理一些异步任务
retryRollbacking.scheduleAtFixedRate(() -> {
try {
//处理回滚的记录,可以异步进行处理
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
retryCommitting.scheduleAtFixedRate(() -> {
try {
//处理重试提交的
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
asyncCommitting.scheduleAtFixedRate(() -> {
try {
//处理异步提交的 1s执行一次
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
timeoutCheck.scheduleAtFixedRate(() -> {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
undoLogDelete.scheduleAtFixedRate(() -> {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
handleAsyncCommitting
/**
* Handle async committing.
* 异步提交任务,可能这里为了性能考虑,为了减少一些rcp的同步调用
* 所以这里采用的异步处理
*/
protected void handleAsyncCommitting() {
//获取所有的全局事务信息和分支事务信息(一个全局事务可能绑定多个分支事务)
Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager()
.allSessions();
if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
return;
}
for (GlobalSession asyncCommittingSession : asyncCommittingSessions) {
try {
// Instruction reordering in DefaultCore#asyncCommit may cause this situation
//如果全局事务的状态不是AsyncCommitting不处理
if (GlobalStatus.AsyncCommitting != asyncCommittingSession.getStatus()) {
continue;
}
asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//处理事务的异步提交
core.doGlobalCommit(asyncCommittingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
}
}
}
io.seata.server.coordinator.DefaultCore#doGlobalCommit
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
//循环所有的分支事务的session
for (BranchSession branchSession : globalSession.getSortedBranches()) {
// if not retrying, skip the canBeCommittedAsync branches
//判断是否能够异步提交
if (!retrying && branchSession.canBeCommittedAsync()) {
continue;
}
BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
continue;
}
try {
//调用远程进行事务的提交,其实这里的远程就是调用的客户端的netty服务,告诉客户端可以删除业务表undo log日志,完成了事务的提交
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Committed:
//删除成移除本地分支事务branch_table
globalSession.removeBranch(branchSession);
continue;
case PhaseTwo_CommitFailed_Unretryable:
//远程删除失败则报错,如果不是异步提交,修改状态CommitFailed
if (globalSession.canBeCommittedAsync()) {
LOGGER.error(
"Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
continue;
} else {
// 修改分支事务状态CommitFailed
SessionHelper.endCommitFailed(globalSession);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
branchSession.getBranchId(), branchStatus);
continue;
} else {
LOGGER.error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
new String[] {branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
}
if (globalSession.hasBranch()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
if (success && globalSession.getBranchSessions().isEmpty()) {
//删除全局事务global_table
SessionHelper.endCommitted(globalSession);
// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
globalSession.getStatus()));
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
服务的getCore(branchSession.getBranchType()).branchCommit就调用到客户了,所以这里就要取看客户端的AbstractRMHandler处理了
io.seata.rm.AbstractRMHandler#doBranchCommit
/**
* Do branch commit.
*接受TC发过来的异步提交请求
* @param request the request
* @param response the response
* @throws TransactionException the transaction exception
*/
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
}
/**
* 这里其实调用了AsyncWorker的一个队列,将异步提交的任务放入到了一个队列中
* 然后AsyncWorker有一个线程一直监听到这个队列(ASYNC_COMMIT_BUFFER),所以这里放入队列,AsyncWorker中的线程一直取这个队列中的数据
* 取到就开始处理提交,就是删除undo log日志的内容
*/
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch commit result: " + status);
}
}
客户端doBranchCommit其实就是将TC端发过来的分支事务提交(其实就是一个简单的删除undo log日志,因为事务完成了要删除这个log,所以seata这里采用的是异步来处理的,免得影响性能)
getResourceManager().branchCommit的调用线路为:
io.seata.rm.DefaultResourceManager#branchCommit
》io.seata.rm.datasource.DataSourceManager#branchCommit
》》io.seata.rm.datasource.AsyncWorker#branchCommit
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
//将异步提交的对象放入到队列ASYNC_COMMIT_BUFFER中,init中的有一个调度线程一直在对ASYNC_COMMIT_BUFFER取提交的数据,当取到数据过后
//进行任务的执行(undo log日志的删除)
if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
}
return BranchStatus.PhaseTwo_Committed;
}
客户端的异步处理和服务端的处理模式很像,就是用了一个异步调度线程去处理,上面的同步已经看了,TC端发过来的事务提交数据放入到了队列ASYNC_COMMIT_BUFFER中,所以客户端的异步线程的调度任务也是1s执行一次,执行的时候就从ASYNC_COMMIT_BUFFER中获取事务对象,当获取到了就开始处理,当然处理就比较简单了,处理的就是删除undo log日志。
io.seata.rm.datasource.AsyncWorker#init
/**
* Init.
*/
public synchronized void init() {
LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
//启动一个schedule调度任务处理分支事务的提交
timerExecutor.scheduleAtFixedRate(() -> {
try {
doBranchCommits();
} catch (Throwable e) {
LOGGER.info("Failed at async committing ... {}", e.getMessage());
}
}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
io.seata.rm.datasource.AsyncWorker#doBranchCommits
private void doBranchCommits() {
if (ASYNC_COMMIT_BUFFER.isEmpty()) {
return;
}
Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
List<Phase2Context> contextsGroupedByResourceId;
while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
//从队列中取出要提交的分支事务包装对象
Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
//从分支事务提交的包装对象中取出所有的resourceIds
contextsGroupedByResourceId = CollectionUtils.computeIfAbsent(mappedContexts, commitContext.resourceId, key -> new ArrayList<>());
contextsGroupedByResourceId.add(commitContext);
}
for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
Connection conn = null;
DataSourceProxy dataSourceProxy;
try {
try {
DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
.getResourceManager(BranchType.AT);
dataSourceProxy = resourceManager.get(entry.getKey());
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
}
conn = dataSourceProxy.getPlainConnection();
} catch (SQLException sqle) {
LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
continue;
}
contextsGroupedByResourceId = entry.getValue();
Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
//循环删除undo log日志内容
for (Phase2Context commitContext : contextsGroupedByResourceId) {
xids.add(commitContext.xid);
branchIds.add(commitContext.branchId);
int maxSize = Math.max(xids.size(), branchIds.size());
/**
* 这里是批量删除,批量删除默认的大小是UNDOLOG_DELETE_LIMIT_SIZE=1000
* 每次等branchIds和xids满了1000过后进行删除,否则都只是将xid和bid放入集合中
* 每当集合满了1000就批量删除
*/
if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
try {
//删除undo 日志的内容,通过spi(io.seata.rm.datasource.undo.UndoLogManager)
//找到当前数据库是那种数据,比如mysql,然后调用batchDeleteUndoLog批量删除
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
xids, branchIds, conn);
} catch (Exception ex) {
LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
}
xids.clear();
branchIds.clear();
}
}
//如果集合为空,表示在循环中已经全部删除了
if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
return;
}
try {
//这里删除xids和branchids中剩余没有删除的undo log日志
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
branchIds, conn);
} catch (Exception ex) {
LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
}
if (!conn.getAutoCommit()) {
conn.commit();
}
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
}
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
}
}
}
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容