Java架构师方案—分布式事务2PC方案Atomikos(附完整项目代码)


作者:空白

1. 分布式事务2PC原理

事务有哪些基本特性(ACID)?

  • 原子性:一个业务过程中,事务内的多个数据变更操作要么全部成功,要么全部失败。

  • 一致性:指的是业务的正确性,业务数据的一致性;例如,银行转账业务,从账户A转10元到账户B,最后结果肯定是B账户多了10元,A账户少了10元;不能出现A金额没少但B金额多了的结果。

  • 隔离性:多个并发事务之间相互隔离,不能互相干扰。简单提一下事务的隔离机制和事务并发引起的问题

    • 隔离机制:读未提交,读已提交,可重复读,顺序读写

    • 事务并发问题:脏读,幻读(虚读),不可重复读。

  • 持久性:事务完成后,对数据库的更改是永久保存的,不能回滚。

** 什么是2PC?**

两阶段提交(two phrase commit),不言而喻,是指分布式事务工作流程分为两个阶段。对于单体架构的单个数据库的事务,我们会开启事务管理器,并在需要事务的方法上声明事务注解并确定事务元数据,本地事务是基于数据库资源层实现。

2PC分布式事务解决方案,是基于资源层来实现,其中两个阶段分别是:预备阶段,提交阶段。

  • 预备阶段:数据库开启事务,执行业务sql,但是数据库事务并没有提交,而是等待事务协调器commit命令或rollback命令。

  • 提交阶段:数据库收到commit命令后提交事务,或者收到rollback命令后回滚数据库。

两阶段提交方案中,有两种角色:事务协调者和事务参与者。

  • 事务协调者:协调者角色主要负责协调不同数据库事务执行情况,保证跨数据库业务中事务的原子性,事务协调者根据预备阶段的的执行情况,来决定提交阶段发送commit命令还是rollback命令。

  • 事务参与者:事务参与者主要是指资源层的数据库,数据库要支持两阶段提交分布式事务方案,必须要实现XA协议。

参考流程图如下:

alt

状态图分析:

  • 事务协调者:预备阶段,协调者发送预备命令给参与者,并进入等待状态,直到返回执行结果。

  • 事务参与者:执行完预备阶段,返回协调者执行结果后进入等待状态,直到协调者最后发送的提交或回滚命令。

  • 协调者判断:协调者收集到所有的参与者返回的结果后,判断所有结果是否都是成功,成功则发送给所有参与者提交命令,否则发送回滚命令。

2. springboot集成Atomikos

配置数据库参数

在properties文件中配置两个不同的数据库参数

##Spring表数据库配置
spring.jta.atomikos.datasource.spring.max-pool-size=25
spring.jta.atomikos.datasource.spring.min-pool-size=3
spring.jta.atomikos.datasource.spring.max-lifetime=20000
spring.jta.atomikos.datasource.spring.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.spring.unique-resource-name=spring
spring.jta.atomikos.datasource.spring.xa-properties.url=jdbc:mysql://url2:3306/spring?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
spring.jta.atomikos.datasource.spring.xa-properties.username=xxx
spring.jta.atomikos.datasource.spring.xa-properties.password=xxx
spring.jta.atomikos.datasource.spring.xa-properties.driverClassName=com.mysql.jdbc.Driver
# 初始化大小,最小,最大
spring.jta.atomikos.datasource.spring.xa-properties.initialSize=10
spring.jta.atomikos.datasource.spring.xa-properties.minIdle=20
spring.jta.atomikos.datasource.spring.xa-properties.maxActive=100
## 配置获取连接等待超时的时间
spring.jta.atomikos.datasource.spring.xa-properties.maxWait=60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
spring.jta.atomikos.datasource.spring.xa-properties.timeBetweenEvictionRunsMillis=60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
spring.jta.atomikos.datasource.spring.xa-properties.minEvictableIdleTimeMillis=300000
spring.jta.atomikos.datasource.spring.xa-properties.testWhileIdle=true
spring.jta.atomikos.datasource.spring.xa-properties.testOnBorrow=false
spring.jta.atomikos.datasource.spring.xa-properties.testOnReturn=false
# 打开PSCache,并且指定每个连接上PSCache的大小
spring.jta.atomikos.datasource.spring.xa-properties.poolPreparedStatements=true
spring.jta.atomikos.datasource.spring.xa-properties.maxPoolPreparedStatementPerConnectionSize=20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
spring.jta.atomikos.datasource.spring.xa-properties.filters=stat,slf4j,wall
spring.jta.atomikos.datasource.spring.xa-data-source-class-name=com.alibaba.druid.pool.xa.DruidXADataSource
#------------------------------ 分隔符-------------------------------------
##test表数据库配置
spring.jta.atomikos.datasource.test.max-pool-size=25
spring.jta.atomikos.datasource.test.min-pool-size=3
spring.jta.atomikos.datasource.test.max-lifetime=20000
spring.jta.atomikos.datasource.test.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.test.unique-resource-name=test
spring.jta.atomikos.datasource.test.xa-properties.url=jdbc:mysql://url1:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
spring.jta.atomikos.datasource.test.xa-properties.username=xxx
spring.jta.atomikos.datasource.test.xa-properties.password=xxx
spring.jta.atomikos.datasource.test.xa-properties.driverClassName=com.mysql.jdbc.Driver
spring.jta.atomikos.datasource.test.xa-properties.initialSize=10
spring.jta.atomikos.datasource.test.xa-properties.minIdle=20
spring.jta.atomikos.datasource.test.xa-properties.maxActive=100
spring.jta.atomikos.datasource.test.xa-properties.maxWait=60000
spring.jta.atomikos.datasource.test.xa-properties.timeBetweenEvictionRunsMillis=60000
spring.jta.atomikos.datasource.test.xa-properties.minEvictableIdleTimeMillis=300000
spring.jta.atomikos.datasource.test.xa-properties.testWhileIdle=true
spring.jta.atomikos.datasource.test.xa-properties.testOnBorrow=false
spring.jta.atomikos.datasource.test.xa-properties.testOnReturn=false
spring.jta.atomikos.datasource.test.xa-properties.poolPreparedStatements=true
spring.jta.atomikos.datasource.test.xa-properties.maxPoolPreparedStatementPerConnectionSize=20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
spring.jta.atomikos.datasource.test.xa-properties.filters=stat,slf4j,wall
spring.jta.atomikos.datasource.test.xa-data-source-class-name=com.alibaba.druid.pool.xa.DruidXADataSource
#jta相关参数配置
#spring.jta.log-dir=classpath:transaction-logs
#spring.jta.transaction-manager-id=txManager

两个数据源

在配置类MybatisConfiguration中创建数据源bean:SpringDataSource和TestDataSource。


@Configuration
@EnableConfigurationProperties
@EnableTransactionManagement(proxyTargetClass = true)
public class MybatisConfiguration {
    /**
     * spring数据库配置前缀.
     */
    final static String SPRING_PREFIX = "spring.jta.atomikos.datasource.spring";
    /**
     * test数据库配置前缀.
     */
    final static String TEST_PREFIX = "spring.jta.atomikos.datasource.test";

    /**
     * The constant logger.
     */
    final static Logger logger = LoggerFactory.getLogger(MybatisConfiguration.class);





    /**
     * 配置Spring数据库的数据源
     *
     * @return the data source
     */
    @Bean(name = "SpringDataSource")
    @ConfigurationProperties(prefix = SPRING_PREFIX)  // application.properties中对应属性的前缀
    public DataSource springDataSource() {
        return new AtomikosDataSourceBean();
    }

    /**
     * 配置Test数据库的数据源
     *
     * @return the data source
     */
    @Bean(name = "TestDataSource")
    @ConfigurationProperties(prefix = TEST_PREFIX)  // application.properties中对应属性的前缀
    public DataSource testDataSource() {
        return new AtomikosDataSourceBean();
    }
}

两个Session工厂对象

mybatis中重要的组件是Session工厂,每个数据库都有个固定的session工厂,应用与数据库交互前需要向对应的工厂申请资源。两个session工厂配置类分别生成两种不同数据库的session工厂。

@Configuration
@MapperScan(basePackages = {"com.jta.mapper.spring"}, sqlSessionFactoryRef = "springSqlSessionFactory")
public class SpringDataSourceConfiguration {
    /**
     * The constant MAPPER_XML_LOCATION.
     */
    public static final String MAPPER_XML_LOCATION = "classpath:mapper/spring/*.xml";

    /**
     * The Open plat form data source.
     */
    @Autowired
    @Qualifier("SpringDataSource")
    DataSource springDataSource;

    /**
     * 配置Sql Session模板
     *
     * @return the sql session template
     * @throws Exception the exception
     */
    @Bean
    public SqlSessionTemplate springSqlSessionTemplate() throws Exception {
        return new SqlSessionTemplate(springSqlSessionFactory());
    }

    /**
     * 配置SQL Session工厂
     *
     * @return the sql session factory
     * @throws Exception the exception
     */
    @Bean
    public SqlSessionFactory springSqlSessionFactory() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(springDataSource);
        //指定XML文件路径
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_XML_LOCATION));
        return factoryBean.getObject();
    }
}

另外一个session工厂配置类,请查看本项目完整代码。

测试

分布式事务测试,启动应用,使用postman进行测试。

在postman中输入地址:localhost:8080/insertPeopleAndUser?peopleName=jack&userName=helloworld001,访问结果如图所示

alt

people数据插入失败,导致回滚了,验证user数据是否也成功回滚了?

alt

helloworld001数据没有插入user表。回滚成功!

到此,springboot项目就成功整合了Atomikos框架实现分布式事务的2PC方案,详细代码请参考我的完整项目。

3. 2PC分布式事务的问题及局限性

性能低效:2PC方案的两种角色都会阻塞等待,直到返回执行结果或收到命令,事务锁会一直积压,导致数据库性能会非常差。

单点故障:事务协调者是整个XA模型的核心,一旦事务协调者节点挂掉,会导致参与者收不到提交或回滚的通知,从而导致参与者节点始终处于事务无法完成的中间状态。

数据不一致:第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就会导致节点间数据的不一致问题。

对于2PC方案,是基于资源层的分布式事务解决方案,常用于单体应用操作不同数据库的场景。对于微服务架构就不适用了,大家选择方案的时候还要根据具体情况来做。

本文是有已经实现的完整项目,读者可以先收藏起来,以后可以拿来即用。关注“前沿科技“公众后,发送“两阶段提交“消息获得项目地址。

转载这篇文章需要标注作者和出处:空白-bittechblog

欢迎加入技术群:获得更多java,springboot,redis,kafka圈的好友,共图技术未来 点击获取技术群二维码

还要配置状态转移信息以及事件处理器逻辑的开发。完整的demo项目,请关注公众号“前沿科技bot“并发送"状态机"获取。

alt

扫码或搜索:前沿科技
发送 290992
即可立即永久解锁本站全部文章