分布式事务管理Atomikos

目录

XA协议

Atomikos介绍

atomikos_demo

Spring对分布式事务的支持

基于spring实现

事务执行流程


XA协议

  • 用于支持单个服务操作多个服务源,XA是由X/Open组织提出的分布式事务的规范。
  • XA规范主要定义了(全局)事务管理器(TM:Transaction Manager)和(局部)资源管理器(RM:Resource Manager)之间的接口。
  • 主流的关系型数据库产品都是实现了XA接口的。
  • XA接口是双向的系统接口,在TM:RM = 1:N之间形成通信桥梁
  • 在分布式系统中,从理论上来说,两台机器无法达成一致的状态,需要引入一个单点进行协调,即为XA引入TM的原因。TM一般使用XA两阶段提交协议与数据库进行交互

两阶段提交

  • 阶段一为准备阶段prepare,即所有的参与者准备执行事务并锁住需要的资源。当参与者Ready时,向TM 汇报自己已经准备好。以mysql数据库为例,事务管理器向所有涉及到的数据库服务器发出prepare“准备提交”请求,数据库收到请求后执行数据修改和日志记录等处理,处理完成后只是把事务的状态改成“可以提交”,然后把结果返回给事务管理器
  • 阶段二为提交阶段commit,TM根据阶段1各个RM prepare的结果,决定是提交还是回滚事务。如果所有的RM都prepare成功,那么TM通知所有的RM进行提交;如果有RM prepare失败的话,则TM通知所有RM回滚自己的事务分支。以mysql数据库为例,如果第一阶段中所有数据库都prepare成功,那么事务管理器向数据库服务器发出commit请求,数据库服务器把事务的"可以提交"状态改为"提交完成"状态,然后返回应答。如果在第一阶段内有任何一个数据库的操作发生了错误,或者事务管理器收不到某个数据库的回应,则认为事务失败,回撤所有数据库的事务。数据库服务器收不到第二阶段的确认提交请求,也会把"可以提交"的事务回撤。

图解

Atomikos介绍

Atomikos是一个非常流行的开源事务管理器,并且可以嵌入到你的Spring Boot应用中。Tomcat应用服务器没有实现JTA规范,当使用Tomcat作为应用服务器的时候,需要使用第三方的事务管理器类来作为全局的事务管理器,而Atomikos框架就是这个作用,将事务管理整合到应用中,而不依赖于application server。

atomikos_demo

Atomikos核心类

1、数据源定义:com.atomikos.jdbc.AtomikosDataSourceBean

2、atomikos提供的javax.transaction.UserTransaction接口实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交、回滚事务。

实现代码

public class AtomikosExample {
    private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
        // 连接池基本属性
        Properties p = new Properties();
        p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
        p.setProperty("user", "root");
        p.setProperty("password", "root");

        // 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        // atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同
        ds.setUniqueResourceName(dbName);
        ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
        ds.setXaProperties(p);
        return ds;
    }

    public static void main(String[] args) {
        AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("test2022");
        AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("test2021");

        Connection conn1 = null;
        Connection conn2 = null;
        PreparedStatement ps1 = null;
        PreparedStatement ps2 = null;

        UserTransaction userTransaction = new UserTransactionImp();
        try {
            // 开启事务
            userTransaction.begin();

            // 执行db1上的sql
            conn1 = ds1.getConnection();
            ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
            ps1.setString(1, "zhangsan");
            ps1.executeUpdate();
            ResultSet generatedKeys = ps1.getGeneratedKeys();
            int userId = -1;
            while (generatedKeys.next()) {
                // 获得自动生成的userId
                userId = generatedKeys.getInt(1);
            }

            // 模拟异常 ,直接进入catch代码块,2个都不会提交
            // int i=1/0;

            // 执行db2上的sql
            conn2 = ds2.getConnection();
            ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
            ps2.setInt(1, userId);
            ps2.setDouble(2, 10000000);
            ps2.executeUpdate();

            // 两阶段提交
            userTransaction.commit();
        } catch (Exception e) {
            try {
                System.out.println("有异常,所有数据源都回滚");
                userTransaction.rollback();
            } catch (SystemException e1) {
                e1.printStackTrace();
            }
        } finally {
            try {
                ps1.close();
                ps2.close();
                conn1.close();
                conn2.close();
                ds1.close();
                ds2.close();
            } catch (Exception ignore) {
            }
        }
    }
}

注:新建的maven项目,是依赖Java JDK

需要手动导入javax

Spring对分布式事务的支持

使用其声明式事务管理功能来完成事务功能。一般使用的步骤如下:

1、配置事务管理器。spring提供了PlatformTransactionManager接口,其有2个重要的实现类。

  • DataSourceTransactionManager:用于支持本地事务,事实上,其内部也是通过操作java.sql.Connection来开启、提交和回滚事务。
  • JtaTransactionManager:用于支持分布式事务,其实现了JTA规范,使用XA协议进行两阶段提交。需要注意的是,这只是一个代理,我们需要为其提供一个JTA provider,一般是Java EE容器提供的事务协调器(Java EE server's transaction coordinator),也可以不依赖容器,配置一个本地的JTA provider。JTA规范:Java Transaction API,Java提供的基于XA规范的事务管理规范

2、在需要开启的事务的bean的方法上添加@Transitional注解

基于spring实现

配置文件,去配置分布式事务管理器JtaTransactionManager

一个事务方法,涉及两个数据源

public class JTAService {
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private AccountMapper accountMapper;

    @Transactional(rollbackFor = Throwable.class)
    public void insert() {
        User user = new User();
        user.setName("张三");
        userMapper.insert(user);

        // 模拟异常
        // int i = 1 / 0;
        Account account = new Account();
        account.setUserId(user.getId());
        account.setMoney(123456789);
        accountMapper.insert(account);
    }
}

执行启动代码

ApplicationContext context = new ClassPathXmlApplicationContext("spring-atomikos.xml");
JTAService jtaService = context.getBean("jtaService", JTAService.class);
jtaService.insert();

事务执行流程

注:依赖spring版本4.3.7.RELEASE

事务注解被事务拦截器拦截后,调用代理对象的invoke方法org.springframework.transaction.interceptor.TransactionInterceptor#invoke

核心框架执行org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction

开启一个JTA事务:com.atomikos.icatch.jta.TransactionManagerImp#begin(int)

 事务初始状态

 底层管理

 


版权声明:本文为tec_1535原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>