分佈式事務(Seata)原理 詳解篇,建議收藏
前言
在之前的系列中,我們講解了關於Seata基本介紹和實際應用,今天帶來的這篇,就給大家分析一下Seata的源碼是如何一步一步實現的。讀源碼的時候我們需要俯瞰起全貌,不要去扣一個一個的細節,這樣我們學習起來會快捷而且有效率,我們學習源碼需要掌握的是整體思路和核心點。
首先 Seata
客户端啟動一般分為以下幾個流程:
- 自動加載Bean屬性和配置信息
- 初始化TM
- 初始化RM
- 初始化分佈式事務客户端完成,完成代理數據庫配置
- 連接TC(Seata服務端),註冊RM和TM
- 開啟全局事務
在這篇源碼的講解中,我們主要以AT模式為主導,官網也是主推AT模式,我們在上篇的文章中也講解過,感興趣的小夥伴可以去看一看分佈式事務(Seata) 四大模式詳解,在官網中也提供了對應的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在這裏我們只是做一些簡單的介紹,AT模式主要分為兩個階段:
一階段:
- 解析SQL,獲取SQL類型(CRUD)、表信息、條件(where) 等相關信息
- 查詢前鏡像(改變之前的數據),根據解析得到的條件信息,生成查詢語句,定位數據
- 執行業務SQL,更新數據
- 查詢後鏡像(改變後的數據),根據前鏡像的結果,通過主鍵都給你為數據
- 插入回滾日誌,將前後鏡像數據以及業務SQL等信息,組織成一條回滾日誌記錄,插入到undo Log表中
- 提交前,向TC註冊分支,申請全局鎖
- 本地事務提交,業務數據的更細膩和生成的undoLog一起提交
- 將本地事務提交的結果通知給TC
二階段:
如果TC收到的是回滾請求
- 開啟本地事務,通過XID和BranchID查找到對應的undo Log記錄
- 根據undoLog中的前鏡像和業務SQL的相關信息生成並執行回滾語句
- 提交本地事務,將本地事務的執行結果(分支事務回滾的信息)通知給TC
如果沒問題,執行提交操作
- 收到TC分支提交請求,將請求放入到一個異步任務的隊列中,馬上返回提交成功的結果給TC
- 異步任務階段的分支提交請求刪除undoLog中記錄
源碼入口
接下來,我們就需要從官網中去下載源碼,下載地址:https://seata.io/zh-cn/blog/download.html,選擇 source
即可,下載完成之後,通過IDEA打開項目。
源碼下載下來之後,我們應該如何去找入口呢?首先我們需要找到對應引入的 Seata
包 spring-alibaba-seata
,我們在回想一下,我們開啟事務的時候,是不是添加過一個@GlobalTransactional
的註解,這個註解就是我們入手的一個點,我們在 spring.factories
中看到有一個 GlobalTransactionAutoConfiguration
,這個就是我們需要關注的點,也就是我們源碼的入口
在 GlobalTransactionAutoConfiguration
中我們找到一個用Bean注入的方法 globalTransactionScanner
,這個就是全局事務掃描器,這個類型主要負責加載配置,注入相關的Bean
這裏給大家展示了當前GlobalTransactionScanner的類關係圖,其中我們現在繼承了Aop的AbstractAutoProxyCreator類型,在這其中有一個重點方法,這個方法就是判斷Bean對象是否需要代理,是否需要增強。
```java @Configuration @EnableConfigurationProperties(SeataProperties.class) public class GlobalTransactionAutoConfiguration {
//全局事務掃描器
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = applicationContext.getEnvironment()
.getProperty("spring.application.name");
String txServiceGroup = seataProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
seataProperties.setTxServiceGroup(txServiceGroup);
}
// 構建全局掃描器,傳入參數:應用名、事務分組名,失敗處理器
return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
} ```
在這其中我們要關心的是 GlobalTransactionScanner
這個類型,這個類型掃描 @GlobalTransactional
註解,並對代理方法進行攔截增強事務的功能。我們就從源碼中搜索這個GlobalTransactionScanner
類,看看裏面具體是做了什麼
```java / * The type Global transaction scanner. * 全局事務掃描器 * @author slievrly */ public class GlobalTransactionScanner //AbstractAutoProxyCreator AOP動態代理 增強Bean extends AbstractAutoProxyCreator / * ConfigurationChangeListener: 監聽器基準接口 * InitializingBean: Bean初始化 * ApplicationContextAware: Spring容器 * DisposableBean: Spring 容器銷燬 */ implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
private final String applicationId;//服務名
private final String txServiceGroup;//事務分組
private void initClient() { //啟動日誌 if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } //檢查應用名以及事務分組名,為空拋出異常IllegalArgumentException if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don't recommend you to use default tx-service-group's value provided by seata", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); } if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup)); } //init TM //初始化TM TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //init RM //初始化RM RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); }
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
}
if (initialized.compareAndSet(false, true)) {
initClient();
}
}
private void initClient() { //啟動日誌 if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } //檢查應用名以及事務分組名,為空拋出異常IllegalArgumentException if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don't recommend you to use default tx-service-group's value provided by seata", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); }
//檢查應用名以及事務分組名,為空拋出異常IllegalArgumentException
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
//初始化TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
//代理增強,Spring 所有的Bean都會經過這個方法
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
//檢查bean和beanName
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
//加鎖防止併發
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
//檢查是否為TCC模式
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
//如果啟用useTccFence 失敗 ,則初始化TCC清理任務
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
//如果是,添加TCC攔截器
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
//不是TCC
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判斷是否有相關事務註解,如果沒有不進行代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
//發現存在全局事務註解標註的Bean對象,添加攔截器
if (globalTransactionalInterceptor == null) {
//添加攔截器
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
//檢查是否為代理對象
if (!AopUtils.isAopProxy(bean)) {
//不是代理對象,調用父級
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
//是代理對象,反射獲取代理類中已經存在的攔截器組合,然後添加到這個集合中
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
} ```
InitializingBean
:中實現了一個 afterPropertiesSet()
方法,在這個方法中,調用了initClient()
AbstractAutoProxyCreator
:APO動態代理,在之前的的Nacos和Sentiel中都有這個代理類,AOP在我們越往深入學習,在學習源碼的會見到的越來越多,越來越重要,很多相關代理,都是通過AOP進行增強,在這個類中,我們需要關注有一個wrapIfNecessary()
方法, 這個方法主要是判斷被代理的bean或者類是否需要代理增強,在這個方法中會調用GlobalTransactionalInterceptor.invoke()
進行帶來增強。
具體代碼如下:
```java public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
DEFAULT_DISABLE_GLOBAL_TRANSACTION);
this.order =
ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
DEFAULT_TM_DEGRADE_CHECK);
if (degradeCheck) {
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
degradeCheckPeriod = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
degradeCheckAllowTimes = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
EVENT_BUS.register(this);
if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
startDegradeCheck();
}
}
this.initDefaultGlobalTransactionTimeout();
}
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//獲取執行的方法
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//獲取GlobalTransactional(全局事務)、GlobalLock(全局鎖)元數據
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
//GlobalLock會將本地事務的執行納入Seata分佈式事務的管理,共同競爭全局鎖
//保證全局事務在執行的時候,本地事務不可以操作全局事務的記錄
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//獲取全局鎖
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
} else {
transactional = this.aspectTransactional;
}
//執行全局事務
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
//執行全局鎖
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
} ```
具體流程圖如下所示:
核心源碼
在上面我們講解到 GlobalTransactionalInterceptor
作為全局事務攔截器,一旦執行攔截,就會進入invoke方法,其中,我們會做 @GlobalTransactional
註解的判斷,如果有這個註解的存在,會執行全局事務和全局鎖,再執行全局事務的時候會調用 handleGlobalTransaction
全局事務處理器,獲取事務信息,那我們接下來就來看一下 GlobalTransactionalInterceptor.handleGlobalTransaction
到底是如何執行全局事務的
```java Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true; try { return transactionalTemplate.execute(new TransactionalExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); }
//獲取事務名稱,默認獲取方法名
public String name() {
String name = aspectTransactional.getName();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
/**
* 解析GlobalTransation註解屬性,封裝對對象
* @return
*/
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
//獲取超時時間,默認60秒
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
//構建事務信息對象
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);//超時時間
transactionInfo.setName(name());//事務名稱
transactionInfo.setPropagation(aspectTransactional.getPropagation());//事務傳播
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//校驗或佔用全局鎖重試間隔
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//校驗或佔用全局鎖重試次數
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
//其他構建信息
for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getRollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
//執行異常
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
```
在這裏我們,主要關注一個重點方法 execute()
,這個方法主要用來執行事務的具體流程:
- 獲取事務信息
- 執行全局事務
- 發生異常全局回滾,各個數據通過UndoLog進行事務補償
- 全局事務提交
- 清除所有資源
這個位置也是一個非常核心的一個位置,因為我們所有的業務進來以後都會去走這個位置,具體源碼如下所示:
```java public Object execute(TransactionalExecutor business) throws Throwable { // 1. Get transactionInfo //獲取事務信息 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. //獲取當前事務,主要獲取XID GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
//根據配置的不同事務傳播行為,執行不同的邏輯
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
//如果當前事務為空,創建一個新的事務
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
//開始執行全局事務
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
// 執行當前業務邏輯
//1、在TC註冊當前分支事務,TC會在branch_table中插入一條分支事務數據
//2、執行本地update語句,並在執行前後查詢數據狀態,並把數據前後鏡像存入到undo_log中
//3、遠程調用其他應用,遠程應用接收到XID,也會註冊分支事務,寫入branch_table以及本地undo_log表
//4、會在lock_table表中插入全局鎖數據(一個分支一條)
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
//發生異常全局回滾,每個事務通過undo_log表進行事務補償
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
//全局提交
commitTransaction(tx);
return rs;
} finally {
//5. clear
//清理所有資源
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
```
這其中的第三步和第四步其實在向 TC(Seata-Server)發起全局事務的提交或者回滾,在這裏我們首先關注執行全局事務的 beginTransaction()
方法
```java // 向TC發起請求,採用模板模式 private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeBegin(); //對TC發起請求 tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
}
}
``
在來關注其中,向TC發起請求的
tx.begin()方法,而調用
begin()方法的類為:
DefaultGlobalTransaction`
java
@Override
public void begin(int timeout, String name) throws TransactionException {
//判斷調用者是否為TM
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
//獲取XID
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
//綁定XID
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
再來看一下 transactionManager.begin()
方法,這個時候使用的是 DefaultTransactionManager.begin
默認的事務管理者,來獲取XID,傳入事務相關的信息 ,最好TC返回對應的全局事務XID,它調用的是DefaultTransactionManager.begin()
方法
java
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//發送請求得到響應
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
//返回XID
return response.getXid();
}
在這裏我們需要關注一個syncCall
,在這裏採用的是Netty通訊方式
java
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
// 通過Netty發送請求
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
具體圖解如下:
在這裏我們需要重點了解 GlobalTransactionScanner
這個類型,在這個類型中繼承了一些接口和抽象類,這個類主要作用就是掃描有註解的Bean,並做AOP增強。
-
ApplicationContextAware
:繼承這個類型以後,需要實現其方法setApplicationContext()
,當Spring啟動完成以後,會自動調用這個類型,將ApplicationContext
給bean
,也就是説,GlobalTransactionScanner
能夠很自然的使用Spring環境 -
InitializingBean
: 繼承這個接口,需要實現afterPropertiesSet()
,但凡是繼承這個接口的類,在初始化的時候,當所有的properties
設置完成以後,會執行這個方法 -
DisposableBean
: 這個類,實現了一個destroy()
這個方法是在銷燬的時候去調用 -
AbstractAutoProxyCreator
: 這個類是Spring實現AOP的一種方式,本質上是一個BeanPostProcessor
,在Bean初始化至去年,調用內部createProxy()
,創建一個Bean的AOP代理Bean並返回,對Bean進行增強。
Seata數據源代理
在上面的環節中,我們講解了Seata AT模式2PC的執行流程,那麼現在我們就來帶大家瞭解一下關於AT數據源代理的信息,這也是AT模式中非常關鍵的一個重要知識點,大家可以拿起小本子,記下來。
首先AT模式的核心主要分為一下兩個 - 開啟全局事務,獲取全局鎖。 - 解析SQL並寫入undoLog中。
關於第一點我們已經分析清楚了,第二點就是關於AT模式如何解析SQL並寫入undoLog中,但是在這之前,我們需要知道Seata是如何選擇數據源,並進行數據源代理的。雖然全局事務攔截成功後最終還是執行了業務方法進行SQL提交和操作,但是由於Seata對數據源進行了代理,所以SQL的解析和undoLog的操作,是在數據源代理中進行完成的。
數據源代理是Seata中一個非常重要的知識點,在分佈式事務運行過程中,undoLog的記錄、資源的鎖定,用户都是無感知的,因為這些操作都是數據源的代理中完成了,恰恰是這樣,我們才要去了解,這樣不僅有利於我們瞭解Seata的核心操作,還能對以後源碼閲讀有所幫助,因為其實很多底層代碼都會去使用這樣用户無感知的方式(代理)去實現。
同樣,我們在之前的尋找源碼入口的時候,通過我們項目中引入的jar找到一個 SeataAutoConfiguration
類,我們在裏面找到一個SeataDataSourceBeanPostProcessor()
,這個就是我們數據源代理的入口方法
我們進入SeataDataSourceBeanPostProcessor
類裏面,發現繼承了一個 BeanPostProcessor
,這個接口我們應該很熟悉,這個是Sprng的拓展接口,所有的Bean對象,都有進入兩個方法 postProcessAfterInitialization()
和 postProcessBeforeInitialization()
這兩個方法都是由 BeanPostProcessor
提供的,這兩個方法,一個是初始化之前執行Before
。一個是在初始化之後執行After
,主要用來對比我們的的Bean是否為數據源代理對象。
在這裏我們需要關注到一個postProcessAfterInitialization.proxyDataSource()
方法,這個裏面
```java private Object proxyDataSource(Object originBean) { DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean); if (this.useJdkProxy) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean)); } else { return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean)); }
}
```
這裏有一個DataSourceProxy
代理對象,我們需要看的就是這個類,這個就是我們數據庫代理的對象,我們從我們下載的源碼項目中,搜索這個代理對象,當我們打開這個類的目錄時發現,除了這個,還有ConnectionProxy
連接對象、StatementProxy
、PreparedStatementProxy
SQL執行對象,這些都被Seata進行了代理,為什麼要對這些都進行代理,代理的目的其實為了執行Seata的業務邏輯,生成undoLog,全局事務的開啟,事務的提交回滾等操作
DataSourceProxy
具體做了什麼,主要功能有哪些,我們來看一下。他在源碼中是如何體現的,我們需要關注的是init()
```java public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
private String resourceGroupId;
private void init(DataSource dataSource, String resourceGroupId) {
//資源組ID,默認是“default”這個默認值
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
//根據原始數據源得到JDBC連接和數據庫類型
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
} else if (JdbcConstants.MARIADB.equals(dbType)) {
dbType = JdbcConstants.MYSQL;
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
initResourceId();
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
//如果配置開關打開,會定時在線程池不斷更新表的元數據緩存信息
tableMetaExecutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
//Set the default branch type to 'AT' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
} ```
從上面我們可以看出,他主要做了以下幾點的增強:
- 給每個數據源標識資源組ID
- 如果打開配置,會有一個定時線程池定時更新表的元數據信息並緩存到本地
- 生成代理連接
ConnectionProxy
對象
在這三個增強功能裏面,第三個是最重要的,在AT模式裏面,會自動記錄undoLog,資源鎖定,都是通過ConnectionProxy
完成的,除此之外 DataSrouceProxy
重寫了一個方法 getConnection
,因為這裏返回的是一個 ConnectionProxy
,而不是原生的Connection
```java @Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); }
@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
Connection targetConnection = targetDataSource.getConnection(username, password);
return new ConnectionProxy(this, targetConnection);
}
```
ConnectionProxy
ConnectionProxy
繼承 AbstractConnectionProxy
,在這個父類中有很多公用的方法,在這個父類有 PreparedStatementProxy
、StatementProxy
、DataSourceProxy
所以我們需要先來看一下AbstractConnectionProxy
,因為這裏封裝了需要我們用到的通用方法和邏輯,在其中我們需要關注的主要在於 PreparedStatementProxy
和 StatementProxy
,在這裏的邏輯主要是數據源連接的步驟,連接獲取,創建執行對象等等
```java @Override public Statement createStatement() throws SQLException { //調用真實連接對象獲取Statement對象 Statement targetStatement = getTargetConnection().createStatement(); //創建Statement的代理 return new StatementProxy(this, targetStatement); }
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
//獲取數據庫類型 mysql/oracle
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
//如果是AT模式且開啟全局事務
if (BranchType.AT == RootContext.getBranchType()) {
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
//獲取表的元數據
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
//得到表的主鍵列名
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
//創建PreparedStatementProxy代理
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
```
在這兩個代理對象中,都用到了以下幾個方法:
```java @Override public ResultSet executeQuery(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql); }
@Override public int executeUpdate(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql); }
@Override public boolean execute(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql); }
```
在這些方法中都調用了 ExecuteTemplate.execute()
,所以我們就看一下在 ExecuteTemplate
類中具體是做了什麼操作:
```java public class ExecuteTemplate {
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
//如果沒有全局鎖,並且不是AT模式,直接執行SQL
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
//得到數據庫類型- mysql/oracle
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//sqlRecognizers 為SQL語句的解析器,獲取執行的SQL,通過它可以獲得SQL語句表名、相關的列名、類型等信息,最後解析出對應的SQL表達式
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//如果seata沒有找到合適的SQL語句解析器,那麼便創建簡單執行器PlainExecutor
//PlainExecutor直接使用原生的Statment對象執行SQL
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
//新增
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
//修改
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//刪除
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//加鎖
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//插入加鎖
case INSERT_ON_DUPLICATE_UPDATE:
switch (dbType) {
case JdbcConstants.MYSQL:
case JdbcConstants.MARIADB:
executor =
new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
//原生
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
//批量處理SQL語句
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
//執行
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
} ```
在 ExecuteTemplate
就一個 execute()
,Seata將SQL執行委託給不同的執行器(模板),Seata提供了6種執行器也就是我們代碼 case 中(INSERT
,UPDATE
,DELETE
,SELECT_FOR_UPDATE
,INSERT_ON_DUPLICATE_UPDATE
),這些執行器的父類都是AbstractDMLBaseExecutor
UpdateExecutor
: 執行update語句InsertExecutor
: 執行insert語句DeleteExecutor
: 執行delete語句SelectForUpdateExecutor
: 執行select for update語句PlainExecutor
: 執行普通查詢語句MultiExecutor
: 複合執行器,在一條SQL語句中執行多條語句
關係圖如下:
然後我們找到rs = executor.execute(args);
最終執行的方法,找到最頂級的父類BaseTransactionalExecutor.execute()
```java @Override public T execute(Object... args) throws Throwable { String xid = RootContext.getXID(); if (xid != null) { //獲取XID statementProxy.getConnectionProxy().bind(xid); } //設置全局鎖 statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args); }
``
在根據
doExecute(args);找到其中的重寫方法
AbstractDMLBaseExecutor.doExecute()`
java
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
//是否自動提交
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
對於數據庫而言,本身都是自動提交的,所以我們進入executeAutoCommitTrue()
java
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
//設置為手動提交
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
//調用手動提交方法,得到分支執行的最終結果
T result = executeAutoCommitFalse(args);
//執行提交
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
connectionProxy.changeAutoCommit()
方法,修改為手動提交後,我們看來最關鍵的代碼executeAutoCommitFalse()
java
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
//獲取前鏡像
TableRecords beforeImage = beforeImage();
//執行具體業務
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//獲取執行數量
int updateCount = statementProxy.getUpdateCount();
//判斷如果執行數量大於0
if (updateCount > 0) {
//獲取後鏡像
TableRecords afterImage = afterImage(beforeImage);
//暫存到undolog中,在Commit的時候保存到數據庫
prepareUndoLog(beforeImage, afterImage);
}
return result;
}
我們再回到executeAutoCommitTrue
中,去看看提交做了哪些操作connectionProxy.commit();
java
@Override
public void commit() throws SQLException {
try {
lockRetryPolicy.execute(() -> {
//具體執行
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
進入到doCommit()
中
java
private void doCommit() throws SQLException {
//判斷是否存在全局事務
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
作為分佈式事務,一定是存在全局事務的,所以我們進入 processGlobalTransactionCommit()
java
private void processGlobalTransactionCommit() throws SQLException {
try {
//註冊分支事務
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//寫入數據庫undolog
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//執行原生提交 一階段提交
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
其中register()
方法就是註冊分支事務的方法,同時還會將undoLog寫入數據庫和執行提交等操作
```java
//註冊分支事務,生成分支事務ID
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
//註冊分支事務
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
context.setBranchId(branchId);
}
```
然後我們在回到processGlobalTransactionCommit
中,看看寫入數據庫中的flushUndoLogs()
```java @Override public void flushUndoLogs(ConnectionProxy cp) throws SQLException { ConnectionContext connectionContext = cp.getContext(); if (!connectionContext.hasUndoLog()) { return; } //獲取XID String xid = connectionContext.getXid(); //獲取分支ID long branchId = connectionContext.getBranchId();
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}
CompressorType compressorType = CompressorType.NONE;
if (needCompress(undoLogContent)) {
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
//寫入數據庫具體位置
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
}
```
具體寫入方法,此時我們使用的是MySql,所以執行的是MySql實現類MySQLUndoLogManager.insertUndoLogWithNormal()
```java
@Override
protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
Connection conn) throws SQLException {
insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
}
//具體寫入操作
private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
State state, Connection conn) throws SQLException {
try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
pst.setLong(1, branchId);
pst.setString(2, xid);
pst.setString(3, rollbackCtx);
pst.setBytes(4, undoLogContent);
pst.setInt(5, state.getValue());
pst.executeUpdate();
} catch (Exception e) {
if (!(e instanceof SQLException)) {
e = new SQLException(e);
}
throw (SQLException) e;
}
}
```
具體流程如下所示:
Seata 服務端
我們找到Server.java
這裏就是啟動入口,在這個入口中找到協調者,因為TC整體的操作就是協調整體的全局事務
java
//默認協調者
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
在DefaultCoordinator
類中我們找到 一個doGlobalBegin
這個就是處理全局事務開始的方法,以及全局提交 doGlobalCommit
和全局回滾 doGlobalRollback
```java
//處理全局事務
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
//響應客户端xid
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
//處理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.commit(request.getXid()));
}
//處理全局回滾
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
RpcContext rpcContext) throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.rollback(request.getXid()));
}
```
在這裏我們首先關注 doGlobalBegin
中 core.begin()
```java @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { //創建全局事務Session GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//為Session重添加回調監聽,SessionHolder.getRootSessionManager() 獲取一個全局Session管理器DataBaseSessionManager
//觀察者設計模式,創建DataBaseSessionManager
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//全局事務開始
session.begin();
// transaction start event
MetricsPublisher.postSessionDoingEvent(session, false);
return session.getXid();
}
```
然後我們在來看一下SessionHolder.getRootSessionManager()
```java /* * Gets root session manager. * 獲取一個全局Session管理器 * @return the root session manager / public static SessionManager getRootSessionManager() { if (ROOT_SESSION_MANAGER == null) { throw new ShouldNeverHappenException("SessionManager is NOT init!"); } return ROOT_SESSION_MANAGER; }
public static void init(String mode) {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
}
StoreMode storeMode = StoreMode.get(mode);
//判斷Seata模式,當前為DB
if (StoreMode.DB.equals(storeMode)) {
//通過SPI機制讀取SessionManager接口實現類,讀取的META-INF.services目錄,在通過反射機制創建對象DataBaseSessionManager
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
........
}
}
```
在這裏他其實讀取的是DB模式下 io.seata.server.session.SessionManager
文件的內容
我們在回到begin
方法中,去查看session.begin()
```java @Override public void begin() throws TransactionException { //聲明全局事務開始 this.status = GlobalStatus.Begin; //開始時間 this.beginTime = System.currentTimeMillis(); //激活全局事務 this.active = true; //將SessionManager放入到集合中,調用onBegin方法 for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { //調用父級抽象類的方法 lifecycleListener.onBegin(this); } }
```
這裏我們來看一下 onBegin()
方法,調用的是父級的方法,在這其中我們要關注 addGlobalSession()
方法,但是要注意,這裏我們用的是db模式所以調用的是db模式的 DateBaseSessionManager
```java @Override public void onBegin(GlobalSession globalSession) throws TransactionException { //這裏調用的是DateBaseSessionManager addGlobalSession(globalSession); }
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
if (StringUtils.isBlank(taskName)) {
//寫入session
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
} else {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
}
}
然後在看查詢其中關鍵的方法`DataBaseTransactionStoreManager.writeSession()`
java
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
//第一次進入是寫入 會進入當前方法
//全局添加
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//全局修改
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//全局刪除
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//分支添加
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
//分支更新
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
//分支移除
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}
```
我們就看第一次進去的方法logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
java
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
int index = 1;
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(index++, globalTransactionDO.getXid());
ps.setLong(index++, globalTransactionDO.getTransactionId());
ps.setInt(index++, globalTransactionDO.getStatus());
ps.setString(index++, globalTransactionDO.getApplicationId());
ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ?
transactionName.substring(0, transactionNameColumnSize) :
transactionName;
ps.setString(index++, transactionName);
ps.setInt(index++, globalTransactionDO.getTimeout());
ps.setLong(index++, globalTransactionDO.getBeginTime());
ps.setString(index++, globalTransactionDO.getApplicationData());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}
在這裏有一個 GlobalTransactionDO
對象,裏面有xid、transactionId
等等,到這裏是不是就很熟悉了、
還記得我們第一次使用Seata的時候會創建三張表
- branch_table 分支事務表
- global_table 全局事務表
- lock_table 全局鎖表
而這裏就是對應我們的global_table
表,其他兩個也是差不多,都是一樣的操作
流程圖如下:
總結
完整流程圖:
對於Seata源碼來説主要是瞭解從哪裏入口以及核心點在哪裏,遇到有疑問的,可以Debug,對於Seata AT模式,我們主要掌握的核心點是 - 如何獲取全局鎖、開啟全局事務 - 解析SQL並寫入undolog
圍繞這兩點去看的話,會有針對性一點,到這裏我們的Seata源碼就講解完了,有疑問的小夥伴記得在下方留言。
我是牧小農,怕什麼真理無窮,進一步有進一步的歡喜,大家加油!
- 我正在參與掘金技術社區創作者簽約計劃招募活動,點擊鏈接報名投稿。
- 分佈式事務(Seata)原理 詳解篇,建議收藏
- 這篇SpringCloud GateWay 詳解,你用的到
- 支付寶h5支付(java版)
- Sentinel與OpenFeign 服務熔斷那些事
- Nacos源碼系列—訂閲機制的前因後果(上)
- Nacos源碼系列—服務端那些事兒
- 這一篇 K8S(Kubernetes)集羣部署 我覺得還可以
- 這一篇 K8S(Kubernetes)我覺得可以瞭解一下!!!
- Zookeeper入門看這篇就夠了
- 淺析 DDD 領域驅動設計
- 如果父母依舊辛苦,那我們的成長又有什麼意義?
- 【玩轉PDF】賊穩,產品要做一個三方合同簽署,我方了!
- 【多線程與高併發】從一則招聘信息進入多線程的世界
- 【死磕JVM】用Arthas排查JVM內存 真爽!我從小用到大
- 【死磕JVM】看完這篇我也會排查JVM內存過高了 就是玩兒!
- 【死磕JVM】一道面試題引發的“棧幀”!!!
- 我給Apache頂級項目貢獻了點源碼。
- 為什麼要使用位運算表示線程池狀態?
- 結構化思維如何指導技術系統優化
- 【死磕JVM】JVM快速入門之前戲篇