基于Jfinal的TaskExecutor,看有没有人用啊。

在前:之前使用SpringMVC,现在有项目使用Jfinal。

现在的项目对多线程这块使用的比较多,所以线程池就需要一些考虑。SpringMVC中有TaskExecutor,想着根据这个思路给Jfinal来个插件。因为之前没有写过插件,所以可能考虑的不周。我先本地测试了下,线程池的管理没有测出比较大的问题。

    之前@JFinal 有提过,一般的线程池在使用tomcat或者Jetty管理。


有需要的可以试试,新鲜出炉,没有特别的测试。


链接: https://pan.baidu.com/s/1eSx87p4 密码: zvvn


1.再AppConf.java中使用的configPlugin()


TaskPlugin taskPlugin = new TaskPlugin();
me.add(taskPlugin);


2.使用的话,这样使用:

TaskKit.taskExecutor.execute(new Runnable() {
@Override
public void run() {
 //println();
}
});

3.配置文件中配置:

task.core_pool_size=200
task.max_pool_size=10000
task.queue_capacity=500
task.keep_alive_seconds=30000

4.TaskPlugin的大体代码:


	private boolean isStarted = false;
	
	private JfinalTaskExecutor jfinalTaskExecutor;
	
	@Override
	public boolean start() {
		if (isStarted)
			return true;
		jfinalTaskExecutor = new JfinalTaskExecutor();
		
		jfinalTaskExecutor.setCorePoolSize(PropKit.getInt("task.core_pool_size"));
		jfinalTaskExecutor.setKeepAliveSeconds(PropKit.getInt("task.keep_alive_seconds"));
		jfinalTaskExecutor.setQueueCapacity(PropKit.getInt("task.queue_capacity"));
		jfinalTaskExecutor.setMaxPoolSize( PropKit.getInt("task.max_pool_size"));
		jfinalTaskExecutor.afterPropertiesSet();
		
		TaskKit.setTaskExecutor(jfinalTaskExecutor);
		isStarted = true;
		return true;
	}

	
	@Override
	public boolean stop() {
		jfinalTaskExecutor.shutdown();
		isStarted = false;
		return true;
	}


补充-----------------------

5.重要的JfinalTaskExecutor类

/**
 * @Title: TaskExecutor.java
 * @Package com.jfinal.plugin
 * @Description: TODO
 * Copyright: Copyright (c) 2016 
 * Company:QDUM
 * 
 * @author qdum-ivy
 * @date 2016年9月13日 上午10:36:42
 * @version V1.0
 */

package com.jfinal.plugin.task.support;

import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.management.RuntimeErrorException;

import com.jfinal.plugin.IPlugin;


/**
  * @ClassName: TaskExecutor
  * @Description: TODO
  * @author qdum-ivy
  * @date 2016年9月13日 上午10:36:42
  *
  */

public class JfinalTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor, Serializable {
	

	private static final long serialVersionUID = 8691977770206223903L;

	private final Object poolSizeMonitor = new Object();

	private int corePoolSize = 1;

	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private boolean allowCoreThreadTimeOut = false;

	private int queueCapacity = Integer.MAX_VALUE;

	private ThreadPoolExecutor threadPoolExecutor;


	public void setCorePoolSize(int corePoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.corePoolSize = corePoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setCorePoolSize(corePoolSize);
			}
		}
	}

	public int getCorePoolSize() {
		synchronized (this.poolSizeMonitor) {
			return this.corePoolSize;
		}
	}

	public void setMaxPoolSize(int maxPoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.maxPoolSize = maxPoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
			}
		}
	}

	public int getMaxPoolSize() {
		synchronized (this.poolSizeMonitor) {
			return this.maxPoolSize;
		}
	}

	public void setKeepAliveSeconds(int keepAliveSeconds) {
		synchronized (this.poolSizeMonitor) {
			this.keepAliveSeconds = keepAliveSeconds;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
			}
		}
	}

	public int getKeepAliveSeconds() {
		synchronized (this.poolSizeMonitor) {
			return this.keepAliveSeconds;
		}
	}

	public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
		this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
	}

	public void setQueueCapacity(int queueCapacity) {
		this.queueCapacity = queueCapacity;
	}


	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
		ThreadPoolExecutor executor  = new ThreadPoolExecutor(
				this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
				queue, threadFactory, rejectedExecutionHandler);
		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}

	protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
		if (queueCapacity > 0) {
			return new LinkedBlockingQueue<Runnable>(queueCapacity);
		}
		else {
			return new SynchronousQueue<Runnable>();
		}
	}

	public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
		if (threadPoolExecutor==null) {
			new RuntimeException("threadPoolExecutor == null");
		}
		return this.threadPoolExecutor;
	}

	public int getPoolSize() {
		return getThreadPoolExecutor().getPoolSize();
	}

	public int getActiveCount() {
		return getThreadPoolExecutor().getActiveCount();
	}


	public void execute(Runnable task) {
		Executor executor = getThreadPoolExecutor();
		try {
			executor.execute(task);
		}
		catch (RuntimeErrorException ex) {
			throw new RuntimeException("execute error "+ex);
		}
	}

	public void execute(Runnable task, long startTimeout) {
		execute(task);
	}

	public Future<?> submit(Runnable task) {
		ExecutorService executor = getThreadPoolExecutor();
		try {
			return executor.submit(task);
		}
		catch (RejectedExecutionException ex) {
			throw new RuntimeException("execute error "+ex);
		}
	}

	public <T> Future<T> submit(Callable<T> task) {
		ExecutorService executor = getThreadPoolExecutor();
		try {
			return executor.submit(task);
		}
		catch (RejectedExecutionException ex) {
			throw new RuntimeException("execute error "+ex);
		}
	}

	public boolean prefersShortLivedTasks() {
		return true;
	}

}

6.父类:ExecutorConfigurationSupport.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@SuppressWarnings("serial")
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory{

	protected final Log logger = LogFactory.getLog(getClass());

	private ThreadFactory threadFactory = this;

	private boolean threadNamePrefixSet = false;

	private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

	private boolean waitForTasksToCompleteOnShutdown = false;

	private int awaitTerminationSeconds = 0;

	private String beanName;

	private ExecutorService executor;

	public void setThreadFactory(ThreadFactory threadFactory) {
		this.threadFactory = (threadFactory != null ? threadFactory : this);
	}

	@Override
	public void setThreadNamePrefix(String threadNamePrefix) {
		super.setThreadNamePrefix(threadNamePrefix);
		this.threadNamePrefixSet = true;
	}
	public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
		this.rejectedExecutionHandler =
				(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
	}
	public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
		this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
	}

	public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
		this.awaitTerminationSeconds = awaitTerminationSeconds;
	}

	public void setBeanName(String name) {
		this.beanName = name;
	}


	/**
	 * Calls {@code initialize()} after the container applied all property values.
	 * @see #initialize()
	 */
	public void afterPropertiesSet() {
		initialize();
	}

	/**
	 * Set up the ExecutorService.
	 */
	public void initialize() {
		if (logger.isInfoEnabled()) {
			logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
		}
		if (!this.threadNamePrefixSet && this.beanName != null) {
			setThreadNamePrefix(this.beanName + "-");
		}
		this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
	}

	protected abstract ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);


	public void destroy() {
		shutdown();
	}

	public void shutdown() {
		if (logger.isInfoEnabled()) {
			logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
		}
		if (this.waitForTasksToCompleteOnShutdown) {
			this.executor.shutdown();
		}
		else {
			this.executor.shutdownNow();
		}
		awaitTerminationIfNecessary();
	}
	private void awaitTerminationIfNecessary() {
		if (this.awaitTerminationSeconds > 0) {
			try {
				if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {
					if (logger.isWarnEnabled()) {
						logger.warn("Timed out while waiting for executor" +
								(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
					}
				}
			}
			catch (InterruptedException ex) {
				if (logger.isWarnEnabled()) {
					logger.warn("Interrupted while waiting for executor" +
							(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
				}
				Thread.currentThread().interrupt();
			}
		}
	}

}



这两个类大体管理了生命周期。另外还有一些类四五个类代码太多,等找时间整理下一块打包发出来。

评论区

JFinal

2016-09-13 14:43

核心的 TaskKit 没分享出来哈,线程池的创建,生命周期的管理这部分是关键,没有发出来呢

IvyHelen

2016-09-13 14:58

@JFinal 好的。我是根据SpringMVC里面管理的,那我整理下都分享出来。可能牵扯到了好几个类。现在还精简。

IvyHelen

2016-09-13 15:10

这几个类的代码上传到百度云盘了:
链接: https://pan.baidu.com/s/1eSx87p4 密码: zvvn

JFinal

2016-09-13 15:11

@IvyHelen 贴子里面放上这个链接,有需要的朋友可以获取到,感谢支持

hotsmile

2016-12-20 09:29

在使用这个了,谢谢 @IvyHelen

hotsmile

2016-12-20 09:42

@IvyHelen ,我启动就报空指针了
@Override
public void addOtherPlugin(Plugins me) {
TaskPlugin taskPlugin=new TaskPlugin();
me.add(taskPlugin);
TaskKit.taskExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("----shdsdhsld");
}
});

// System.out.println("开始执行线程池插件");
}

hotsmile

2016-12-20 10:08

在Taskkit。taskExecuor这个就报错了

IvyHelen

2016-12-22 21:25

@hotsmile 麻烦上传头像,支持社区发展啊。嘿嘿

suruozhong

2018-08-23 14:05

非常好,顶一个,在用了

热门反馈

扫码入社