搜索
您的当前位置:首页seata源码分析(AT)-事务提交和回滚

seata源码分析(AT)-事务提交和回滚

来源:乌哈旅游

一、事务会回滚

在业务方法中加了@GlobalTransactional过后,开启全局事务,其实就是一个aop,这个已经说过了,现在要说的是在这个aop GlobalTransactionalInterceptor中真正的处理是先开启事务,业务逻辑的真正处理,最后根据是否抛出异常来决定是提交还是回滚,所以这里先分析回滚

1、客户端的回滚发起点

io.seata.tm.api.TransactionalTemplate#rollbackTransaction

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
    triggerBeforeRollback();
    //远程调用TC进行分布式事务的回滚
    tx.rollback();
    triggerAfterRollback();
    // 3.1 Successfully rolled back
    throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}

2、服务端的处理

io.seata.server.coordinator.DefaultCoordinator#doGlobalRollback

/**
 * 分布式事务的回滚
 * @param request    the request
 * @param response   the response
 * @param rpcContext the rpc context
 * @throws TransactionException
 */
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                RpcContext rpcContext) throws TransactionException {
    response.setGlobalStatus(core.rollback(request.getXid()));
}

io.seata.server.coordinator.DefaultCore#rollback

public GlobalStatus rollback(String xid) throws TransactionException {
    //注册分支事务,先通过全局事xid找到全局事务和分支事务
    //1.先通过全局事务XID找到去global_table查询到全局事务对象
    //2.通过查询出来的xid对象去查询branch_talbe是否有分支事务,最后将这两个事务对象封装到GlobalSession中
    //这里传入的true代表全局事务和分支事务都查询出来,全局事务是一条数据,分支事务可能会多条是一个list
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    //如果没有查询到全局事务对象,则直接返回完成
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        //设置本事务的active状态为false
        globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
        //如果目前的全局事务状态为Begin开始状态,那么修改全局事务的状态为回滚中,修改数据表global_table的status的值为Rollbacking
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            //修改global_table的status的值为Rollbacking
            globalSession.changeStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }

    //真正的回滚逻辑
    doGlobalRollback(globalSession, false);
    //返回回滚的逻辑
    return globalSession.getStatus();
}

io.seata.server.coordinator.DefaultCore#doGlobalRollback

public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start rollback event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
    } else {
        //globalSession.getReverseSortedBranches()得到的是所有分支
        for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            //如果分支事务是在一阶段失败的,调用removeBranch
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                globalSession.removeBranch(branchSession);
                continue;
            }
            try {
                //branchRollback回滚分支事务,这里是server端,所以这里的分支事务的回滚是调用的远程进行回滚的
                //而远程回滚就是使用的undo log日志表来回滚的
                BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                switch (branchStatus) {
                    //PhaseTwo_Rollbacked表示远程回滚成功
                    case PhaseTwo_Rollbacked:
                        //删除分支事务信息,就是删除branch_table和释放锁
                        //1.释放锁,删除lock_table中的行锁信息;
                        //2.删除分支事务,branch_table
                        globalSession.removeBranch(branchSession);
                        LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        continue;
                    case PhaseTwo_RollbackFailed_Unretryable:
                        //远程回滚失败,修改全局事务状态为RollbackFailed
                        SessionHelper.endRollbackFailed(globalSession);
                        LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        return false;
                    default:
                        LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        if (!retrying) {
                            globalSession.queueToRetryRollback();
                        }
                        return false;
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex,
                    "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                    new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                if (!retrying) {
                    globalSession.queueToRetryRollback();
                }
                throw new TransactionException(ex);
            }
        }

        // In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
        // transaction registration when rolling back.
        // 1. New branch transaction and rollback branch transaction have no data association
        // 2. New branch transaction has data association with rollback branch transaction
        // The second query can solve the first problem, and if it is the second problem, it may cause a rollback
        // failure due to data changes.
        GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
        if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
            LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    if (success) {
        //如果分支事务回滚成功,删除全局事务
        //1.先修改全局事务global_table的状态为Rollbacked
        //2.删除全局事务global_table
        SessionHelper.endRollbacked(globalSession);

        // rollbacked event 发布事件
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
            globalSession.getStatus()));

        LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
    }
    return success;
}

简单来说:
1、获取所有的分支事务
2、远程回滚分支事务(远程回滚undo log日志);
3、删除本地分支事务branch_table和释放锁(lock_table);
4、删除全局事务。

3、客户端的回滚逻辑

客户端的netty处理器是在RMClient初始化的,RmClient是在spring自动装配注入的

com.alibaba.cloud.seata.GlobalTransactionAutoConfiguration#globalTransactionScanner
》io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet
》》io.seata.spring.annotation.GlobalTransactionScanner#initClient
》》》io.seata.rm.RMClient#init

public static void init(String applicationId, String transactionServiceGroup) {
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    //设置handler处理器,DefaultRMHandler.get()是通过spi机制获取到所有的RM处理器
    //spi:io.seata.rm.AbstractRMHandler
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    rmNettyRemotingClient.init();
}

io.seata.rm.AbstractRMHandler#doBranchRollback

/**
 * Do branch rollback.
 * 分支事务回滚
 * @param request  the request
 * @param response the response
 * @throws TransactionException the transaction exception
 */
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
    }
    //这里就是调用本地的逻辑,简单来说就是获取undo日志表中获取前置镜像进行回滚,然后删除undo log日志
    //里面的处理逻辑有点复杂,简单来说就是
    //1.取出undo log日志;
    //2.判断是否可以回滚,要进行对比,对比后置镜像是否和业务数据表的数据是一致的,不一致的不能回滚,证明被其他线程修改了数据;
    //3.回滚日志,根据不同操作删除、插入、修改业务数据表的数据;
    //4.最后要删除undo log日志。
    //如果回滚成功,返回状态PhaseTwo_Rollbacked
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacked result: " + status);
    }
}

io.seata.rm.datasource.DataSourceManager#branchRollback

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        //分支事务回滚,这里就很简单,就是从undo log日志表里面取出要回滚的记录,也就是前置镜像进行回滚,回滚完成要删除undo log日志
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;

}

二、事务的提交

事务的提交入口和回滚一样,也是在拦截器GlobalTransactionalInterceptor中的,和回滚的逻辑一样,只是远程调用TC进行提交事务的处理不一样
,但是事务的提交和回滚的逻辑差别太大了,因为提交事务采用了异步处理,首先TM端发起的事务提交,TC端收到过后只是修改了事务的状态为异步提交,然后就返回给TM端了,而TC端开启了一个异步调度任务,每1s循环一次,取出所有的异步提交的事务列表,然后循环开始提交,提交的逻辑也是发送给TM端(客户端),而客户端接受到过后又放入到一个异步队列中,在TM端又开了一个异步调度线程,一直从这个队列中取出TC端发过来的提交事务对象,然后开始进行异步提交,异步提交也是批量提交,每次删除1000条分支事务,逻辑也很简单,就是删除客户端的undo log日志了。

1、客户端处理

io.seata.tm.api.DefaultGlobalTransaction#commit、

public void commit() throws TransactionException {
    //参与者无法进行提交,只有TM端才能进行提交
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNotNull();
    //调用transactionManager.commit(xid);进行提交,提交失败可以重试
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] commit status: {}", xid, status);
    }
}

2、服务端的处理

2.1、服务端的异步处理点1(修改事务状态为异步提交)

io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit

/**
 * 全局事务的提交
 * @param request    the request
 * @param response   the response
 * @param rpcContext the rpc context
 * @throws TransactionException
 */
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setGlobalStatus(core.commit(request.getXid()));
}

io.seata.server.coordinator.DefaultCore#commit

public GlobalStatus commit(String xid) throws TransactionException {
    /**
     * 根据xid找到全局事务对象和所有的分支事务list
     * 全局事务从global_table中查询
     * 分支事务从branch_list查询
     */
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus

    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // Highlight: Firstly, close the session, then no more branch can be registered.
        //设置全局事务状态active为false
        //释放所有的分支事务锁,简单来说就是删除lock_table的行锁信息
        globalSession.closeAndClean();
        //如果当前的事务状态为开始,则开始提交事务,这里的提交事务其实不是真正的提交事务,提交事务是需要删除分支事务、全局事务和远程删除
        //客户端业务的undo log日志记录的,但是这里的提交事务只是把全局事务global_table中的事务状态修改了2次
        //第一次修改为begin(1),第二次修改为AsyncCommitting(8) 异步提交状态(?为什么这里要修改2次)
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            //如果支持异步提交事务的话(AT模式默认是异步提交),那么异步提交这里只是将全局事务的状态修改为异步提交状态8
            //后面有一个线程在真正的异步提交,这个线程是DefaultCoordinator中的一个异步调度线程ScheduledThreadPoolExecutor处理的,是在init方法中初始化的
            if (globalSession.canBeCommittedAsync()) {
                //修改两次全局事务的状态
                globalSession.asyncCommit();
                return false;
            } else {
                //如果不支持异步提交,这里修改为提交状态Committing(2)
                globalSession.changeStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    if (shouldCommit) {
        //如果不是异步提交的事务,向远程提交一个事务提交的请求,远程接受到过后放入到队列异步提交
        boolean success = doGlobalCommit(globalSession, false);
        if (success && !globalSession.getBranchSessions().isEmpty()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}

io.seata.server.session.GlobalSession#asyncCommit

public void asyncCommit() throws TransactionException {
    this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
    //修改全局事务状态为begin(1)
    SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
    //修改全局事务状态为AsyncCommitting(8)
    this.changeStatus(GlobalStatus.AsyncCommitting);
}

2.2、服务端的异步处理点2(开启调度任务获取异步提交事务)

init方法是在Server的mian方法中调用的

public void init() {
    //开启一些调度任务,这些调度任务循环执行的,处理一些异步任务
    retryRollbacking.scheduleAtFixedRate(() -> {
        try {
            //处理回滚的记录,可以异步进行处理
            handleRetryRollbacking();
        } catch (Exception e) {
            LOGGER.info("Exception retry rollbacking ... ", e);
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    retryCommitting.scheduleAtFixedRate(() -> {
        try {
            //处理重试提交的
            handleRetryCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception retry committing ... ", e);
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    asyncCommitting.scheduleAtFixedRate(() -> {
        try {
            //处理异步提交的 1s执行一次
            handleAsyncCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception async committing ... ", e);
        }
    }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    timeoutCheck.scheduleAtFixedRate(() -> {
        try {
            timeoutCheck();
        } catch (Exception e) {
            LOGGER.info("Exception timeout checking ... ", e);
        }
    }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    undoLogDelete.scheduleAtFixedRate(() -> {
        try {
            undoLogDelete();
        } catch (Exception e) {
            LOGGER.info("Exception undoLog deleting ... ", e);
        }
    }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

handleAsyncCommitting

/**
 * Handle async committing.
 * 异步提交任务,可能这里为了性能考虑,为了减少一些rcp的同步调用
 * 所以这里采用的异步处理
 */
protected void handleAsyncCommitting() {
    //获取所有的全局事务信息和分支事务信息(一个全局事务可能绑定多个分支事务)
    Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager()
        .allSessions();
    if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
        return;
    }
    for (GlobalSession asyncCommittingSession : asyncCommittingSessions) {
        try {
            // Instruction reordering in DefaultCore#asyncCommit may cause this situation
            //如果全局事务的状态不是AsyncCommitting不处理
            if (GlobalStatus.AsyncCommitting != asyncCommittingSession.getStatus()) {
                continue;
            }
            asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            //处理事务的异步提交
            core.doGlobalCommit(asyncCommittingSession, true);
        } catch (TransactionException ex) {
            LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
        }
    }
}

io.seata.server.coordinator.DefaultCore#doGlobalCommit

public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        //循环所有的分支事务的session
        for (BranchSession branchSession : globalSession.getSortedBranches()) {
            // if not retrying, skip the canBeCommittedAsync branches
            //判断是否能够异步提交
            if (!retrying && branchSession.canBeCommittedAsync()) {
                continue;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                globalSession.removeBranch(branchSession);
                continue;
            }
            try {
                //调用远程进行事务的提交,其实这里的远程就是调用的客户端的netty服务,告诉客户端可以删除业务表undo log日志,完成了事务的提交
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        //删除成移除本地分支事务branch_table
                        globalSession.removeBranch(branchSession);
                        continue;
                    case PhaseTwo_CommitFailed_Unretryable:
                        //远程删除失败则报错,如果不是异步提交,修改状态CommitFailed
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error(
                                "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                            continue;
                        } else {
                           // 修改分支事务状态CommitFailed
                            SessionHelper.endCommitFailed(globalSession);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                    default:
                        if (!retrying) {
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                branchSession.getBranchId(), branchStatus);
                            continue;
                        } else {
                            LOGGER.error(
                                "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {branchSession.toString()});
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
        }
        if (globalSession.hasBranch()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    if (success && globalSession.getBranchSessions().isEmpty()) {
        //删除全局事务global_table
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
            globalSession.getStatus()));

        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}

3、客户端的处理

服务的getCore(branchSession.getBranchType()).branchCommit就调用到客户了,所以这里就要取看客户端的AbstractRMHandler处理了

3.1、客户端的同步处理

io.seata.rm.AbstractRMHandler#doBranchCommit

/**
 * Do branch commit.
 *接受TC发过来的异步提交请求
 * @param request  the request
 * @param response the response
 * @throws TransactionException the transaction exception
 */
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    /**
     * 这里其实调用了AsyncWorker的一个队列,将异步提交的任务放入到了一个队列中
     * 然后AsyncWorker有一个线程一直监听到这个队列(ASYNC_COMMIT_BUFFER),所以这里放入队列,AsyncWorker中的线程一直取这个队列中的数据
     * 取到就开始处理提交,就是删除undo log日志的内容
     */
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }

}

客户端doBranchCommit其实就是将TC端发过来的分支事务提交(其实就是一个简单的删除undo log日志,因为事务完成了要删除这个log,所以seata这里采用的是异步来处理的,免得影响性能)
getResourceManager().branchCommit的调用线路为:

io.seata.rm.DefaultResourceManager#branchCommit
》io.seata.rm.datasource.DataSourceManager#branchCommit
》》io.seata.rm.datasource.AsyncWorker#branchCommit

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    //将异步提交的对象放入到队列ASYNC_COMMIT_BUFFER中,init中的有一个调度线程一直在对ASYNC_COMMIT_BUFFER取提交的数据,当取到数据过后
    //进行任务的执行(undo log日志的删除)
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
    }
    return BranchStatus.PhaseTwo_Committed;
}

3.2、客户端的异步处理

客户端的异步处理和服务端的处理模式很像,就是用了一个异步调度线程去处理,上面的同步已经看了,TC端发过来的事务提交数据放入到了队列ASYNC_COMMIT_BUFFER中,所以客户端的异步线程的调度任务也是1s执行一次,执行的时候就从ASYNC_COMMIT_BUFFER中获取事务对象,当获取到了就开始处理,当然处理就比较简单了,处理的就是删除undo log日志。
io.seata.rm.datasource.AsyncWorker#init

/**
 * Init.
 */
public synchronized void init() {
    LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
    ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
    //启动一个schedule调度任务处理分支事务的提交
    timerExecutor.scheduleAtFixedRate(() -> {
        try {

            doBranchCommits();

        } catch (Throwable e) {
            LOGGER.info("Failed at async committing ... {}", e.getMessage());

        }
    }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}

io.seata.rm.datasource.AsyncWorker#doBranchCommits

private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.isEmpty()) {
        return;
    }

    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
    List<Phase2Context> contextsGroupedByResourceId;
    while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
        //从队列中取出要提交的分支事务包装对象
        Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
        //从分支事务提交的包装对象中取出所有的resourceIds
        contextsGroupedByResourceId = CollectionUtils.computeIfAbsent(mappedContexts, commitContext.resourceId, key -> new ArrayList<>());
        contextsGroupedByResourceId.add(commitContext);
    }

    for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
        Connection conn = null;
        DataSourceProxy dataSourceProxy;
        try {
            try {
                DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
                    .getResourceManager(BranchType.AT);
                dataSourceProxy = resourceManager.get(entry.getKey());
                if (dataSourceProxy == null) {
                    throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                }
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                continue;
            }
            contextsGroupedByResourceId = entry.getValue();
            Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            //循环删除undo log日志内容
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                xids.add(commitContext.xid);
                branchIds.add(commitContext.branchId);
                int maxSize = Math.max(xids.size(), branchIds.size());
                /**
                 * 这里是批量删除,批量删除默认的大小是UNDOLOG_DELETE_LIMIT_SIZE=1000
                 * 每次等branchIds和xids满了1000过后进行删除,否则都只是将xid和bid放入集合中
                 * 每当集合满了1000就批量删除
                 */
                if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                    try {
                        //删除undo 日志的内容,通过spi(io.seata.rm.datasource.undo.UndoLogManager)
                        //找到当前数据库是那种数据,比如mysql,然后调用batchDeleteUndoLog批量删除
                        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                            xids, branchIds, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                    }
                    xids.clear();
                    branchIds.clear();
                }
            }

            //如果集合为空,表示在循环中已经全部删除了
            if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                return;
            }

            try {
                //这里删除xids和branchids中剩余没有删除的undo log日志
                UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
                    branchIds, conn);
            } catch (Exception ex) {
                LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
            }

            if (!conn.getAutoCommit()) {
                conn.commit();
            }
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
            try {
                if (conn != null) {
                    conn.rollback();
                }
            } catch (SQLException rollbackEx) {
                LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
            }
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }
}

三、seata工作原理

四、seata源码结构图

因篇幅问题不能全部显示,请点此查看更多更全内容

Top