免费下载案例集|20+数字化领先企业人才培养实践经验 了解详情
写点什么

Shardingsphere 整合 Atomikos 对 XA 分布式事务的支持(2)

  • 2020-11-30
  • 本文字数:22749 字

    阅读完需:约 75 分钟

Shardingsphere整合Atomikos对XA分布式事务的支持(2)

Apache ShardingSphere 是一套开源的分布式数据库中间件解决方案组成的生态圈,它由 JDBC、Proxy 和 Sidecar(规划中)这 3 款相互独立,却又能够混合部署配合使用的产品组成。它们均提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如 Java 同构、异构语言、云原生等各种多样化的应用场景。


ShardingSphere 已于 2020 年 4 月 16 日成为 Apache 软件基金会的顶级项目。


咱们话不多,接上篇,我们直接进入正题。


Atomikos 简单介绍


Atomikos(https://www.atomikos.com/),其实是一家公司的名字,提供了基于JTA规范的XA分布式事务TM的实现。其旗下最著名的产品就是事务管理器。产品分两个版本:


  • TransactionEssentials:开源的免费产品;

  • ExtremeTransactions:上商业版,需要收费。


这两个产品的关系如下图所示:



ExtremeTransactions 在 TransactionEssentials 的基础上额外提供了以下功能(重要的):


  • 支持 TCC:这是一种柔性事务

  • 支持通过 RMI、IIOP、SOAP 这些远程过程调用技术,进行事务传播。

  • 事务日志云存储,云端对事务进行恢复,并且提供了完善的管理后台。


org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager 详解


我们简单的来回顾下org.apache.shardingsphere.transaction.spiShardingTransactionManager


public interface ShardingTransactionManager extends AutoCloseable {
/** * Initialize sharding transaction manager. * * @param databaseType database type * @param resourceDataSources resource data sources */ void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);
/** * Get transaction type. * * @return transaction type */ TransactionType getTransactionType();
/** * Judge is in transaction or not. * * @return in transaction or not */ boolean isInTransaction();
/** * Get transactional connection. * * @param dataSourceName data source name * @return connection * @throws SQLException SQL exception */ Connection getConnection(String dataSourceName) throws SQLException;
/** * Begin transaction. */ void begin();
/** * Commit transaction. */ void commit();
/** * Rollback transaction. */ void rollback();}
复制代码


我们重点县关注init方法,从它的命名,你就应该能够看出来,这是整个框架的初始化方法,让我们来看看它是如何进行初始化的。


 private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();
private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();
@Override public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) { for (ResourceDataSource each : resourceDataSources) { cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager)); } xaTransactionManager.init(); }
复制代码


  • 首先 SPI 的方式加载 XATransactionManager 的具体实现类,这里返回的就是org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager

  • 我们在关注下 new XATransactionDataSource() , 进入 org.apache.shardingsphere.transaction.xa.jta.datasource。XATransactionDataSource类的构造方法。


public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {        this.databaseType = databaseType;        this.resourceName = resourceName;        this.dataSource = dataSource;        if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {            // 重点关注 1 ,返回了xaDatasource            xaDataSource = XADataSourceFactory.build(databaseType, dataSource);            this.xaTransactionManager = xaTransactionManager;            // 重点关注2 注册资源            xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);        }    }
复制代码


  • 我们重点来关注 XADataSourceFactory.build(databaseType, dataSource),从名字我们就可以看出,这应该是返回JTA规范里面的XADataSourc,在 ShardingSphere 里面很多的功能,可以从代码风格的命名上就能猜出来,这就是优雅代码(吹一波)。不多逼逼,我们进入该方法。


public final class XADataSourceFactory {
public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) { return new DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource); }}
复制代码


  • 首先又是一个 SPI 定义的 XADataSourceDefinitionFactory,它根据不同的数据库类型,来加载不同的方言。然后我们进入 swap方法。


 public XADataSource swap(final DataSource dataSource) {        XADataSource result = createXADataSource();        setProperties(result, getDatabaseAccessConfiguration(dataSource));        return result;    }
复制代码


  • 很简明,第一步创建,XADataSource,第二步给它设置属性(包含数据的连接,用户名密码等),然后返回。

  • 返回 XATransactionDataSource 类,关注xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource); 从名字可以看出,这是注册事务恢复资源。这个我们在事务恢复的时候详解。

  • 返回 XAShardingTransactionManager.init() ,我们重点来关注:xaTransactionManager.init();,最后进入AtomikosTransactionManager.init()


public final class AtomikosTransactionManager implements XATransactionManager {
private final UserTransactionManager transactionManager = new UserTransactionManager();
private final UserTransactionService userTransactionService = new UserTransactionServiceImp();
@Override public void init() { userTransactionService.init(); }
}
复制代码


  • 进入UserTransactionServiceImp.init()


private void initialize() {       //添加恢复资源 不用关心        for (RecoverableResource resource : resources_) {            Configuration.addResource ( resource );        }        for (LogAdministrator logAdministrator : logAdministrators_) {            Configuration.addLogAdministrator ( logAdministrator );        }         //注册插件 不用关心        for (TransactionServicePlugin nxt : tsListeners_) {            Configuration.registerTransactionServicePlugin ( nxt );        }        //获取配置属性 重点关心        ConfigProperties configProps = Configuration.getConfigProperties();        configProps.applyUserSpecificProperties(properties_);        //进行初始化        Configuration.init();    }
复制代码


  • 我们重点关注,获取配置属性。最后进入com.atomikos.icatch.provider.imp.AssemblerImp.initializeProperties()方法


    @Override    public ConfigProperties initializeProperties() {         //读取classpath下的默认配置transactions-defaults.properties        Properties defaults = new Properties();        loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);        //读取classpath下,transactions.properties配置,覆盖transactions-defaults.properties中相同key的值        Properties transactionsProperties = new Properties(defaults);        loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);        //读取classpath下,jta.properties,覆盖transactions-defaults.properties、transactions.properties中相同key的值        Properties jtaProperties = new Properties(transactionsProperties);        loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);
//读取通过java -Dcom.atomikos.icatch.file方式指定的自定义配置文件路径,覆盖之前的同名配置 Properties customProperties = new Properties(jtaProperties); loadPropertiesFromCustomFilePath(customProperties); //最终构造一个ConfigProperties对象,来表示实际要使用的配置 Properties finalProperties = new Properties(customProperties); return new ConfigProperties(finalProperties); }
复制代码


  • 接下来重点关注, Configuration.init(), 进行初始化。


ublic static synchronized boolean init() {        boolean startupInitiated = false;        if (service_ == null) {            startupInitiated = true;           //SPI方式加载插件注册,无需过多关心              addAllTransactionServicePluginServicesFromClasspath();            ConfigProperties configProperties = getConfigProperties();          //调用插件的beforeInit方法进行初始化话,无需过多关心            notifyBeforeInit(configProperties);          //进行事务日志恢复的初始化,很重要,接下来详解            assembleSystemComponents(configProperties);         //进入系统注解的初始化,一般重要            initializeSystemComponents(configProperties);            notifyAfterInit();            if (configProperties.getForceShutdownOnVmExit()) {                addShutdownHook(new ForceShutdownHook());            }        }        return startupInitiated;    }
复制代码


  • 我们先来关注 assembleSystemComponents(configProperties); 进入它,进入com.atomikos.icatch.provider.imp.AssemblerImp.assembleTransactionService()方法:


@Override    public TransactionServiceProvider assembleTransactionService(            ConfigProperties configProperties) {        RecoveryLog recoveryLog =null;       //打印日志        logProperties(configProperties.getCompletedProperties());       //生成唯一名字        String tmUniqueName = configProperties.getTmUniqueName();
long maxTimeout = configProperties.getMaxTimeout(); int maxActives = configProperties.getMaxActives(); boolean threaded2pc = configProperties.getThreaded2pc(); //SPI方式加载OltpLog ,这是最重要的扩展地方,如果用户没有SPI的方式去扩展那么就为null OltpLog oltpLog = createOltpLogFromClasspath(); if (oltpLog == null) { LOGGER.logInfo("Using default (local) logging and recovery..."); //创建事务日志存储资源 Repository repository = createRepository(configProperties); oltpLog = createOltpLog(repository); //??? Assemble recoveryLog recoveryLog = createRecoveryLog(repository); } StateRecoveryManagerImp recoveryManager = new StateRecoveryManagerImp(); recoveryManager.setOltpLog(oltpLog); //生成唯一id生成器,以后生成XID会用的到 UniqueIdMgr idMgr = new UniqueIdMgr ( tmUniqueName ); int overflow = idMgr.getMaxIdLengthInBytes() - MAX_TID_LENGTH; if ( overflow > 0 ) { // see case 73086 String msg = "Value too long : " + tmUniqueName; LOGGER.logFatal ( msg ); throw new SysException(msg); } return new TransactionServiceImp(tmUniqueName, recoveryManager, idMgr, maxTimeout, maxActives, !threaded2pc, recoveryLog); }
复制代码


  • 我们重点来分析createOltpLogFromClasspath(), 采用 SPI 的加载方式来获取,默认这里会返回 null, 什么意思呢?


就是当没有扩展的时候,atomikos,会创建框架自定义的资源,来存储事务日志。


private OltpLog createOltpLogFromClasspath() {        OltpLog ret = null;        ServiceLoader<OltpLogFactory> loader = ServiceLoader.load(OltpLogFactory.class,Configuration.class.getClassLoader());        int i = 0;        for (OltpLogFactory l : loader ) {            ret = l.createOltpLog();            i++;        }        if (i > 1) {            String msg = "More than one OltpLogFactory found in classpath - error in configuration!";            LOGGER.logFatal(msg);            throw new SysException(msg);        }        return ret;    }
复制代码


  • 我们跟着进入 Repository repository = createRepository(configProperties);


    private CachedRepository createCoordinatorLogEntryRepository(            ConfigProperties configProperties) throws LogException {        //创建内存资源存储        InMemoryRepository inMemoryCoordinatorLogEntryRepository = new InMemoryRepository();       //进行初始化        inMemoryCoordinatorLogEntryRepository.init();       //创建使用文件存储资源作为backup        FileSystemRepository backupCoordinatorLogEntryRepository = new FileSystemRepository();       //进行初始化        backupCoordinatorLogEntryRepository.init();      //内存与file资源进行合并        CachedRepository repository = new CachedRepository(inMemoryCoordinatorLogEntryRepository, backupCoordinatorLogEntryRepository);        repository.init();        return repository;    }
复制代码


  • 这里就会创建出 CachedRepository,里面包含了 InMemoryRepositoryFileSystemRepository

  • 回到主线 com.atomikos.icatch.config.Configuration.init(), 最后来分析下notifyAfterInit();


    private static void notifyAfterInit() {         //进行插件的初始化        for (TransactionServicePlugin p : tsListenersList_) {            p.afterInit();        }        for (LogAdministrator a : logAdministrators_) {            a.registerLogControl(service_.getLogControl());        }         //设置事务恢复服务,进行事务的恢复        for (RecoverableResource r : resourceList_ ) {            r.setRecoveryService(recoveryService_);        }
}
复制代码


  • 插件的初始化会进入com.atomikos.icatch.jta.JtaTransactionServicePlugin.afterInit()


    public void afterInit() {        TransactionManagerImp.installTransactionManager(Configuration.getCompositeTransactionManager(), autoRegisterResources);          //如果我们自定义扩展了 OltpLog ,这里就会返回null,如果是null,那么XaResourceRecoveryManager就是null        RecoveryLog recoveryLog = Configuration.getRecoveryLog();        long maxTimeout = Configuration.getConfigProperties().getMaxTimeout();        if (recoveryLog != null) {            XaResourceRecoveryManager.installXaResourceRecoveryManager(new DefaultXaRecoveryLog(recoveryLog, maxTimeout),Configuration.getConfigProperties().getTmUniqueName());        }
}
复制代码


  • 重点注意 RecoveryLog recoveryLog = Configuration.getRecoveryLog(); ,如果用户采用SPI的方式,扩展了com.atomikos.recovery.OltpLog这里就会返回 null。如果是 null,则不会对 XaResourceRecoveryManager 进行初始化。

  • 回到 notifyAfterInit(), 我们来分析 setRecoveryService


public void setRecoveryService ( RecoveryService recoveryService )            throws ResourceException    {
if ( recoveryService != null ) { if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Installing recovery service on resource " + getName () ); this.branchIdentifier=recoveryService.getName(); recover(); } }
复制代码


  • 我们进入 recover() 方法:


 public void recover() {        XaResourceRecoveryManager xaResourceRecoveryManager = XaResourceRecoveryManager.getInstance();        //null for LogCloud recovery        if (xaResourceRecoveryManager != null) {             try {                xaResourceRecoveryManager.recover(getXAResource());            } catch (Exception e) {                refreshXAResource(); //cf case 156968            }
} }
复制代码


  • 看到最关键的注释了吗,如果用户采用SPI的方式,扩展了com.atomikos.recovery.OltpLog,那么XaResourceRecoveryManager 为 null,则就会进行云端恢复,反之则进行事务恢复。事务恢复很复杂,我们会单独来讲。


到这里 atomikos 的基本的初始化已经完成。


atomikos 事务 begin 流程


我们知道,本地的事务,都会有一个 trainsaction.begin, 对应 XA 分布式事务来说也不另外,我们再把思路切换回XAShardingTransactionManager.begin(), 会调用com.atomikos.icatch.jta.TransactionManagerImp.begin()


  public void begin ( int timeout ) throws NotSupportedException,            SystemException    {        CompositeTransaction ct = null;        ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;
ct = compositeTransactionManager.getCompositeTransaction(); if ( ct != null && ct.getProperty ( JTA_PROPERTY_NAME ) == null ) { LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() + " (will be resumed after JTA transaction ends)" ); ct = compositeTransactionManager.suspend(); resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct ); }
try { //创建事务补偿点 ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 ); if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant ); if ( ct.isRoot () && getDefaultSerial () ) ct.setSerial (); ct.setProperty ( JTA_PROPERTY_NAME , "true" ); } catch ( SysException se ) { String msg = "Error in begin()"; LOGGER.logError( msg , se ); throw new ExtendedSystemException ( msg , se ); } recreateCompositeTransactionAsJtaTransaction(ct); }
复制代码


  • 这里我们主要关注 compositeTransactionManager.createCompositeTransaction(),


public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException    {        CompositeTransaction ct = null , ret = null;
ct = getCurrentTx (); if ( ct == null ) { ret = getTransactionService().createCompositeTransaction ( timeout ); if(LOGGER.isDebugEnabled()){ LOGGER.logDebug("createCompositeTransaction ( " + timeout + " ): " + "created new ROOT transaction with id " + ret.getTid ()); } } else { if(LOGGER.isDebugEnabled()) LOGGER.logDebug("createCompositeTransaction ( " + timeout + " )"); ret = ct.createSubTransaction ();
}
Thread thread = Thread.currentThread (); setThreadMappings ( ret, thread );
return ret; }
复制代码


  • 创建了事务补偿点,然后把他放到了用当前线程作为 key 的 Map 当中,这里思考,为啥它不用 threadLocal


到这里 atomikos 的事务 begin 流程已经完成。大家可能有些疑惑,begin 好像什么都没有做,XA start 也没调用?别慌,下一节继续来讲。


XATransactionDataSource getConnection() 流程


我们都知道想要执行 SQL 语句,必须要获取到数据库的 connection。让我们再回到 XAShardingTransactionManager.getConnection() 最后会调用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()


 public Connection getConnection() throws SQLException, SystemException, RollbackException {      //先检查是否已经有存在的connection,这一步很关心,也是XA的关键,因为XA事务,必须在同一个connection        if (CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {            return dataSource.getConnection();        }      //获取数据库连接        Connection result = dataSource.getConnection();      //转成XAConnection,其实是同一个连接        XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);      //获取JTA事务定义接口        Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();        if (!enlistedTransactions.get().contains(transaction)) {      //进行资源注册            transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));            transaction.registerSynchronization(new Synchronization() {                @Override                public void beforeCompletion() {                    enlistedTransactions.get().remove(transaction);                }
@Override public void afterCompletion(final int status) { enlistedTransactions.get().clear(); } }); enlistedTransactions.get().add(transaction); } return result; }
复制代码


  • 首先第一步很关心,尤其是对 shardingsphere 来说,因为在一个事务里面,会有多个 SQL 语句,打到相同的数据库,所以对相同的数据库,必须获取同一个 XAConnection,这样才能进行 XA 事务的提交与回滚。

  • 我们接下来关心 transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 会进入com.atomikos.icatch.jta.TransactionImp.enlistResource(), 代码太长,截取一部分。


try {                restx = (XAResourceTransaction) res                        .getResourceTransaction(this.compositeTransaction);
// next, we MUST set the xa resource again, // because ONLY the instance we got as argument // is available for use now ! // older instances (set in restx from previous sibling) // have connections that may be in reuse already // ->old xares not valid except for 2pc operations
restx.setXAResource(xares); restx.resume(); } catch (ResourceException re) { throw new ExtendedSystemException( "Unexpected error during enlist", re); } catch (RuntimeException e) { throw e; }
addXAResourceTransaction(restx, xares);
复制代码


  • 我们直接看 restx.resume();


public synchronized void resume() throws ResourceException {        int flag = 0;        String logFlag = "";        if (this.state.equals(TxState.LOCALLY_DONE)) {// reused instance            flag = XAResource.TMJOIN;            logFlag = "XAResource.TMJOIN";        } else if (!this.knownInResource) {// new instance            flag = XAResource.TMNOFLAGS;            logFlag = "XAResource.TMNOFLAGS";        } else            throw new IllegalStateException("Wrong state for resume: "                    + this.state);
try { if (LOGGER.isDebugEnabled()) { LOGGER.logDebug("XAResource.start ( " + this.xidToHexString + " , " + logFlag + " ) on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } this.xaresource.start(this.xid, flag);
} catch (XAException xaerr) { String msg = interpretErrorCode(this.resourcename, "resume", this.xid, xaerr.errorCode); LOGGER.logWarning(msg, xaerr); throw new ResourceException(msg, xaerr); } setState(TxState.ACTIVE); this.knownInResource = true; }
复制代码


  • 哦多尅,看见了吗,各位,看见了 this.xaresource.start(this.xid, flag); 了吗????,我们进去,假设我们使用的 Mysql 数据库:


 public void start(Xid xid, int flags) throws XAException {        StringBuilder commandBuf = new StringBuilder(300);        commandBuf.append("XA START ");        appendXid(commandBuf, xid);        switch(flags) {        case 0:            break;        case 2097152:            commandBuf.append(" JOIN");            break;        case 134217728:            commandBuf.append(" RESUME");            break;        default:            throw new XAException(-5);        }
this.dispatchCommand(commandBuf.toString()); this.underlyingConnection.setInGlobalTx(true); }
复制代码


  • 组装XA start Xid SQL 语句,进行执行。


到这里,我们总结下,在获取数据库连接的时候,我们执行了 XA 协议接口中的 XA start xid


atomikos 事务 commit 流程


好了,上面我们已经开启了事务,现在我们来分析下事务 commit 流程,我们再把视角切换回XAShardingTransactionManager.commit(),最后我们会进入com.atomikos.icatch.imp.CompositeTransactionImp.commit() 方法:


 public void commit () throws HeurRollbackException, HeurMixedException,            HeurHazardException, SysException, SecurityException,            RollbackException    {       //首先更新下事务日志的状态        doCommit ();        setSiblingInfoForIncoming1pcRequestFromRemoteClient();
if ( isRoot () ) { //真正的commit操作 coordinator.terminate ( true ); } }
复制代码


  • 我们关注coordinator.terminate ( true );


 protected void terminate ( boolean commit ) throws HeurRollbackException,            HeurMixedException, SysException, java.lang.SecurityException,            HeurCommitException, HeurHazardException, RollbackException,            IllegalStateException
{ synchronized ( fsm_ ) { if ( commit ) { //判断有几个参与者,如果只有一个,直接提交 if ( participants_.size () <= 1 ) { commit ( true ); } else { //否则,走XA 2阶段提交流程,先prepare, 再提交 int prepareResult = prepare (); // make sure to only do commit if NOT read only if ( prepareResult != Participant.READ_ONLY ) commit ( false ); } } else { rollback (); } } }
复制代码


  • 首先会判断参与者的个数,这里我们可以理解为 MySQL 的 database 数量,如果只有一个,退化成一阶段,直接提交。

  • 如果有多个,则走标准的 XA 二阶段提交流程。

  • 我们来看 prepare (); 流程,最后会走到com.atomikos.icatch.imp.PrepareMessage.send() ---> com.atomikos.datasource.xa.XAResourceTransaction.prepare()


int ret = 0;        terminateInResource();
if (TxState.ACTIVE == this.state) { // tolerate non-delisting apps/servers suspend(); }
// duplicate prepares can happen for siblings in serial subtxs!!! // in that case, the second prepare just returns READONLY if (this.state == TxState.IN_DOUBT) return Participant.READ_ONLY; else if (!(this.state == TxState.LOCALLY_DONE)) throw new SysException("Wrong state for prepare: " + this.state); try { // refresh xaresource for MQSeries: seems to close XAResource after // suspend??? testOrRefreshXAResourceFor2PC(); if (LOGGER.isTraceEnabled()) { LOGGER.logTrace("About to call prepare on XAResource instance: " + this.xaresource); } ret = this.xaresource.prepare(this.xid);
} catch (XAException xaerr) { String msg = interpretErrorCode(this.resourcename, "prepare", this.xid, xaerr.errorCode); if (XAException.XA_RBBASE <= xaerr.errorCode && xaerr.errorCode <= XAException.XA_RBEND) { LOGGER.logWarning(msg, xaerr); // see case 84253 throw new RollbackException(msg); } else { LOGGER.logError(msg, xaerr); throw new SysException(msg, xaerr); } } setState(TxState.IN_DOUBT); if (ret == XAResource.XA_RDONLY) { if (LOGGER.isDebugEnabled()) { LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString + " ) returning XAResource.XA_RDONLY " + "on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } return Participant.READ_ONLY; } else { if (LOGGER.isDebugEnabled()) { LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString + " ) returning OK " + "on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } return Participant.READ_ONLY + 1; }
复制代码


  • 终于,我们看到了这么一句 ret = this.xaresource.prepare(this.xid); 但是等等,我们之前不是说了,XA start xid 以后要先 XA end xid 吗?答案就在 suspend(); 里面。


public synchronized void suspend() throws ResourceException {
// BugzID: 20545 // State may be IN_DOUBT or TERMINATED when a connection is closed AFTER // commit! // In that case, don't call END again, and also don't generate any // error! // This is required for some hibernate connection release strategies. if (this.state.equals(TxState.ACTIVE)) { try { if (LOGGER.isDebugEnabled()) { LOGGER.logDebug("XAResource.end ( " + this.xidToHexString + " , XAResource.TMSUCCESS ) on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } //执行了 xa end 语句 this.xaresource.end(this.xid, XAResource.TMSUCCESS);
} catch (XAException xaerr) { String msg = interpretErrorCode(this.resourcename, "end", this.xid, xaerr.errorCode); if (LOGGER.isTraceEnabled()) LOGGER.logTrace(msg, xaerr); // don't throw: fix for case 102827 } setState(TxState.LOCALLY_DONE); } }
复制代码


到了这里,我们已经执行了 XA start xid -> XA end xid --> XA prepare xid, 接下来就是最后一步 commit


  • 我们再回到 terminate(false) 方法,来看 commit()流程。其实和 prepare 流程一样,最后会走到 com.atomikos.datasource.xa.XAResourceTransaction.commit()。commit 执行完,数据提交


//繁杂代码过多,就显示核心的this.xaresource.commit(this.xid, onePhase);
复制代码


思考:这里的参与者提交是在一个循环里面,一个一个提交的,如果之前的提交了,后面的参与者提交的时候,挂了,就会造成数据的不一致性。


Atomikos rollback() 流程


上面我们已经分析了 commit 流程,其实 rollback 流程和 commit 流程一样,我们在把目光切换回 org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback() ,最后会执行到com.atomikos.icatch.imp.CompositeTransactionImp.rollback()


    public void rollback () throws IllegalStateException, SysException    {        //清空资源,更新事务日志状态等        doRollback ();        if ( isRoot () ) {            try {                coordinator.terminate ( false );            } catch ( Exception e ) {                throw new SysException ( "Unexpected error in rollback: " + e.getMessage (), e );            }        }    }
复制代码


  • 重点关注 coordinator.terminate ( false ); ,这个和 commit 流程是一样的,只不过在 commit 流程里面,参数传的是 true。


 protected void terminate ( boolean commit ) throws HeurRollbackException,            HeurMixedException, SysException, java.lang.SecurityException,            HeurCommitException, HeurHazardException, RollbackException,            IllegalStateException
{ synchronized ( fsm_ ) { if ( commit ) { if ( participants_.size () <= 1 ) { commit ( true ); } else { int prepareResult = prepare (); // make sure to only do commit if NOT read only if ( prepareResult != Participant.READ_ONLY ) commit ( false ); } } else { //如果是false,走的是rollback rollback (); } } }
复制代码


  • 我们重点关注 rollback() ,最后会走到com.atomikos.datasource.xa.XAResourceTransaction.rollback()


public synchronized void rollback()            throws HeurCommitException, HeurMixedException,            HeurHazardException, SysException {        terminateInResource();
if (rollbackShouldDoNothing()) { return; } if (this.state.equals(TxState.TERMINATED)) { return; }
if (this.state.equals(TxState.HEUR_MIXED)) throw new HeurMixedException(); if (this.state.equals(TxState.HEUR_COMMITTED)) throw new HeurCommitException(); if (this.xaresource == null) { throw new HeurHazardException("XAResourceTransaction " + getXid() + ": no XAResource to rollback?"); }
try { if (this.state.equals(TxState.ACTIVE)) { // first suspend xid suspend(); }
// refresh xaresource for MQSeries: seems to close XAResource after // suspend??? testOrRefreshXAResourceFor2PC(); if (LOGGER.isDebugEnabled()) { LOGGER.logDebug("XAResource.rollback ( " + this.xidToHexString + " ) " + "on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } this.xaresource.rollback(this.xid);
复制代码


  • 先在supend()方法里面执行了 XA end xid 语句, 接下来执行 this.xaresource.rollback(this.xid); 进行数据的回滚。


Atomikos 事务恢复流程


在说,事务恢复流程之前,我们来讨论下,会啥会出现事务恢复?,XA 2 阶段提交协议不是强一致性的吗?。要解答这个问题,我们就要来看看 XA 二阶段协议有什么问题?


问题一 :单点故障


由于协调者的重要性,一旦协调者 TM 发生故障。参与者 RM 会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)


问题二 :数据不一致


数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送 commit 请求之后,发生了局部网络异常或者在发送 commit 请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了 commit 请求。而在这部分参与者接到 commit 请求之后就会执行 commit 操作。但是其他部分未接到 commit 请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。


如何解决?


解决的方案简单,就是我们在事务的操作的每一步,我们都需要对事务状态的日志进行人为的记录,我们可以把日志记录存储在我们想存储的地方,可以是本地存储,也可以中心化的存储。atomikos 的开源版本,我们之前也分析了,它是使用内存 + file 的方式,存储在本地,这样的话,如果在一个集群系统里面,如果有节点宕机,日志又存储在本地,所以事务不能及时的恢复(需要重启服务)。


Atomikos 多场景下事务恢复。


Atomikos 提供了二种方式,来应对不同场景下的异常情况。


  • 场景一:服务节点不宕机,因为其他的原因,产生需要事务恢复的情况。这个时候才要定时任务进行恢复。

  • 具体的代码 com.atomikos.icatch.imp.TransactionServiceImp.init() 方法,会初始化一个定时任务,进行事务的恢复。


public synchronized void init ( Properties properties ) throws SysException    {        shutdownInProgress_ = false;        control_ = new com.atomikos.icatch.admin.imp.LogControlImp ( (AdminLog) this.recoveryLog );        ConfigProperties configProperties = new ConfigProperties(properties);        long recoveryDelay = configProperties.getRecoveryDelay();         recoveryTimer = new PooledAlarmTimer(recoveryDelay);          recoveryTimer.addAlarmTimerListener(new AlarmTimerListener() {
@Override public void alarm(AlarmTimer timer) { //进行事务恢复 performRecovery();
}

});
TaskManager.SINGLETON.executeTask(recoveryTimer);
initialized_ = true;

}
复制代码


  • 最终会进入com.atomikos.datasource.xa.XATransactionalResource.recover() 方法。


   public void recover() {        XaResourceRecoveryManager xaResourceRecoveryManager = XaResourceRecoveryManager.getInstance();        if (xaResourceRecoveryManager != null) { //null for LogCloud recovery            try {                xaResourceRecoveryManager.recover(getXAResource());            } catch (Exception e) {                refreshXAResource(); //cf case 156968            }
} }
复制代码


  • 场景二: 当服务节点宕机重启动过程中进行事务的恢复。具体实现在com.atomikos.datasource.xa.XATransactionalResource.setRecoveryService()方法里面


 @Override    public void setRecoveryService ( RecoveryService recoveryService )            throws ResourceException    {
if ( recoveryService != null ) { if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Installing recovery service on resource " + getName () ); this.branchIdentifier=recoveryService.getName(); //进行事务恢复 recover(); }
}
复制代码


com.atomikos.datasource.xa.XATransactionalResource.recover() 流程详解。


    public void recover(XAResource xaResource) throws XAException {      // 根据XA recovery 协议获取 xid        List<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);        Collection<XID> xidsToCommit;        try {             // xid 与日志记录的xid进行匹配            xidsToCommit = retrieveExpiredCommittingXidsFromLog();            for (XID xid : xidsToRecover) {                if (xidsToCommit.contains(xid)) {            //执行 XA commit xid 进行提交                                     replayCommit(xid, xaResource);                } else {                    attemptPresumedAbort(xid, xaResource);                }            }        } catch (LogException couldNotRetrieveCommittingXids) {            LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);        }    }
复制代码


  • 我们来看一下如何根据 XA recovery 协议获取RM端存储的xid。进入方法 retrievePreparedXidsFromXaResource(xaResource), 最后进入 com.atomikos.datasource.xa.RecoveryScan.recoverXids()方法。


public static List<XID> recoverXids(XAResource xaResource, XidSelector selector) throws XAException {        List<XID> ret = new ArrayList<XID>();
boolean done = false; int flags = XAResource.TMSTARTRSCAN; Xid[] xidsFromLastScan = null; List<XID> allRecoveredXidsSoFar = new ArrayList<XID>(); do { xidsFromLastScan = xaResource.recover(flags); flags = XAResource.TMNOFLAGS; done = (xidsFromLastScan == null || xidsFromLastScan.length == 0); if (!done) {
// TEMPTATIVELY SET done TO TRUE // TO TOLERATE ORACLE 8.1.7 INFINITE // LOOP (ALWAYS RETURNS SAME RECOVER // SET). IF A NEW SET OF XIDS IS RETURNED // THEN done WILL BE RESET TO FALSE
done = true; for ( int i = 0; i < xidsFromLastScan.length; i++ ) { XID xid = new XID ( xidsFromLastScan[i] ); // our own XID implements equals and hashCode properly if (!allRecoveredXidsSoFar.contains(xid)) { // a new xid is returned -> we can not be in a recovery loop -> go on allRecoveredXidsSoFar.add(xid); done = false; if (selector.selects(xid)) { ret.add(xid); } } } } } while (!done);
return ret; }
复制代码


  • 我们重点关注xidsFromLastScan = xaResource.recover(flags); 这个方法,如果我们使用 MySQL,那么久会进入 MysqlXAConnection.recover()方法。执行 XA recovery xid 语句来获取 xid


 protected static Xid[] recover(Connection c, int flag) throws XAException {        /*         * The XA RECOVER statement returns information for those XA transactions on the MySQL server that are in the PREPARED state. (See Section 13.4.7.2, ???XA         * Transaction States???.) The output includes a row for each such XA transaction on the server, regardless of which client started it.         *          * XA RECOVER output rows look like this (for an example xid value consisting of the parts 'abc', 'def', and 7):         *          * mysql> XA RECOVER;         * +----------+--------------+--------------+--------+         * | formatID | gtrid_length | bqual_length | data |         * +----------+--------------+--------------+--------+         * | 7 | 3 | 3 | abcdef |         * +----------+--------------+--------------+--------+         *          * The output columns have the following meanings:         *          * formatID is the formatID part of the transaction xid         * gtrid_length is the length in bytes of the gtrid part of the xid         * bqual_length is the length in bytes of the bqual part of the xid         * data is the concatenation of the gtrid and bqual parts of the xid         */
boolean startRscan = ((flag & TMSTARTRSCAN) > 0); boolean endRscan = ((flag & TMENDRSCAN) > 0);
if (!startRscan && !endRscan && flag != TMNOFLAGS) { throw new MysqlXAException(XAException.XAER_INVAL, Messages.getString("MysqlXAConnection.001"), null); }
// // We return all recovered XIDs at once, so if not TMSTARTRSCAN, return no new XIDs // // We don't attempt to maintain state to check for TMNOFLAGS "outside" of a scan //
if (!startRscan) { return new Xid[0]; }
ResultSet rs = null; Statement stmt = null;
List<MysqlXid> recoveredXidList = new ArrayList<MysqlXid>();
try { // TODO: Cache this for lifetime of XAConnection stmt = c.createStatement();
rs = stmt.executeQuery("XA RECOVER");
while (rs.next()) { final int formatId = rs.getInt(1); int gtridLength = rs.getInt(2); int bqualLength = rs.getInt(3); byte[] gtridAndBqual = rs.getBytes(4);
final byte[] gtrid = new byte[gtridLength]; final byte[] bqual = new byte[bqualLength];
if (gtridAndBqual.length != (gtridLength + bqualLength)) { throw new MysqlXAException(XAException.XA_RBPROTO, Messages.getString("MysqlXAConnection.002"), null); }
System.arraycopy(gtridAndBqual, 0, gtrid, 0, gtridLength); System.arraycopy(gtridAndBqual, gtridLength, bqual, 0, bqualLength);
recoveredXidList.add(new MysqlXid(gtrid, bqual, formatId)); } } catch (SQLException sqlEx) { throw mapXAExceptionFromSQLException(sqlEx); } finally { if (rs != null) { try { rs.close(); } catch (SQLException sqlEx) { throw mapXAExceptionFromSQLException(sqlEx); } }
if (stmt != null) { try { stmt.close(); } catch (SQLException sqlEx) { throw mapXAExceptionFromSQLException(sqlEx); } } }
int numXids = recoveredXidList.size();
Xid[] asXids = new Xid[numXids]; Object[] asObjects = recoveredXidList.toArray();
for (int i = 0; i < numXids; i++) { asXids[i] = (Xid) asObjects[i]; }
return asXids; }
复制代码


  • 这里要注意如果Mysql的版本 <5.7.7 ,则不会有任何数据,在以后的版本中Mysql进行了修复,因此如果我们想要使用MySQL充当RM,版本必须 >= 5.7.7 ,原因是:


MySQL 5.6 版本在客户端退出的时候,自动把已经 prepare 的事务回滚了,那么 MySQL 为什么要这样做?这主要取决于 MySQL 的内部实现,MySQL 5.7 以前的版本,对于 prepare 的事务,MySQL 是不会记录 binlog 的(官方说是减少 fsync,起到了优化的作用)。只有当分布式事务提交的时候才会把前面的操作写入 binlog 信息,所以对于 binlog 来说,分布式事务与普通的事务没有区别,而 prepare 以前的操作信息都保存在连接的 IO_CACHE 中,如果这个时候客户端退出了,以前的 binlog 信息都会被丢失,再次重连后允许提交的话,会造成 Binlog 丢失,从而造成主从数据的不一致,所以官方在客户端退出的时候直接把已经 prepare 的事务都回滚了!


  • 回到主线,假设我们获取到需要进行事务恢复的 XID,再从自己记录的事务日志里面获取 XID,如果前者包含在后者之中,则进行 commit,否则进行 rollback.


List<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);        Collection<XID> xidsToCommit;        try {            xidsToCommit = retrieveExpiredCommittingXidsFromLog();            for (XID xid : xidsToRecover) {                if (xidsToCommit.contains(xid)) {                    replayCommit(xid, xaResource);                } else {                    attemptPresumedAbort(xid, xaResource);                }            }        } catch (LogException couldNotRetrieveCommittingXids) {            LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);        }
复制代码


  • replayCommit 方法如下:


private void replayCommit(XID xid, XAResource xaResource) {        if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Replaying commit of xid: " + xid);        try {            xaResource.commit(xid, false);            log.terminated(xid);        } catch (XAException e) {            if (alreadyHeuristicallyTerminatedByResource(e)) {                handleHeuristicTerminationByResource(xid, xaResource, e, true);            } else if (xidTerminatedInResourceByConcurrentCommit(e)) {                log.terminated(xid);            } else {                LOGGER.logWarning("Transient error while replaying commit - will retry later...", e);            }        }    }
复制代码


  • attemptPresumedAbort(xid, xaResource); 方法如下:


private void attemptPresumedAbort(XID xid, XAResource xaResource) {        try {            log.presumedAborting(xid);            if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Presumed abort of xid: " + xid);            try {                xaResource.rollback(xid);                log.terminated(xid);             } catch (XAException e) {                if (alreadyHeuristicallyTerminatedByResource(e)) {                    handleHeuristicTerminationByResource(xid, xaResource, e, false);                } else if (xidTerminatedInResourceByConcurrentRollback(e)) {                    log.terminated(xid);                } else {                    LOGGER.logWarning("Unexpected exception during recovery - ignoring to retry later...", e);                }            }        } catch (IllegalStateException presumedAbortNotAllowedInCurrentLogState) {            // ignore to retry later if necessary        } catch (LogException logWriteException) {            LOGGER.logWarning("log write failed for Xid: "+xid+", ignoring to retry later", logWriteException);        }    }
复制代码


文章到此,已经写的很长很多了,我们分析了 ShardingSphere 对于 XA 方案,提供了一套 SPI 解决方案,对 Atomikos 进行了整合,也分析了 Atomikos 初始化流程,开始事务流程,获取连接流程,提交事务流程,回滚事务流程,事务恢复流程。


希望对大家理解 XA 的原理有所帮助。


作者介绍


肖宇,Apache ShardingSphere Committer,开源 hmily 分布式事务框架作者,开源 soul 网关作者,热爱开源,追求写优雅代码。目前就职于京东数科,参与 ShardingSphere 的开源建设,以及分布式数据库的研发工作。


本文转载自公众号 ShardingSphere 官微(ID:Sharding-Sphere)。


原文链接


Shardingsphere整合Atomikos对XA分布式事务的支持(2)


2020-11-30 10:051613

评论

发布
暂无评论
发现更多内容

模块五-微博评论的高性能高可用计算架构

娜酱

「架构实战营」

架构实战训练营模块 5 作业

Sonichen

这几种Java异常处理方法,你会吗?

华为云开发者联盟

Java 数组 异常 程序

Prometheus 基础查询(三)范围向量和 PromQL 的缺陷

耳东@Erdong

Prometheus 10月月更

【Promise 源码学习】目录 - Promise 知识点梳理

Brave

源码 Promise 10月月更

架构实战营模块五作业 - 设计微博系统中”微博评论“的高性能高可用计算架构

李焕之

模块5作业

4anonymous

微博评论高性能高可用架构设计

Geek_db27b5

为什么常用二倍图,流式布局中一倍图是否靠得住

你好bk

css3 大前端 html/css 页面布局

在线EXCEL文件数据转换解析工具

入门小站

工具

(model5)微博评论高性能高可用计算架构

消失的子弹

架构 微服务

声网教育aPaaS 产品灵动课堂:「低代码」开发,15分钟极速上线

声网

人工智能 大数据 云服务

微博评论高性能高可用计算架构

毛先生

微博系统中的微博评论架构分析

眼镜盒子

「架构实战营」

架构实战营第五次作业

Geek_d18264

架构实战营

linux之grep使用技巧

入门小站

Linux

技术人在职场如何摆正心态

baiyutang

职场 10月月更

看动画学算法之:平衡二叉搜索树AVL Tree

程序那些事

数据结构 算法 二叉树 程序那些事

创建线程池学习笔记

风翱

线程池 10月月更

学习心得 - 架构训练营 - 第五课

Fm

构建全屏 Web 应用程序

devpoint

JavaScript html5 大前端 10月月更

架构训练营 模块五

Leach Sun

微博评论背后的高性能高可用计算架构

Nico

架构:微内核架构(Microkernel Architecture)

程序员架构进阶

架构 微内核 插件化 10月月更

”微博评论“的高性能高可用计算架构

Sky

「架构实战营」

阿里开源的这个库,让 Excel 导出不再复杂(填充模板的使用指南)

看山

Java EasyExcel 10月月更

【LeetCode】外观数列Java题解

Albert

算法 LeetCode 10月月更

架构设计系列五 如何设计业务高性能高可用计算架构

nydia

阿里开源的这个库,让 Excel 导出不再复杂(既要能写,还要写的好看)

看山

Java EasyExcel 10月月更

微博评论架构设计

Yina🌝很浪🌊

作业五:微博评论高性能高可用架构设计

紫云

架构实战营

Shardingsphere整合Atomikos对XA分布式事务的支持(2)_开源_肖宇_InfoQ精选文章