jFinal Redis插件,支持单机和集群模式

jFinal自带的redis插件只支持单机模式,不支持集群,我写了个支持集群模式的插件

这个插件不但支持集群模式,还修复了集群模式下,没有keys、flushDB、dbSize等方法的调用。
注意:这个插件依赖阿里的fastjson。


集群模式与单机模式,完全不同的东西,单机是先创建JedisPool,然后调用里面getResource()方法来获得Jedis对象,然后使用Jedis来操作redis。

集群使用的是JedisCluster,他本身就实现了pool功能,所以不需要自己关闭连接,但是创建的方式与单机完全不一样,他是需要构造

Set<HostAndPort> jedisClusterNodes

来告诉redis使用了哪些服务器,而且集群模式是不支持密码的。


首先定义插件类,用于初始化集群模式的JedisCluster对象

package test.sunyu.tools.redis;
import com.alibaba.fastjson.JSON;
import com.jfinal.plugin.IPlugin;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * jFinal的redis插件不支持redis集群模式,我自己扩展一个吧
 *
 * @author 孙宇
 */
public class RedisClusterPlugin implements IPlugin {
    private Integer maxTotal = 1000;
    private Integer maxIdle = 200;
    private Long maxWaitMillis = 2000L;
    private Boolean testOnBorrow = true;
    private String cluster;
    private String defaultName = "main";
    public RedisClusterPlugin(String cluster) {
        this.cluster = cluster;
    }
    @Override
    public boolean start() {
        if (cluster != null) {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(maxTotal);
            jedisPoolConfig.setMaxIdle(maxIdle);
            jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
            jedisPoolConfig.setTestOnBorrow(testOnBorrow);
            Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
            List<Map> clusterList = JSON.parseArray(cluster, Map.class);
            if (clusterList != null && clusterList.size() > 0) {
                for (Map<String, String> c : clusterList) {
                    jedisClusterNodes.add(new HostAndPort(c.get("host"), Integer.parseInt(c.get("port"))));
                }
                JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, jedisPoolConfig);
                RedisClusterTools.addJedisCluster(defaultName, jedisCluster);
            }
            return true;
        }
        return false;
    }
    @Override
    public boolean stop() {
        return true;
    }
    public Integer getMaxTotal() {
        return maxTotal;
    }
    public void setMaxTotal(Integer maxTotal) {
        this.maxTotal = maxTotal;
    }
    public Integer getMaxIdle() {
        return maxIdle;
    }
    public void setMaxIdle(Integer maxIdle) {
        this.maxIdle = maxIdle;
    }
    public Long getMaxWaitMillis() {
        return maxWaitMillis;
    }
    public void setMaxWaitMillis(Long maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }
    public Boolean getTestOnBorrow() {
        return testOnBorrow;
    }
    public void setTestOnBorrow(Boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }
    public String getCluster() {
        return cluster;
    }
    public void setCluster(String cluster) {
        this.cluster = cluster;
    }
    public String getDefaultName() {
        return defaultName;
    }
    public void setDefaultName(String defaultName) {
        this.defaultName = defaultName;
    }
}


然后编写集群模式的工具类,其实这里我也可以将所有的方法写在这个类中,比如get、put等等,就像jFinal的Cache那样,但是我嫌麻烦,所以就没那样写,只写了keys、flushDB、dbSize这三个方法的直接调用。

package test.sunyu.tools.redis;
import org.nutz.lang.Lang;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import sunyu.tools.redis.JedisClusterCallback;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * redis集群工具类
 *
 * @author 孙宇
 */
public class RedisClusterTools {
    private static String defaultName = "main";
    private static final ConcurrentHashMap<String, JedisCluster> jedisClusterMap = new ConcurrentHashMap<>();
    public static void addJedisCluster(String jedisClusterName,
                                       JedisCluster jedisCluster) {
        if (jedisCluster != null && jedisClusterName != null && !jedisClusterMap.containsKey(jedisClusterName)) {
            jedisClusterMap.put(jedisClusterName, jedisCluster);
        }
    }
    /**
     * 获得一个JedisCluster
     *
     * @param jedisClusterName
     *
     * @return
     */
    public static JedisCluster getJedisCluster(String jedisClusterName) {
        return jedisClusterMap.get(jedisClusterName);
    }
    /**
     * 执行集群指令
     *
     * @param jedisClusterName
     * @param action
     * @param <T>
     *
     * @return
     */
    public static <T> T clusterExecute(String jedisClusterName,
                                       JedisClusterCallback<T> action) {
        JedisCluster jedis = getJedisCluster(jedisClusterName);
        try {
            return action.doInJedisCluster(jedis);
        } catch (Throwable throwable) {
            throw Lang.wrapThrow(throwable);
        }
    }
    /**
     * 执行集群指令
     *
     * @param action
     * @param <T>
     *
     * @return
     */
    public static <T> T clusterExecute(JedisClusterCallback<T> action) {
        JedisCluster jedis = getJedisCluster(defaultName);
        try {
            return action.doInJedisCluster(jedis);
        } catch (Throwable throwable) {
            throw Lang.wrapThrow(throwable);
        }
    }
    /**
     * 由于JedisCluster没有实现keys操作,这里自己实现以下
     *
     * @param pattern
     *
     * @return
     */
    public static Set<String> clusterKeys(String jedisClusterName,
                                          String pattern) {
        Set<String> keys = new HashSet<>();
        Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes();
        return clusterKeys(pattern, keys, clusterNodes);
    }
    private static Set<String> clusterKeys(String pattern,
                                           Set<String> keys,
                                           Map<String, JedisPool> clusterNodes) {
        for (String k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                keys.addAll(connection.keys(pattern));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
        return keys;
    }
    /**
     * 由于JedisCluster没有实现keys操作,这里自己实现以下
     *
     * @param pattern
     *
     * @return
     */
    public static Set<String> clusterKeys(String pattern) {
        Set<String> keys = new HashSet<>();
        Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes();
        return clusterKeys(pattern, keys, clusterNodes);
    }
    public static Set<byte[]> clusterKeys(String jedisClusterName,
                                          byte[] pattern) {
        Set<byte[]> keys = new HashSet<>();
        Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes();
        return clusterKeys(pattern, keys, clusterNodes);
    }
    private static Set<byte[]> clusterKeys(byte[] pattern,
                                           Set<byte[]> keys,
                                           Map<String, JedisPool> clusterNodes) {
        for (Object k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                keys.addAll(connection.keys(pattern));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
        return keys;
    }
    public static Set<byte[]> clusterKeys(byte[] pattern) {
        Set<byte[]> keys = new HashSet<>();
        Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes();
        return clusterKeys(pattern, keys, clusterNodes);
    }
    public static void clusterFlushDB(String jedisClusterName) {
        Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes();
        clusterFlushDb(clusterNodes);
    }
    private static void clusterFlushDb(Map<String, JedisPool> clusterNodes) {
        for (Object k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                connection.flushDB();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
    }
    public static void clusterFlushDB() {
        Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes();
        clusterFlushDb(clusterNodes);
    }
    public static Long clusterDbSize(String jedisClusterName) {
        Long total = 0L;
        Map<String, JedisPool> clusterNodes = getJedisCluster(jedisClusterName).getClusterNodes();
        return clusterDbSize(total, clusterNodes);
    }
    private static Long clusterDbSize(Long total,
                                      Map<String, JedisPool> clusterNodes) {
        for (Object k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                total += connection.dbSize();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
        return total;
    }
    public static Long clusterDbSize() {
        Long total = 0L;
        Map<String, JedisPool> clusterNodes = getJedisCluster(defaultName).getClusterNodes();
        return clusterDbSize(total, clusterNodes);
    }
}



因为上面工具类没有将所有jedis集群模式的方法全都写出来,所以我使用了一个回调,将方法的调用交给了程序员,偷懒的做法。

package sunyu.tools.redis;
import redis.clients.jedis.JedisCluster;
/**
 * 懒得写所有方法,回调一下得了
 *
 * @author 孙宇
 */
public interface JedisClusterCallback<T> {
    T doInJedisCluster(JedisCluster jedis) throws Throwable;
}



测试类,能直接调用的只有keys、flushDB、dbSize这三个方法。。如果想调用get/put/hget等等。。。。请传递回调方法,然后自己在里面实现,当然,集群模式是不需要自己close链接的。并且集群模式内部实现了连接池,也不用用户去管理了。


集群模式需要知道所有的服务器节点,所以为了初始化方便,我使用了json格式

String clusterJson = "[{host:'192.168.11.81',port:'7001'},{host:'192.168.11.81',port:'7002'},{host:'192.168.11.81',port:'7003'},{host:'192.168.11.81',port:'7007'},{host:'192.168.11.81',port:'7008'},{host:'192.168.11.82',port:'7004'},{host:'192.168.11.82',port:'7005'},{host:'192.168.11.82',port:'7006'},{host:'192.168.11.82',port:'7009'},{host:'192.168.11.82',port:'7010'}]";

就是个集合,每一项都是一个节点,必须有host和port属性


package test.sunyu.tools.redis;
import com.alibaba.fastjson.JSON;
import org.junit.Test;
import redis.clients.jedis.JedisCluster;
import sunyu.tools.redis.JedisClusterCallback;
import java.util.Set;
/**
 * @author 孙宇
 */
public class TestRedisClusterTools {
    @Test    public void t1() {
        String clusterJson = "[{host:'192.168.11.81',port:'7001'},{host:'192.168.11.81',port:'7002'},{host:'192.168.11.81',port:'7003'},{host:'192.168.11.81',port:'7007'},{host:'192.168.11.81',port:'7008'},{host:'192.168.11.82',port:'7004'},{host:'192.168.11.82',port:'7005'},{host:'192.168.11.82',port:'7006'},{host:'192.168.11.82',port:'7009'},{host:'192.168.11.82',port:'7010'}]";
        RedisClusterPlugin p = new RedisClusterPlugin(clusterJson);
        p.start();
        //集群模式本身是没有keys命令的,这里我自己实现了
        Set<String> allKeys = RedisClusterTools.clusterKeys("*");
        System.out.println(JSON.toJSONString(allKeys));
        //大小
        //RedisClusterTools.clusterDbSize();
        //删除
        //RedisClusterTools.clusterFlushDB();
        //其他操作,其他操作全都是回调方式,因为我懒得写里面所有方法了,只要是jedis支持的方法,集群模式这个工具都支持
        RedisClusterTools.clusterExecute(new JedisClusterCallback<Object>() {
            @Override
            public Object doInJedisCluster(JedisCluster jedis) throws Throwable {
                //jedis.get("");
                //jedis.set("", "");
                //jedis.del("");
                //jedis.hget("", "");
                //....不写了,自己看文档吧
                return null;
            }
        });
    }
}







评论区

热门分享

扫码入社