jfinal集成多数据库事务框架测试

上次ShardingSphere的分布式事务经过测试

所有数据源必须是相同的数据库类型;否则抛出异常:Database type inconsistent with '%s' and '%s';其数据库类型根据connection.getMetaData().getDatabaseProductName()

查看了seata框架发现要搭建fescar-server,我只想在单独项目里面实现多数据库事务所以未做深入了解。

 

了解到相关实现jta框架可以实现我的需求,就进行了集成到jfinal的相关测试。

选用jta的atomike实现

1)在本机不同数据库上建立数据库(mysql,sqlserver,oracle)

CREATE TABLE test (
  id int(11) NOT NULL,
  PRIMARY KEY (id)
)

2)加入atomikosjar

<dependency>
    <groupId>
com.atomikos</groupId>
    <artifactId>
transactions-jdbc</artifactId>
    <version>
5.0.0</version>
</dependency>
<dependency>
    <groupId>
javax.transaction</groupId>
    <artifactId>
jta</artifactId>
    <version>
1.1</version>
</dependency>

3)定义插件AtomikosPlugin(更多数据库设置参数未实现)

package com.csnt.source.plugin.atomikos;

import
com.atomikos.icatch.jta.UserTransactionImp;
import
com.atomikos.jdbc.AtomikosDataSourceBean;
import
com.jfinal.kit.LogKit;
import
com.jfinal.kit.StrKit;
import
com.jfinal.plugin.IPlugin;
import
com.jfinal.plugin.activerecord.ActiveRecordException;
import
com.jfinal.plugin.activerecord.IAtom;
import
com.jfinal.plugin.activerecord.IDataSourceProvider;

import
javax.sql.DataSource;
import
java.util.Properties;


/**
 * @author source
 */
public class AtomikosPlugin implements IPlugin, IDataSourceProvider {
   
private String name = null;
    private
String url;
    private
String username;
    private
String password;
    private
String xaDataSourceStr;

    private int
minPoolSize = 10;
    private int
maxPoolSize = 32;

    private
AtomikosDataSourceBean ds;
    private boolean
localTransactionMode = true;
    private volatile boolean
isStarted = false;

    public
AtomikosPlugin(String url, String username, String password, String xaDataSourceStr) {
       
this.url = url;
        this
.username = username;
        this
.password = password;
        this
.xaDataSourceStr = xaDataSourceStr;

   
}

   
public AtomikosPlugin set(int minPoolSize, int maxPoolSize) {
       
this.minPoolSize = minPoolSize;
        this
.maxPoolSize = maxPoolSize;
        return this;
   
}

   
public final AtomikosPlugin setLocalTransactionMode(boolean localTransactionMode) {
       
this.localTransactionMode = localTransactionMode;
        return this;
   
}

   
public final String getName() {
       
return this.name;
   
}

   
public final void setName(String name) {
       
this.name = name;
   
}

   
public static boolean tx(IAtom atom) {
        UserTransactionImp utx =
new UserTransactionImp();
        try
{
            utx.begin()
;
            boolean
result = atom.run();
            if
(result) {
               
// 提交事务
               
utx.commit();
           
} else {
               
//回滚事务
               
utx.rollback();
           
}
           
return result;
       
} catch (Throwable t) {
           
try {
                utx.rollback()
;
           
} catch (Exception e) {
                LogKit.error(e.getMessage()
, e);
           
}
           
throw t instanceof RuntimeException ? (RuntimeException) t : new ActiveRecordException(t);
       
}
    }

   
@Override
   
public boolean start() {
       
if (isStarted) {
           
return true;
       
} else {
           
ds = new AtomikosDataSourceBean();
            if
(StrKit.notBlank(name)) {
                
ds.setUniqueResourceName(name);
           
}
           
ds.setMinPoolSize(minPoolSize);
           
ds.setMaxPoolSize(maxPoolSize);
           
//是否开启本地事务与jta事务混合
           
ds.setLocalTransactionMode(localTransactionMode);
           
ds.setXaDataSourceClassName(xaDataSourceStr);
           
Properties properties = new Properties();
           
properties.put("URL", url);
           
properties.put("user", username);
           
properties.put("password", password);
           
ds.setXaProperties(properties);
            this
.isStarted = true;
            return true;
       
}
    }

   
@Override
   
public boolean stop() {
       
if (ds != null) {
           
ds.close();
       
}

       
ds = null;
       
isStarted = false;
        return true;
   
}

   
@Override
   
public DataSource getDataSource() {
       
return ds;
   
}
   
}

4)设置数据库配置信息

    /**
     *
配置数据库 Atomikos
     */
   
private void configDbForAtomikos(Plugins me) {
        loadPropertyFile(
"database.properties");
       
String dbConfig = getProperty("database.register", "main");
       
String[] dataBases = dbConfig.split(",");
        for
(String dbKey : dataBases) {
            AtomikosPlugin ap =
new AtomikosPlugin(getProperty(dbKey + ".jdbcUrl"), getProperty(dbKey + ".user"), getProperty(dbKey + ".password"), getProperty(dbKey + ".xa.datasource"));
           
ap.setName(dbKey);
//            //设置连接池数量
//            if (StringUtil.isNotEmpty(getPropertyToInt(dbKey + ".initialSize"), getPropertyToInt(dbKey + ".minIdle"), getPropertyToInt(dbKey + ".maxActive"))) {
//                ap.set(getPropertyToInt(dbKey + ".initialSize"), getPropertyToInt(dbKey + ".minIdle"), getPropertyToInt(dbKey + ".maxActive"));
//            }
           
ActiveRecordPlugin arp;
            if
("main".equals(dbKey)) {
                arp =
new ActiveRecordPlugin(ap);
           
} else {
                arp =
new ActiveRecordPlugin(dbKey, ap);
           
}
            arp.setDialect(getDbDialectByDriver(getProperty(dbKey +
".driver")));
           
arp.setContainerFactory(new CaseInsensitiveContainerFactory());
           
arp.setShowSql(true);
           
me.add(ap);
           
me.add(arp);
       
}
    }

 

#数据库配置注册器,配置到此处的数据库,自动注册到程序
database.register=main,test1,test2

#region main数据库
main.driver=com.mysql.cj.jdbc.Driver
main.xa.datasource=com.mysql.cj.jdbc.MysqlXADataSource
main.jdbcUrl=jdbc:mysql://127.0.0.1:3306/source?serverTimezone=Asia/Shanghai&characterEncoding=utf8
main.user=
main.password=
#endregion

test1.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
test1.xa.datasource=com.microsoft.sqlserver.jdbc.SQLServerXADataSource
test1.jdbcUrl=jdbc:sqlserver://127.0.0.1:1433;databaseName=test
test1.user=
test1.password=

test2.driver=oracle.jdbc.OracleDriver
test2.xa.datasource=oracle.jdbc.xa.client.OracleXADataSource
test2.jdbcUrl=jdbc:oracle:thin:@127.0.0.1:1521:XE
test2.user=
test2.password=

5)测试代码

Record record = new Record();
record.set("id", 1);
AtomikosPlugin.tx(() -> {
    Db.save(
"test", record);
   
Db.use("test1").save("test", record);
   
Db.use("test2").save("test", record);
    return false;
});

6待完善问题

Atomike更多参数调优设置,性能测试

7)实际运行中问题处理

Oracle XA事务授权

Oracle 授权XA事务脚本
GRANT SELECT ON
   sys.dba_pending_transactions TO name;
GRANT SELECT ON
   sys.pending_trans$ TO name;
GRANT SELECT ON
   sys.dba_2pc_pending TO name;
GRANT execute ON sys.dbms_system TO name;

sqlserver配置XA事务 (https://www.ibm.com/support/knowledgecenter/zh/SSTLXK_8.5.5/com.ibm.wbpm.imuc.ebpm.doc/topics/db_xa_nd_win_man.html)

减少Atomikos日志
log4j.properties追加配置
#override atomikos info msg
log4j.logger.com.atomikos=WARN

 

修改Atomikos日志路径

jta.properties修改 com.atomikos.icatch.log_base_dir=./logs/transprovincially

 

修改AtomikoslocalTransactionMode属性为true

不修改单独使用本地事务会报错

 

PS.本人是新手如果有错误敬请谅解,本项目只是测试项目。

 

参考资料:

jfinal数据库事务相关源码

记一次Atomikos分布式事务的使用 https://www.jianshu.com/p/86b4ab4f2d18


评论区

杜福忠

2019-06-13 14:36

我没做过分布式事务,请教下,不知道和下面这样写,原理上有啥不同了?或者说下面的写法会存在什么问题了?
Db.tx(() -> {
return Db.use("test1").tx(() -> {
return Db.use("test2").tx(() -> {
Db.save("test", record);
Db.use("test1").save("test", record);
Db.use("test2").save("test", record);
return false;
});
});
});

sourceTT

2019-06-13 14:53

你这个只是本地事务的嵌套 ,每个事务只管自己数据库的sql。
比如你上面这样执行
在事务传递test2执行完成到test1返回时 出错则无法回滚test2的事务

杜福忠

2019-06-13 15:09

@sourceTT 嗯,是有这种问题存在的可能, 再问下,Atomikos的实现原理是怎么保证全部回滚的了?

sourceTT

2019-06-13 15:14

@杜福忠 参考资料里面介绍的比较详细,你可以看看

JFinal

2019-06-13 15:41

这是高手,点赞 + 收藏

杜福忠

2019-06-13 15:56

@sourceTT 好的~ 学习学习争取弄明白原理

热门分享

扫码入社