项目中用到了ActiveMQ,本来是用spring方式写的,现在转换成jFinal给大家参考
这只是一个粗略的插件,还不够严谨,大家可以根据需要自行修改
这个插件支持多个连接(内部使用连接池),也支持多种消息类型的发送
首先我是maven项目,加入依赖
<!-- JFinal (provides com.jfinal.plugin.IPlugin) -->
<dependency>
    <groupId>com.jfinal</groupId>
    <artifactId>jfinal</artifactId>
    <version>2.2</version>
</dependency>
<!-- ActiveMQ pooled connection support (org.apache.activemq.pool.*) -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.14.0</version>
</dependency>
定义Sender类
package jfinal.plugin.activemq;
import org.apache.activemq.pool.PooledConnection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
/**
* 消息发送者
*
* @author 孙宇
*/
public class JmsSender {
private String name;
private Session session;
private MessageProducer producer;
public JmsSender(String name,
PooledConnection connection,
Destination type,
String subject) throws JMSException {
this.name = name;
// 事务性会话,自动确认消息
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 消息的目的地(Queue/Topic)
if (type.equals(Destination.Topic)) {
Topic destination = session.createTopic(subject);
producer = session.createProducer(destination);
} else {
Queue destination = session.createQueue(subject);
producer = session.createProducer(destination);
}
// 不持久化消息
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public String getName() {
return name;
}
public Session getSession() {
return session;
}
public void sendMessage(Message message) throws JMSException {
producer.send(message);
}
}
定义Receiver类
package jfinal.plugin.activemq;
import org.apache.activemq.pool.PooledConnection;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.Enumeration;
/**
 * Message receiver: owns one JMS session and one consumer bound to a single
 * queue or topic, and prints every received message to standard output.
 *
 * @author 孙宇
 */
public class JmsReceiver implements MessageListener {

    /** Size of the buffer used to drain a {@link BytesMessage} body. */
    private static final int READ_CHUNK_SIZE = 1024;

    private final String name;
    private final Session session;
    private final MessageConsumer consumer;

    /**
     * Opens a session on {@code connection}, creates a consumer for the
     * destination named {@code subject} and registers this object as its
     * message listener.
     *
     * @param name       identifier returned by {@link #getName()}
     * @param connection connection on which the session is created
     * @param type       whether {@code subject} names a topic or a queue
     * @param subject    name of the queue or topic to listen on
     * @throws JMSException if the session, destination or consumer cannot be created
     */
    public JmsReceiver(String name,
                       PooledConnection connection,
                       Destination type,
                       String subject) throws JMSException {
        this.name = name;
        // Non-transacted session; messages are acknowledged automatically.
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Destination of the messages (queue or topic).
        if (type.equals(Destination.Topic)) {
            Topic destination = session.createTopic(subject);
            consumer = session.createConsumer(destination);
        } else {
            Queue destination = session.createQueue(subject);
            consumer = session.createConsumer(destination);
        }
        consumer.setMessageListener(this);
    }

    /**
     * @return the name this receiver was created with
     */
    public String getName() {
        return name;
    }

    /**
     * Prints the content of a received message to standard output, choosing
     * the representation by message type. Any exception raised while reading
     * the message is printed and not propagated.
     *
     * @param message the message delivered by the consumer
     */
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                System.out.println(((TextMessage) message).getText());
            } else if (message instanceof MapMessage) {
                MapMessage msg = (MapMessage) message;
                // Print the value of every entry, one per line.
                Enumeration<?> names = msg.getMapNames();
                while (names.hasMoreElements()) {
                    System.out.println(msg.getObject(names.nextElement().toString()));
                }
            } else if (message instanceof StreamMessage) {
                StreamMessage msg = (StreamMessage) message;
                // Reads a fixed string/boolean/long sequence; a stream with a
                // different layout fails here and ends up in the catch below.
                System.out.println(msg.readString());
                System.out.println(msg.readBoolean());
                System.out.println(msg.readLong());
            } else if (message instanceof ObjectMessage) {
                // Prints the message itself; the payload is not deserialized.
                System.out.println(message);
            } else if (message instanceof BytesMessage) {
                System.out.println(readBody((BytesMessage) message));
            } else {
                System.out.println(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the whole body of {@code msg} and decodes it as text.
     *
     * <p>All bytes are collected before decoding so that a multi-byte
     * character spanning two read chunks is not corrupted. The platform
     * default charset is used for decoding.
     *
     * @param msg the bytes message to read
     * @return the decoded body, empty if the message has no bytes
     * @throws JMSException if the body cannot be read
     */
    private static String readBody(BytesMessage msg) throws JMSException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        byte[] chunk = new byte[READ_CHUNK_SIZE];
        int length;
        while ((length = msg.readBytes(chunk)) != -1) {
            body.write(chunk, 0, length);
        }
        return new String(body.toByteArray(), Charset.defaultCharset());
    }
}
定义类型
package jfinal.plugin.activemq;
/**
 * Kind of destination a sender or receiver is bound to.
 *
 * @author 孙宇
 */
public enum Destination {
    /** The subject is resolved with {@code Session.createQueue}. */
    Queue,
    /** The subject is resolved with {@code Session.createTopic}. */
    Topic
}
activeMq插件
package jfinal.plugin.activemq;
import com.jfinal.plugin.IPlugin;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
/**
 * JFinal plugin that opens a pooled ActiveMQ connection on {@link #start()},
 * registers it in {@link ActiveMQ#pooledConnectionMap} under a name, and
 * releases it again on {@link #stop()}.
 *
 * @author 孙宇
 */
public class ActiveMQPlugin implements IPlugin {

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQPlugin.class);

    private final String url;
    private final String name;
    /** Pool created by {@link #start()}; kept so that {@link #stop()} can shut it down. */
    private PooledConnectionFactory pooledConnectionFactory;
    /** Connection registered by {@link #start()}; {@code null} while not started. */
    private PooledConnection connection;

    /**
     * @param url  broker URL passed to the ActiveMQ connection factory
     * @param name key under which the connection is registered in {@link ActiveMQ}
     */
    public ActiveMQPlugin(String url,
                          String name) {
        this.url = url;
        this.name = name;
    }

    /**
     * Creates a plugin whose connection is registered under
     * {@link ActiveMQ#defaultName}.
     *
     * @param url broker URL passed to the ActiveMQ connection factory
     */
    public ActiveMQPlugin(String url) {
        this(url, ActiveMQ.defaultName);
    }

    /**
     * Builds the pooled connection factory, opens and starts one connection
     * and registers it in {@link ActiveMQ#pooledConnectionMap}.
     *
     * @return {@code true} if the connection was started and registered,
     *         {@code false} if creating or starting it failed
     */
    @Override
    public boolean start() {
        logger.info("初始化activeMQ配置");
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setUserName(ActiveMQConnection.DEFAULT_USER);
        activeMQConnectionFactory.setPassword(ActiveMQConnection.DEFAULT_PASSWORD);
        activeMQConnectionFactory.setBrokerURL(url);
        // dispatchAsync controls how the broker dispatches messages to
        // consumers; it is not the producer-side "async send" option.
        activeMQConnectionFactory.setDispatchAsync(true);

        PooledConnectionFactory pool = new PooledConnectionFactory(activeMQConnectionFactory);
        pool.setMaximumActiveSessionPerConnection(200);
        // NOTE(review): the pool documents idleTimeout in milliseconds; 120
        // looks like it may have been meant as seconds - confirm.
        pool.setIdleTimeout(120);
        pool.setMaxConnections(5);
        pool.setBlockIfSessionPoolIsFull(true);
        try {
            PooledConnection created = (PooledConnection) pool.createConnection();
            created.start();
            ActiveMQ.pooledConnectionMap.put(name, created);
            pooledConnectionFactory = pool;
            connection = created;
            return true;
        } catch (JMSException e) {
            // Report the failure to the caller instead of pretending the
            // plugin started; nothing has been registered at this point.
            logger.error("Failed to start ActiveMQ connection '" + name + "' for broker " + url, e);
            pool.stop();
            return false;
        }
    }

    /**
     * Removes the connection registered by {@link #start()} from
     * {@link ActiveMQ#pooledConnectionMap}, closes it and stops the pool.
     * Senders and receivers registered in {@link ActiveMQ} are not removed.
     * Calling this method when the plugin is not started does nothing.
     *
     * @return always {@code true}
     */
    @Override
    public boolean stop() {
        if (connection != null) {
            // Only remove the mapping if it still points at our connection.
            ActiveMQ.pooledConnectionMap.remove(name, connection);
            try {
                connection.close();
            } catch (JMSException e) {
                logger.warn("Failed to close ActiveMQ connection '" + name + "'", e);
            }
            connection = null;
        }
        if (pooledConnectionFactory != null) {
            pooledConnectionFactory.stop();
            pooledConnectionFactory = null;
        }
        return true;
    }
}
activeMQ工具
package jfinal.plugin.activemq;
import org.apache.activemq.pool.PooledConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static registry of the pooled connections, senders and receivers used by
 * the ActiveMQ plugin, each keyed by name.
 *
 * @author 孙宇
 */
public class ActiveMQ {

    /** Connections keyed by connection name. */
    public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>();
    /** Senders keyed by {@link JmsSender#getName()}. */
    public static final ConcurrentHashMap<String, JmsSender> senderMap = new ConcurrentHashMap<>();
    /** Receivers keyed by {@link JmsReceiver#getName()}. */
    public static final ConcurrentHashMap<String, JmsReceiver> receiverMap = new ConcurrentHashMap<>();
    /** Connection name used when none is given explicitly. */
    public static final String defaultName = "main";

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQ.class);

    /**
     * Registers {@code sender} under its own name, replacing any sender
     * previously registered under that name.
     *
     * @param sender the sender to register
     */
    public static void addSender(JmsSender sender) {
        senderMap.put(sender.getName(), sender);
    }

    /**
     * @param name name the sender was registered under
     * @return the registered sender, or {@code null} if there is none
     */
    public static JmsSender getSender(String name) {
        return senderMap.get(name);
    }

    /**
     * Registers {@code receiver} under its own name, replacing any receiver
     * previously registered under that name.
     *
     * @param receiver the receiver to register
     */
    public static void addReceiver(JmsReceiver receiver) {
        receiverMap.put(receiver.getName(), receiver);
    }

    /**
     * @param name name the receiver was registered under
     * @return the registered receiver, or {@code null} if there is none
     */
    public static JmsReceiver getReceiver(String name) {
        return receiverMap.get(name);
    }

    /**
     * Registers {@code connection} under {@code connectionName}, replacing
     * any connection previously registered under that name.
     *
     * @param connectionName key for the connection
     * @param connection     the connection to register
     */
    public static void addConnection(String connectionName,
                                     PooledConnection connection) {
        pooledConnectionMap.put(connectionName, connection);
    }

    /**
     * @return the connection registered under {@link #defaultName}, or
     *         {@code null} if there is none
     */
    public static PooledConnection getConnection() {
        return pooledConnectionMap.get(defaultName);
    }

    /**
     * @param connectionName name the connection was registered under
     * @return the registered connection, or {@code null} if there is none
     */
    public static PooledConnection getConnection(String connectionName) {
        return pooledConnectionMap.get(connectionName);
    }
}
使用的时候,
// Create the plugin for the broker URL and start it; with this constructor
// the connection is registered under ActiveMQ.defaultName.
ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
p.start();
当然,这样只是创建了连接池,还没有生产者和消费者
还要添加这两个对象
// Name of the queue shared by the sender and the receiver below.
String subject = "test";
ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//register the sender
ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//register the receiver
发送一个消息
// Look up the registered sender, build a text message on its session and send it.
JmsSender sq1 = ActiveMQ.getSender("testSender1");
TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
sq1.sendMessage(msg);
接收者里处理了多种消息类型(Text、Map、Stream、Object、Bytes),所以不一定非要发送文本消息
一个测试类
package test.jfinal.plugin.activemq;
import jfinal.plugin.activemq.ActiveMQ;
import jfinal.plugin.activemq.ActiveMQPlugin;
import jfinal.plugin.activemq.Destination;
import jfinal.plugin.activemq.JmsReceiver;
import jfinal.plugin.activemq.JmsSender;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.Date;
/**
 * Manual smoke test: starts the plugin against a local broker, registers one
 * sender and one receiver on the same queue, then sends ten text messages one
 * second apart. The receiver prints each message it gets.
 *
 * @author 孙宇
 */
public class TestActiveMQPlugin {

    /**
     * @param args unused
     * @throws JMSException if the sender or receiver cannot be created
     */
    public static void main(String[] args) throws JMSException {
        ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
        if (!p.start()) {
            throw new IllegalStateException("ActiveMQ plugin failed to start");
        }
        String subject = "test";
        ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));// register the sender
        ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));// register the receiver

        JmsSender sq1 = ActiveMQ.getSender("testSender1");
        // The messages are sent sequentially on the main thread.
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop sending.
                Thread.currentThread().interrupt();
                break;
            }
            try {
                TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
                sq1.sendMessage(msg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
有兴趣你也可以下载源码跑一下,执行TestActiveMQPlugin类就行
https://git.oschina.net/sypro/jfinalplugin.git
想看spring版的,看这个源码
https://git.oschina.net/sypro/demo.git