欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

atomikos JTA/XA全局事务

发布时间:2024/3/24 编程问答 66 豆豆
生活随笔 收集整理的这篇文章主要介绍了 atomikos JTA/XA全局事务 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

Atomikos公司官方网址为:https://www.atomikos.com/。其旗下最著名的产品就是事务管理器。产品分两个版本:

TransactionEssentials:开源的免费产品

ExtremeTransactions:上商业版,需要收费。

这两个产品的关系如下图所示: 

TransactionEssentials:

1、实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,如:

    UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类

    TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager

    Transaction实现是com.atomikos.icatch.jta.TransactionImp

2、针对实现了JDBC规范中规定的实现了XADataSource接口的数据库连接池,以及实现了JMS规范的MQ客户端提供一层封装。

     在上一节我们讲解JTA规范时,提到过XADataSource、XAConnection等接口应该由资源管理器RM来实现,而Atomikos的作用是一个事务管理器(TM),并不需要提供对应的实现。而Atomikos对XADataSource进行封装,只是为了方便与事务管理器整合。封装XADataSource的实现类为AtomikosDataSourceBean。典型的XADataSource实现包括:

    1、mysql官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

    2、阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource

    3、tomcat-jdbc连接池提供的org.apache.tomcat.jdbc.pool.XADataSource

    而其他一些常用的数据库连接池,如dbcp、dbcp2或者c3p0,目前貌似尚未提供XADataSource接口的实现。如果提供给AtomikosDataSourceBean一个没有实现XADataSource接口的数据源,如c3p0的ComboPooledDataSource,则会抛出类似以下异常: 

  • com.atomikos.jdbc.AtomikosSQLException: The class 'com.mchange.v2.c3p0.ComboPooledDataSource'
  •  specified by property 'xaDataSourceClassName' does not implement the required interface  
  •  javax.jdbc.XADataSource.  
  •  Please make sure the spelling is correct, and check your JDBC driver vendor's documentation.
  •    

    ExtremeTransactions在TransactionEssentials的基础上额外提供了以下功能:

    支持TCC:这是一种柔性事务

    支持通过RMI、IIOP、SOAP这些远程过程调用技术,进行事务传播。

    本文主要针对Atomikos开源版本的事务管理器实现TransactionEssentials进行讲解,包括:

    1、直接使用TransactionEssentials的API

    2、TransactionEssentials与spring、mybatis整合

    3、Atomikos配置详解

     

    直接使用TransactionEssentials的API

    在maven项目的pom文件中引入以下依赖:

  • <dependency>
  •     <groupId>com.atomikos</groupId>
  •     <artifactId>transactions-jdbc</artifactId>
  •     <version>4.0.6</version>
  • </dependency>
  • <dependency>
  •     <groupId>mysql</groupId>
  •     <artifactId>mysql-connector-java</artifactId>
  •     <version>5.1.39</version>
  • </dependency>
  • 新建mysql数据库表

    需要注意的是,在mysql中,只有innodb引擎才支持XA事务,所以这里显式的指定了数据库引擎为innodb。

  • -- 新建数据库db_user;
  • create database db_user;
  • -- 在db_user库中新建user表
  • create table db_user.user(id int AUTO_INCREMENT PRIMARY KEY,name varchar(50)) engine=innodb;
  • -- 新建数据库db_account;
  • create database db_account;
  • -- 在db_account库中新建account表
  • create table db_account.account(user_id int,money double) engine=innodb;
  • 另外,在本案例中,db_user库和db_account库是位于同一个mysql实例中的。 

           

    案例代码:

        在使用了事务管理器之后,我们通过atomikos提供的UserTransaction接口的实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交和回滚事务。而不再是使用java.sql.Connection中的setAutoCommit(false)的方式来开启事务。其他JTA规范中定义的接口,开发人员并不需要直接使用。 

  • import com.atomikos.icatch.jta.UserTransactionImp;
  • import com.atomikos.jdbc.AtomikosDataSourceBean;
  •  
  • import javax.transaction.SystemException;
  • import javax.transaction.UserTransaction;
  • import java.sql.Connection;
  • import java.sql.PreparedStatement;
  • import java.sql.ResultSet;
  • import java.sql.Statement;
  • import java.util.Properties;
  •  
  • public class AtomikosExample {
  •  
  •    private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
  •       // 连接池基本属性
  •       Properties p = new Properties();
  •       p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
  •       p.setProperty("user", "root");
  •       p.setProperty("password", "your password");
  •  
  •       // 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
  •       AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
  •       //atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同
  •       ds.setUniqueResourceName(dbName);
  •       ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
  •       ds.setXaProperties(p);
  •       return ds;
  •    }
  •  
  •    public static void main(String[] args) {
  •  
  •       AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
  •       AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");
  •  
  •       Connection conn1 = null;
  •       Connection conn2 = null;
  •       PreparedStatement ps1 = null;
  •       PreparedStatement ps2 = null;
  •  
  •       UserTransaction userTransaction = new UserTransactionImp();
  •       try {
  •          // 开启事务
  •          userTransaction.begin();
  •  
  •          // 执行db1上的sql
  •          conn1 = ds1.getConnection();
  •          ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
  •          ps1.setString(1, "tianshouzhi");
  •          ps1.executeUpdate();
  •          ResultSet generatedKeys = ps1.getGeneratedKeys();
  •          int userId = -1;
  •          while (generatedKeys.next()) {
  •             userId = generatedKeys.getInt(1);// 获得自动生成的userId
  •          }
  •  
  •          // 模拟异常 ,直接进入catch代码块,2个都不会提交
  • //        int i=1/0;
  •  
  •          // 执行db2上的sql
  •          conn2 = ds2.getConnection();
  •          ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
  •          ps2.setInt(1, userId);
  •          ps2.setDouble(2, 10000000);
  •          ps2.executeUpdate();
  •  
  •          // 两阶段提交
  •          userTransaction.commit();
  •       } catch (Exception e) {
  •          try {
  •             e.printStackTrace();
  •             userTransaction.rollback();
  •          } catch (SystemException e1) {
  •             e1.printStackTrace();
  •          }
  •       } finally {
  •          try {
  •             ps1.close();
  •             ps2.close();
  •             conn1.close();
  •             conn2.close();
  •             ds1.close();
  •             ds2.close();
  •          } catch (Exception ignore) {
  •          }
  •       }
  •    }
  • }
  • 2、TransactionEssentials与spring、mybatis整合

    在pom中添加以下依赖 

  • <dependency>
  •     <groupId>org.springframework</groupId>
  •     <artifactId>spring-jdbc</artifactId>
  •     <version>4.3.7.RELEASE</version>
  • </dependency>
  • <dependency>
  •     <groupId>org.springframework</groupId>
  •     <artifactId>spring-context</artifactId>
  •     <version>4.3.7.RELEASE</version>
  • </dependency>
  • <dependency>
  •     <groupId>org.mybatis</groupId>
  •     <artifactId>mybatis</artifactId>
  •     <version>3.4.1</version>
  • </dependency>
  • <dependency>
  •     <groupId>org.mybatis</groupId>
  •     <artifactId>mybatis-spring</artifactId>
  •     <version>1.3.1</version>
  • </dependency>
  • 新建User实体

  • package com.tianshouzhi.atomikos;
  • public class User {
  •    private int id;
  •    private String name;
  •    // setters and getters
  • }
  • 新建Account实例

  • package com.tianshouzhi.atomikos;
  • public class Account {
  •    private int userId;
  •    private double money;
  •    // setters and getters
  • }
  • 新建UserMapper接口,为了方便,这里使用了mybatis 注解方式,没有编写映射文件,作用是一样的

  • package com.tianshouzhi.atomikos.mappers.db_user;
  • import org.apache.ibatis.annotations.Insert;
  • import com.tianshouzhi.atomikos.User;
  • import org.apache.ibatis.annotations.Options;
  • public interface UserMapper {
  •    @Insert("INSERT INTO user(id,name) VALUES(#{id},#{name})")
  •    @Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")
  •    public void insert(User user);
  • }
  • 新建AccountMapper接口

  • package com.tianshouzhi.atomikos.mappers.ds_account;
  • import com.tianshouzhi.atomikos.Account;
  • import org.apache.ibatis.annotations.Insert;
  • public interface AccountMapper {
  •     @Insert("INSERT INTO account(user_id,money) VALUES(#{userId},#{money})")
  •     public void insert(Account account);
  • }
  • 新建使用JTA事务的bean,注意在使用jta事务的时候,依然可以使用spring的声明式事务管理

  • package com.tianshouzhi.atomikos;
  • import com.tianshouzhi.atomikos.mappers.db_user.UserMapper;
  • import com.tianshouzhi.atomikos.mappers.ds_account.AccountMapper;
  • import org.springframework.beans.factory.annotation.Autowired;
  • import org.springframework.transaction.annotation.Transactional;
  • public class JTAService {
  •    @Autowired
  •    private UserMapper userMapper;//操作db_user库
  •    @Autowired
  •    private AccountMapper accountMapper;//操作db_account库
  •    @Transactional
  •    public void insert() {
  •       User user = new User();
  •       user.setName("wangxiaoxiao");
  •       userMapper.insert(user);
  •        
  •       //    int i = 1 / 0;//模拟异常,spring回滚后,db_user库中user表中也不会插入记录
  •       Account account = new Account();
  •       account.setUserId(user.getId());
  •       account.setMoney(123456789);
  •       accountMapper.insert(account);
  •    }
  • }
  • 编写配置文件spring-atomikos.xml

  • <?xml version="1.0" encoding="UTF-8"?>
  • <beans xmlns="http://www.springframework.org/schema/beans"
  •        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
  •        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  •        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
  •  
  •  
  •     <!--==========针对两个库,各配置一个AtomikosDataSourceBean,底层都使用MysqlXADataSource=====================-->
  •     <!--配置数据源db_user-->
  •     <bean id="db_user" class="com.atomikos.jdbc.AtomikosDataSourceBean"
  •           init-method="init" destroy-method="close">
  •         <property name="uniqueResourceName" value="ds1" />
  •         <property name="xaDataSourceClassName"
  •                   value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
  •         <property name="xaProperties">
  •             <props>
  •                 <prop key="url">jdbc:mysql://localhost:3306/db_user</prop>
  •                 <prop key="user">root</prop>
  •                 <prop key="password">shxx12151022</prop>
  •             </props>
  •         </property>
  •     </bean>
  •  
  •     <!--配置数据源db_account-->
  •     <bean id="db_account" class="com.atomikos.jdbc.AtomikosDataSourceBean"
  •           init-method="init" destroy-method="close">
  •         <property name="uniqueResourceName" value="ds2" />
  •         <property name="xaDataSourceClassName"
  •                   value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
  •         <property name="xaProperties">
  •             <props>
  •                 <prop key="url">jdbc:mysql://localhost:3306/db_account</prop>
  •                 <prop key="user">root</prop>
  •                 <prop key="password">shxx12151022</prop>
  •             </props>
  •         </property>
  •     </bean>
  •  
  •     <!--=============针对两个数据源,各配置一个SqlSessionFactoryBean============ -->
  •     <bean id="ssf_user" class="org.mybatis.spring.SqlSessionFactoryBean">
  •         <property name="dataSource" ref="db_user" />
  •     </bean>
  •  
  •     <bean id="ssf_account" class="org.mybatis.spring.SqlSessionFactoryBean">
  •         <property name="dataSource" ref="db_account" />
  •     </bean>
  •  
  •     <!--=============针对两个SqlSessionFactoryBean,各配置一个MapperScannerConfigurer============ -->
  •     <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
  •         <property name="sqlSessionFactoryBeanName" value="ssf_user"/>
  •         <!--指定com.tianshouzhi.atomikos.mappers.db_user包下的UserMapper接口使用ssf_user获取底层数据库连接-->
  •         <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.db_user"/>
  •     </bean>
  •  
  •     <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
  •         <property name="sqlSessionFactoryBeanName" value="ssf_account"/>
  •         <!--指定com.tianshouzhi.atomikos.mappers.ds_account包下的AccountMapper接口使用ssf_account获取底层数据库连接-->
  •         <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.ds_account"/>
  •     </bean>
  •  
  •     <!--================配置atomikos事务管理器========================-->
  •     <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"
  •           destroy-method="close">
  •         <property name="forceShutdown" value="false"/>
  •     </bean>
  •  
  •     <!--============配置spring的JtaTransactionManager,底层委派给atomikos进行处理===============-->
  •     <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
  •         <property name="transactionManager" ref="atomikosTransactionManager"/>
  •     </bean>
  •  
  •     <!--配置spring声明式事务管理器-->
  •     <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
  •  
  •     <bean id="jtaService" class="com.tianshouzhi.atomikos.JTAService"/>
  • </beans>
  • 测试代码

  • package com.tianshouzhi.atomikos;
  •  
  • import org.springframework.context.ApplicationContext;
  • import org.springframework.context.support.ClassPathXmlApplicationContext;
  •  
  • public class AtomikosSpringMybatisExample {
  •    public static void main(String[] args) {
  •       ApplicationContext context = new ClassPathXmlApplicationContext("spring-atomikos.xml");
  •       JTAService jtaService = context.getBean("jtaService", JTAService.class);
  •       jtaService.insert();
  •    }
  • }
  •     建议读者先直接按照上述代码运行,以确定代码执行后,db_user库的user表和db_account的account表中的确各插入了一条记录,以证明我们的代码的确是操作了2个库,属于分布式事务。

        然后将JTAService中的异常模拟的注释打开,会发现出现异常后,两个库中都没有新插入的数据库,说明我们使用的JTA事务管理器的确保证数据的一致性了。

    Atomikos配置

        在掌握了Atomikos基本使用之后,我们对Atomikos的配置进行一下简单的介绍。Atomikos在启动后,默认会从以下几个位置读取配置文件,这里笔者直接贴出atomikos源码进行说明:

    com.atomikos.icatch.provider.imp.AssemblerImp#initializeProperties方法中定义了配置加载顺序逻辑: 

  • @Override
  •     public ConfigProperties initializeProperties() {
  •         //读取classpath下的默认配置transactions-defaults.properties
  •         Properties defaults = new Properties();
  •         loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);
  •         //读取classpath下,transactions.properties配置,覆盖transactions-defaults.properties中相同key的值
  •         Properties transactionsProperties = new Properties(defaults);
  •         loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);
  •         //读取classpath下,jta.properties,覆盖transactions-defaults.properties、transactions.properties中相同key的值
  •         Properties jtaProperties = new Properties(transactionsProperties);
  •         loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);
  •         
  •         //读取通过java -Dcom.atomikos.icatch.file方式指定的自定义配置文件路径,覆盖之前的同名配置
  •         Properties customProperties = new Properties(jtaProperties);
  •         loadPropertiesFromCustomFilePath(customProperties);
  •         //最终构造一个ConfigProperties对象,来表示实际要使用的配置
  •         Properties finalProperties = new Properties(customProperties);
  •         return new ConfigProperties(finalProperties);
  •     }
  • 配置文件优先级:transactions-defaults.properties<transactions.properties<jta.properties<自定义配置文件路径,后面的配置会覆盖之前同名key的配置。

    其中transactions-defaults.properties是atomikos自带的默认配置,位于transactions-xxx.jar中.

    注意不同版本的默认配置可能不同。特别是3.x版本和4.x版本的差异比较明显。   

    以下是4.0.6中 transactions-default.properties中配置内容,笔者对这些配置进行了归类,如下: 

  • ===============================================================
  • ============          事务管理器(TM)配置参数       ==============
  • ===============================================================
  • #指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
  • com.atomikos.icatch.enable_logging=true
  • #JTA/XA资源是否应该自动注册
  • com.atomikos.icatch.automatic_resource_registration=true
  • #JTA事务的默认超时时间,默认为10000ms
  • com.atomikos.icatch.default_jta_timeout=10000
  • #事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间
  • com.atomikos.icatch.max_timeout=300000
  • #指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。
  • com.atomikos.icatch.threaded_2pc=false
  • #指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制
  • com.atomikos.icatch.max_actives=50
  • #是否支持subtransaction,默认为true
  • com.atomikos.icatch.allow_subtransactions=true
  • #指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)
  • com.atomikos.icatch.serial_jta_transactions=true
  • #指定JVM关闭时是否强制(force)关闭事务管理器,默认为false
  • com.atomikos.icatch.force_shutdown_on_vm_exit=false
  • #在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE
  • com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807
  •  
  • ===============================================================
  • =========        事务日志(Transaction logs)记录配置       =======
  • ===============================================================
  • #事务日志目录,默认为./。
  • com.atomikos.icatch.log_base_dir=./
  • #事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。
  • com.atomikos.icatch.log_base_name=tmlog
  • #指定两次checkpoint的时间间隔,默认为500
  • com.atomikos.icatch.checkpoint_interval=500
  •  
  • ===============================================================
  • =========          事务日志恢复(Recovery)配置       =============
  • ===============================================================
  • #指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
  • com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000
  • #指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout相同
  • com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
  • #提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5
  • com.atomikos.icatch.oltp_max_retries=5
  • #提交失败时,每次重试的时间间隔,默认10000ms
  • com.atomikos.icatch.oltp_retry_interval=10000
  •  
  • ===============================================================
  • =========          其他       =============================== ==
  • ===============================================================
  • java.naming.factory.initial=com.sun.jndi.rmi.registry.RegistryContextFactory
  • com.atomikos.icatch.client_demarcation=false
  • java.naming.provider.url=rmi://localhost:1099
  • com.atomikos.icatch.rmi_export_class=none
  • com.atomikos.icatch.trust_client_tm=false
  • 当我们想对默认的配置进行修改时,可以在classpath下新建一个jta.properties,覆盖同名的配置项即可。

    关于不同版本配置的差异,请参考官方文档:https://www.atomikos.com/Documentation/JtaProperties

    打印日志

    4.x版本之后,优先尝试使用slf4j,如果没有则尝试使用log4j,如果二者都没有,则使用JUL。

    参见:https://www.atomikos.com/Documentation/ConfiguringTheLogs

    注意这里是说的是如何配置打印工作日志(work log),而前面说的是事务日志(transactions log),二者不是不同的。

    总结

    以上是生活随笔为你收集整理的atomikos JTA/XA全局事务的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。