Quartz定时任务Redis集群方案

前言

加入jfinal club也有1个月之多了,学习到很多,总是向人家索取,自己并未付出的这种状态不是我想要的。本人实力有限也想尽可能的提供自己的绵薄之力,这次就抛砖引入分享一下Quartz定时任务Redis集群方案,参考社区之前有人已经提供了的QuartzPlugin和波总在cron4j中解答的如何去现实cron4j的集群方案

为什么要做redis集群方案

有人会说Quartz已经有一套成熟的集群方案了,但是那个是数据库版本,而且配置我也觉得挺麻烦,我不喜欢直连数据库,感觉很费资源

QuartzPlugin我就不贴出来了,之前就有人分享过

主要实现的接口,里面用到的一些redis操作是自己写的RedisCache,模仿jfinal ehcache的写法

主要流程

blob.png

package com.jfinalshop.task;

import com.jfinal.ext2.kit.JsonExtKit;
import com.jfinal.kit.JsonKit;
import com.jfinal.kit.StrKit;
import com.jfinalshop.util.IQuartzJobLoader;
import com.jfinalshop.util.RedisCacheKit;
import org.apache.commons.collections.map.HashedMap;
import org.quartz.Job;
import org.quartz.JobExecutionContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by little fish on 2017/8/5.
 */

public abstract class RedisJob implements Job {
    private final String JOB_CACHE_PREFIX = "quartz_job_";
    private final String JOB_LOCK_KEY = "isLock";
    private final String JOB_RUN_COUNT_KEY = "runCount";
    private final String JOB_FAIL_COUNT_KEY = "failCount";
    private final String JOB_FAIL_MESSAGE_KEY = "failMessage";
    private final String JOB_LAST_FIRE_TIME_KEY = "lastFireTime";
    private final Integer JOB_LOCK_TRUE = 1;
    private final Integer JOB_LOCK_FALSE = 0;

    private static ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<String, ReentrantLock>();

    private ReentrantLock getLock(String key) {
        ReentrantLock lock = lockMap.get(key);
        if (lock != null)
            return lock;

        lock = new ReentrantLock();
        ReentrantLock previousLock = lockMap.putIfAbsent(key, lock);
        return previousLock == null ? lock : previousLock;
    }

    private String genJobCacheName(JobExecutionContext jobExecutionContext){
        return String.format("%s%s", JOB_CACHE_PREFIX, jobExecutionContext.getJobInstance().getClass().getSimpleName());
    }

    private Map<Object, Object> genResetJobCacheMap(int runCount, Long fireTime,int failCount, String failMessage){
        Map<Object, Object> loadData = new HashedMap() ;
        loadData.put(JOB_LOCK_KEY, JOB_LOCK_FALSE);//去锁
        loadData.put(JOB_LAST_FIRE_TIME_KEY, fireTime);//同步最后一次触发时间
        loadData.put(JOB_RUN_COUNT_KEY, runCount++);//运行次数加1
        loadData.put(JOB_FAIL_COUNT_KEY, failCount);//错误次数
        loadData.put(JOB_FAIL_MESSAGE_KEY, failMessage);//错误信息
        return loadData;
    }

    public boolean checkJobValid(JobExecutionContext jobExecutionContext, IQuartzJobLoader jobLoader){
        String cacheName = genJobCacheName(jobExecutionContext);
        Integer isLock = RedisCacheKit.hget(cacheName, JOB_LOCK_KEY);
        if(isLock == JOB_LOCK_TRUE){
            return false;
        }

        Lock lock = getLock(cacheName);
        lock.lock();
        //初始化一些默认值
        Integer runCount = 0;
        Integer failCount = 0;
        Long lastFireTime = 0L;
        String failMessageJsonStr = "";
        //实际插入redis的数据
        Map<Object, Object> loadData = null;

        try {
            //再次拿出数据做double check检查并发
            Map<Object, Object> redisCacheData =  RedisCacheKit.hgetAll(cacheName);
            isLock = (Integer)redisCacheData.get(JOB_LOCK_KEY);
            lastFireTime = (Long)redisCacheData.get(JOB_LAST_FIRE_TIME_KEY);
            lastFireTime = lastFireTime != null?lastFireTime:0;
            runCount = (Integer)redisCacheData.get(JOB_RUN_COUNT_KEY);
            runCount = runCount != null?runCount:0;
            failCount = (Integer)redisCacheData.get(JOB_FAIL_COUNT_KEY);
            failCount = failCount != null?failCount:0;
            failMessageJsonStr = (String) redisCacheData.get(JOB_FAIL_MESSAGE_KEY);
            failMessageJsonStr = failMessageJsonStr != null?failMessageJsonStr:"";
            //检查是否满足执行条件
            if ((isLock == null || isLock == JOB_LOCK_FALSE) && lastFireTime < jobExecutionContext.getFireTime().getTime()) {
                //Redis加锁
                RedisCacheKit.hset(cacheName, JOB_LOCK_KEY, JOB_LOCK_TRUE);
                //处理Task主要业务逻辑
                jobLoader.load();
                //处理成功后把最新的状态同步到Redis中
                loadData = genResetJobCacheMap(JOB_LOCK_FALSE, jobExecutionContext.getFireTime().getTime(), failCount, failMessageJsonStr);
            }

            return true;
        }catch (Exception e){
            e.printStackTrace();
            failCount++;//失败次数+1

            //添加失败信息以便之后检查
            List<RedisJobFailMessage> faileMessageArray = null;
            if(StrKit.isBlank(failMessageJsonStr)){
                faileMessageArray = new ArrayList<RedisJobFailMessage>();
            }else{
                try {
                    faileMessageArray = JsonExtKit.jsonToJSONArray(failMessageJsonStr).toJavaList(RedisJobFailMessage.class);
                }catch (Exception ex){
                    ex.printStackTrace();
                }

                if(faileMessageArray == null){
                    faileMessageArray = new ArrayList<RedisJobFailMessage>();
                }
            }

            faileMessageArray.add(new RedisJobFailMessage(lastFireTime, e.getMessage()));
            failMessageJsonStr = JsonKit.toJson(faileMessageArray);
            loadData = genResetJobCacheMap(JOB_LOCK_FALSE, jobExecutionContext.getFireTime().getTime(), failCount, failMessageJsonStr);

        }finally {
            RedisCacheKit.hmsetForever(cacheName, loadData);
            lock.unlock();
        }

        return false;
    }
}

一个Loader接口其实就是和IDataLoader一模一样

package com.jfinalshop.util;

/**
 * Created by little fish on 2017/7/16.
 */
public interface IQuartzJobLoader {
    public void load(Object... args);
}


最后调用

package com.jfinalshop.task;

import com.jfinal.kit.LogKit;
import com.jfinalshop.util.IQuartzJobLoader;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Created by little fish on 2017/8/5.
 */
public class TaskTest extends RedisJob {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        this.checkJobValid(jobExecutionContext, new IQuartzJobLoader() {
            @Override
            public void load(Object... args) {
                //验证通过执行任务
                LogKit.info(String.format("getClass = %s fireTime = %s runtime = %s nextFireTime = %s result = %s",jobExecutionContext.getJobInstance().getClass().getSimpleName(),jobExecutionContext.getFireTime(),jobExecutionContext.getJobRunTime(),jobExecutionContext.getNextFireTime(),jobExecutionContext.getResult()));
            }
        });

    }
}

刚写完的,代码写的很粗糙,只是做过冒烟测试,可以正常执行,没有做过优化

评论区

JFinal

2017-08-07 10:34

代码写得十分工整,这段时间功力精进不少啊

yjjdick1990

2017-08-07 15:24

@JFinal 加入club后跟着各位大佬学习,进步神速,主要看是看了波总jfinal的代码,模仿着写,但是有些实在写的太高深了只能一点点模仿了

寻找任大侠

2018-01-12 15:31

大佬们这个必须要用jfinal的框架吗

寻找任大侠

2018-03-26 10:11

@yjjdick1990 但是我照着抄了一遍没有完成集群~,大佬求个完整的Demo

yjjdick1990

2018-03-26 14:16

@寻找任大侠 上面代码就是完整demo了,我估计你Quartz的基础jar没有引入吧,这个直接拷贝过去就能用的,对了,用这个代码之前首先要让项目支持Quartz,Quartz可以在这个分享里找具体链接我也不记得了

寻找任大侠

2018-04-16 16:01

@yjjdick1990 quartz的基础jar应该引入了,问题应该是那个redisCacheKit类没有

热门分享

扫码入社