Java架构师方案—分布式事务2PC方案Atomikos(附完整项目代码)
1. 分布式事务2PC原理
事务有哪些基本特性(ACID)?- 原子性:一个业务过程中,事务内的多个数据变更操作要么全部成功,要么全部失败。
- 一致性:指的是业务的正确性,业务数据的一致性;例如,银行转账业务,从账户A转10元到账户B,最后结果肯定是B账户多了10元,A账户少了10元;不能出现A金额没少但B金额多了的结果。
- 隔离性:多个并发事务之间相互隔离,不能互相干扰。简单提一下事务的隔离机制和事务并发引起的问题
- 隔离机制:读未提交,读已提交,可重复读,顺序读写
- 事务并发问题:脏读,幻读(虚读),不可重复读。
- 持久性:事务完成后,对数据库的更改是永久保存的,不能回滚。
两阶段提交(two phrase commit),不言而喻,是指分布式事务工作流程分为两个阶段。对于单体架构的单个数据库的事务,我们会开启事务管理器,并在需要事务的方法上声明事务注解并确定事务元数据,本地事务是基于数据库资源层实现。
2PC分布式事务解决方案,是基于资源层来实现,其中两个阶段分别是:预备阶段,提交阶段。
- 预备阶段:数据库开启事务,执行业务sql,但是数据库事务并没有提交,而是等待事务协调器commit命令或rollback命令。
- 提交阶段:数据库收到commit命令后提交事务,或者收到rollback命令后回滚数据库。
两阶段提交方案中,有两种角色:事务协调者和事务参与者。
- 事务协调者:协调者角色主要负责协调不同数据库事务执行情况,保证跨数据库业务中事务的原子性,事务协调者根据预备阶段的的执行情况,来决定提交阶段发送commit命令还是rollback命令。
- 事务参与者:事务参与者主要是指资源层的数据库,数据库要支持两阶段提交分布式事务方案,必须要实现XA协议。
参考流程图如下:
状态图分析:
- 事务协调者:预备阶段,协调者发送预备命令给参与者,并进入等待状态,直到返回执行结果。
- 事务参与者:执行完预备阶段,返回协调者执行结果后进入等待状态,直到协调者最后发送的提交或回滚命令。
- 协调者判断:协调者收集到所有的参与者返回的结果后,判断所有结果是否都是成功,成功则发送给所有参与者提交命令,否则发送回滚命令。
2. springboot集成Atomikos
配置数据库参数在properties文件中配置两个不同的数据库参数
##Spring表数据库配置
spring.jta.atomikos.datasource.spring.max-pool-size=25
spring.jta.atomikos.datasource.spring.min-pool-size=3
spring.jta.atomikos.datasource.spring.max-lifetime=20000
spring.jta.atomikos.datasource.spring.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.spring.unique-resource-name=spring
spring.jta.atomikos.datasource.spring.xa-properties.url=jdbc:mysql://url2:3306/spring?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
spring.jta.atomikos.datasource.spring.xa-properties.username=xxx
spring.jta.atomikos.datasource.spring.xa-properties.password=xxx
spring.jta.atomikos.datasource.spring.xa-properties.driverClassName=com.mysql.jdbc.Driver
# 初始化大小,最小,最大
spring.jta.atomikos.datasource.spring.xa-properties.initialSize=10
spring.jta.atomikos.datasource.spring.xa-properties.minIdle=20
spring.jta.atomikos.datasource.spring.xa-properties.maxActive=100
## 配置获取连接等待超时的时间
spring.jta.atomikos.datasource.spring.xa-properties.maxWait=60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
spring.jta.atomikos.datasource.spring.xa-properties.timeBetweenEvictionRunsMillis=60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
spring.jta.atomikos.datasource.spring.xa-properties.minEvictableIdleTimeMillis=300000
spring.jta.atomikos.datasource.spring.xa-properties.testWhileIdle=true
spring.jta.atomikos.datasource.spring.xa-properties.testOnBorrow=false
spring.jta.atomikos.datasource.spring.xa-properties.testOnReturn=false
# 打开PSCache,并且指定每个连接上PSCache的大小
spring.jta.atomikos.datasource.spring.xa-properties.poolPreparedStatements=true
spring.jta.atomikos.datasource.spring.xa-properties.maxPoolPreparedStatementPerConnectionSize=20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
spring.jta.atomikos.datasource.spring.xa-properties.filters=stat,slf4j,wall
spring.jta.atomikos.datasource.spring.xa-data-source-class-name=com.alibaba.druid.pool.xa.DruidXADataSource
#------------------------------ 分隔符-------------------------------------
##test表数据库配置
spring.jta.atomikos.datasource.test.max-pool-size=25
spring.jta.atomikos.datasource.test.min-pool-size=3
spring.jta.atomikos.datasource.test.max-lifetime=20000
spring.jta.atomikos.datasource.test.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.test.unique-resource-name=test
spring.jta.atomikos.datasource.test.xa-properties.url=jdbc:mysql://url1:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
spring.jta.atomikos.datasource.test.xa-properties.username=xxx
spring.jta.atomikos.datasource.test.xa-properties.password=xxx
spring.jta.atomikos.datasource.test.xa-properties.driverClassName=com.mysql.jdbc.Driver
spring.jta.atomikos.datasource.test.xa-properties.initialSize=10
spring.jta.atomikos.datasource.test.xa-properties.minIdle=20
spring.jta.atomikos.datasource.test.xa-properties.maxActive=100
spring.jta.atomikos.datasource.test.xa-properties.maxWait=60000
spring.jta.atomikos.datasource.test.xa-properties.timeBetweenEvictionRunsMillis=60000
spring.jta.atomikos.datasource.test.xa-properties.minEvictableIdleTimeMillis=300000
spring.jta.atomikos.datasource.test.xa-properties.testWhileIdle=true
spring.jta.atomikos.datasource.test.xa-properties.testOnBorrow=false
spring.jta.atomikos.datasource.test.xa-properties.testOnReturn=false
spring.jta.atomikos.datasource.test.xa-properties.poolPreparedStatements=true
spring.jta.atomikos.datasource.test.xa-properties.maxPoolPreparedStatementPerConnectionSize=20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
spring.jta.atomikos.datasource.test.xa-properties.filters=stat,slf4j,wall
spring.jta.atomikos.datasource.test.xa-data-source-class-name=com.alibaba.druid.pool.xa.DruidXADataSource
#jta相关参数配置
#spring.jta.log-dir=classpath:transaction-logs
#spring.jta.transaction-manager-id=txManager
两个数据源
在配置类MybatisConfiguration中创建数据源bean:SpringDataSource和TestDataSource。
@Configuration
@EnableConfigurationProperties
@EnableTransactionManagement(proxyTargetClass = true)
public class MybatisConfiguration {
/**
* spring数据库配置前缀.
*/
final static String SPRING_PREFIX = "spring.jta.atomikos.datasource.spring";
/**
* test数据库配置前缀.
*/
final static String TEST_PREFIX = "spring.jta.atomikos.datasource.test";
/**
* The constant logger.
*/
final static Logger logger = LoggerFactory.getLogger(MybatisConfiguration.class);
/**
* 配置Spring数据库的数据源
*
* @return the data source
*/
@Bean(name = "SpringDataSource")
@ConfigurationProperties(prefix = SPRING_PREFIX) // application.properties中对应属性的前缀
public DataSource springDataSource() {
return new AtomikosDataSourceBean();
}
/**
* 配置Test数据库的数据源
*
* @return the data source
*/
@Bean(name = "TestDataSource")
@ConfigurationProperties(prefix = TEST_PREFIX) // application.properties中对应属性的前缀
public DataSource testDataSource() {
return new AtomikosDataSourceBean();
}
}
两个Session工厂对象
mybatis中重要的组件是Session工厂,每个数据库都有个固定的session工厂,应用与数据库交互前需要向对应的工厂申请资源。两个session工厂配置类分别生成两种不同数据库的session工厂。
@Configuration
@MapperScan(basePackages = {"com.jta.mapper.spring"}, sqlSessionFactoryRef = "springSqlSessionFactory")
public class SpringDataSourceConfiguration {
/**
* The constant MAPPER_XML_LOCATION.
*/
public static final String MAPPER_XML_LOCATION = "classpath:mapper/spring/*.xml";
/**
* The Open plat form data source.
*/
@Autowired
@Qualifier("SpringDataSource")
DataSource springDataSource;
/**
* 配置Sql Session模板
*
* @return the sql session template
* @throws Exception the exception
*/
@Bean
public SqlSessionTemplate springSqlSessionTemplate() throws Exception {
return new SqlSessionTemplate(springSqlSessionFactory());
}
/**
* 配置SQL Session工厂
*
* @return the sql session factory
* @throws Exception the exception
*/
@Bean
public SqlSessionFactory springSqlSessionFactory() throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(springDataSource);
//指定XML文件路径
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_XML_LOCATION));
return factoryBean.getObject();
}
}
另外一个session工厂配置类,请查看本项目完整代码。
测试分布式事务测试,启动应用,使用postman进行测试。
在postman中输入地址:localhost:8080/insertPeopleAndUser?peopleName=jack&userName=helloworld001,访问结果如图所示
people数据插入失败,导致回滚了,验证user数据是否也成功回滚了?
helloworld001数据没有插入user表。回滚成功!
到此,springboot项目就成功整合了Atomikos框架实现分布式事务的2PC方案,详细代码请参考我的完整项目。
3. 2PC分布式事务的问题及局限性
性能低效:2PC方案的两种角色都会阻塞等待,直到返回执行结果或收到命令,事务锁会一直积压,导致数据库性能会非常差。
单点故障:事务协调者是整个XA模型的核心,一旦事务协调者节点挂掉,会导致参与者收不到提交或回滚的通知,从而导致参与者节点始终处于事务无法完成的中间状态。
数据不一致:第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就会导致节点间数据的不一致问题。
对于2PC方案,是基于资源层的分布式事务解决方案,常用于单体应用操作不同数据库的场景。对于微服务架构就不适用了,大家选择方案的时候还要根据具体情况来做。
本文是有已经实现的完整项目,读者可以先收藏起来,以后可以拿来即用。关注“前沿科技“公众后,发送“两阶段提交“消息获得项目地址。
还要配置状态转移信息以及事件处理器逻辑的开发。完整的demo项目,请关注公众号“前沿科技bot“并发送"状态机"获取。
- 本文标签: Java Spring Boot Spring
- 版权声明: 本站原创文章,于2020年06月15日由空白发布,转载请注明出处