【分享】JFinal + ActiveMQ 插件

该插件是基于官网用户分享的基础上进行改造而来

http://www.jfinal.com/share/77

【maven】

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.10</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.7.0</version>
</dependency>

基础】注解类

package plus.jfinal.plugin.acitvemq;

import javax.jms.DeliveryMode;
import java.lang.annotation.*;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JmsListener {

    /**
     * 队列分组标识,默认 default
     * @return
     */
    String group() default "default";

    /**
     * 类型:主题还是队列
     * @return
     */
    Destination type();

    /**
     * 主题/队列名称
     * @return
     */
    String subject();

    /**
     * 交付模式;存储还是丢弃
     * @return
     */
    int deliveryMode() default DeliveryMode.NON_PERSISTENT;

}

【基础】发送者类

package plus.jfinal.plugin.acitvemq;

import lombok.Data;

import javax.jms.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 发送者
 */
@Data
public abstract class JmsSender {
    /**
     * 消息队列分组
     */
    private String group;

    private String subject;

    private Destination type;

    private int deliveryMode;
    /**
     * 会话
     */
    private Session session;
    /**
     * 消息生产者
     */
    private MessageProducer producer;

    //private static JmsSender sender;

    public static ConcurrentMap<String, JmsSender> sender = new ConcurrentHashMap<>();

    public JmsSender() {
        JmsListener listener = this.getClass().getAnnotation(JmsListener.class);
        if (listener != null) {
            this.subject = listener.subject();
            this.group = listener.group();
            this.type = listener.type();
            try {
                init();
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(String.format("%s 初始化失败", this.getClass().getName()));
            }
        } else {
            System.out.println("接收者丢失相关配置,请采用注解或者参数实例化方式");
        }
    }

    private void init() throws JMSException {
        String key = this.getClass().getName();
        if (sender.get(key) != null) {
            return;
        }
        this.group = group;
        // 事务性会话,自动确认消息
        this.session = ActiveMQ.getConnection(group).createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 消息的目的地(Queue/Topic)
        if (type.equals(Destination.Topic)) {
            Topic destination = session.createTopic(subject);
            producer = session.createProducer(destination);
        } else {
            Queue destination = session.createQueue(subject);
            producer = session.createProducer(destination);
        }
        this.deliveryMode = deliveryMode;
        // 不持久化消息
        producer.setDeliveryMode(deliveryMode);

        sender.put(key, this);
    }

    /*public void senderTextMessage(String text) {
        JmsSender _sender = sender.get(this.getClass().getName());
        try {
            _sender.getProducer().send(_sender.getSession().createTextMessage(text));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }*/

    public static void sendText(Class<? extends JmsSender> senderClass,String text){
        JmsSender s = sender.get(senderClass.getName());
        try {
            s.getProducer().send(s.getSession().createTextMessage(text));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

【基础】接收者类

package plus.jfinal.plugin.acitvemq;

import javax.jms.*;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

public abstract class JmsReceiver implements MessageListener {

    private String group;

    private Destination type;

    private String subject;

    private Session session;

    private MessageConsumer consumer;

    public JmsReceiver(){
        JmsListener listener = this.getClass().getAnnotation(JmsListener.class);
        if(listener!=null){
            this.subject = listener.subject();
            this.group = listener.group();
            this.type = listener.type();
            try {
                init();
            } catch (JMSException e) {
                e.printStackTrace();
                System.out.println(String.format("%s 初始化失败", this.getClass().getName()));
            }
        }else{
            System.out.println("接收者丢失相关配置,请采用注解或者参数实例化方式");
        }
    }

    private void init() throws JMSException {
        // 事务性会话,自动确认消息
        session = ActiveMQ.getConnection(group).createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 消息的目的地(Queue/Topic)
        if (type.equals(Destination.Topic)) {
            Topic destination = session.createTopic(subject);
            consumer = session.createConsumer(destination);
        } else {
            Queue destination = session.createQueue(subject);
            consumer = session.createConsumer(destination);
        }
        consumer.setMessageListener(this);
    }

    public abstract void onTextMessage(String text);

    public abstract void onMapMessage(Map map);

    public abstract void onStreamMessage(StreamMessage streamMessage);

    public abstract void onObjectMessage(ObjectMessage objectMessage);

    public abstract void onBytesMessage(BytesMessage bytesMessage);

    public abstract void onOtherMessage(Message message);

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage msg = (TextMessage) message;
                onTextMessage(msg.getText());
            } else if (message instanceof MapMessage) {
                MapMessage msg = (MapMessage) message;
                Map data = new HashMap();
                Enumeration enumer = msg.getMapNames();
                while (enumer.hasMoreElements()) {
                    Object obj = enumer.nextElement();
                    data.put(obj.toString(),msg.getObject(obj.toString()));
                }
                onMapMessage(data);
            } else if (message instanceof StreamMessage) {
                onStreamMessage((StreamMessage) message);
            } else if (message instanceof ObjectMessage) {
                onObjectMessage((ObjectMessage) message);
            } else if (message instanceof BytesMessage) {
                onBytesMessage((BytesMessage) message);
            } else {
                onOtherMessage(message);
            }
            /*try {
                if (message instanceof TextMessage) {
                    TextMessage msg = (TextMessage) message;
                    System.out.println(msg.getText());
                } else if (message instanceof MapMessage) {
                    MapMessage msg = (MapMessage) message;
                    Enumeration enumer = msg.getMapNames();
                    while (enumer.hasMoreElements()) {
                        Object obj = enumer.nextElement();
                        System.out.println(msg.getObject(obj.toString()));
                    }
                } else if (message instanceof StreamMessage) {
                    StreamMessage msg = (StreamMessage) message;
                    System.out.println(msg.readString());
                    System.out.println(msg.readBoolean());
                    System.out.println(msg.readLong());
                } else if (message instanceof ObjectMessage) {
                    ObjectMessage msg = (ObjectMessage) message;
                    System.out.println(msg);
                } else if (message instanceof BytesMessage) {
                    BytesMessage msg = (BytesMessage) message;
                    byte[] byteContent = new byte[1024];
                    int length = -1;
                    StringBuffer content = new StringBuffer();
                    while ((length = msg.readBytes(byteContent)) != -1) {
                        content.append(new String(byteContent, 0, length));
                    }
                    System.out.println(content.toString());
                } else {
                    System.out.println(message);
                }*/
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

【基础】工具类

package plus.jfinal.plugin.acitvemq;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.activemq.pool.PooledConnection;

import java.util.concurrent.ConcurrentHashMap;

public class ActiveMQ {

    private static final Log log = LogFactory.get(Thread.currentThread().getClass());

    public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>();
    public static final String defaultName = "default";


    public static void addConnection(String connectionName,
                                     PooledConnection connection) {
        pooledConnectionMap.put(connectionName, connection);
    }

    public static PooledConnection getConnection() {
        return pooledConnectionMap.get(defaultName);
    }

    public static PooledConnection getConnection(String connectionName) {
        return pooledConnectionMap.get(connectionName);
    }
}

【基础】枚举

package plus.jfinal.plugin.acitvemq;

public enum Destination {
    Queue, Topic
}

【插件】插件类以及main方法

package plus.jfinal.plugin.acitvemq;

import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jfinal.plugin.IPlugin;
import lombok.Data;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;

import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.Date;@Datapublic class JFinalActiveMQPlugin implements IPlugin {

    private static final Log log = LogFactory.get();

    private String url;
    private String name;
    private String username;

    private String password;

    public JFinalActiveMQPlugin(String url,
                                String name, String username, String password) {
        this.url = url;
        this.name = name;
        this.username = username;
        this.password = password;
    }

    public JFinalActiveMQPlugin(String url) {
        this.url = url;
        this.name = ActiveMQ.defaultName;
        this.username = ActiveMQConnection.DEFAULT_USER;
        this.password = ActiveMQConnection.DEFAULT_PASSWORD;
    }

    public JFinalActiveMQPlugin(String url,String username,String password){
        this.url = url;
        this.name = ActiveMQ.defaultName;
        this.username = StrUtil.isBlank(username)?ActiveMQConnection.DEFAULT_USER:username;
        this.password = StrUtil.isBlank(password)?ActiveMQConnection.DEFAULT_PASSWORD:password;
    }


    @Override
    public boolean start() {
        log.info("初始化 activemq [{}] url = {}",name,url);
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setUserName(username);
        activeMQConnectionFactory.setPassword(password);
        activeMQConnectionFactory.setBrokerURL(url);
        activeMQConnectionFactory.setDispatchAsync(true);//异步发送消息
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
        pooledConnectionFactory.setMaximumActiveSessionPerConnection(200);
        pooledConnectionFactory.setIdleTimeout(120);
        pooledConnectionFactory.setMaxConnections(5);
        pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
        try {
            PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection();
            connection.start();
            ActiveMQ.pooledConnectionMap.put(name, connection);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return true;
    }

    @Override
    public boolean stop() {
        return true;
    }

    public static void main(String[] args) throws JMSException {
        JFinalActiveMQPlugin p = new JFinalActiveMQPlugin("failover://(tcp://192.168.71.155:61616)?initialReconnectDelay=1000","l*******","l*******");
        p.start();

        String subject = "test";
        // 注解方式
        new TestSender();
        new TestReceiver();
        // 参数实例化方式
        //new TestSender("default",Destination.Queue, subject);
        //new TestReceiver("default",Destination.Queue, subject);
        for (int i = 0; i < 10; i++) {
            new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    TestSender.senderText("测试" + new Date());
                }
            }.run();
        }
    }
}

【demo】发送者

package plus.jfinal.plugin.acitvemq;

import javax.jms.JMSException;

@JmsListener(type=Destination.Queue,subject = "test")
public class TestSender extends JmsSender{
    
}

【demo】接收者

package plus.jfinal.plugin.acitvemq;

import javax.jms.*;
import java.util.Map;

@JmsListener(type=Destination.Queue,subject = "test")
public class TestReceiver extends JmsReceiver{


    @Override
    public void onTextMessage(String text) {
        System.out.println("收到消息:"+text);
    }

    @Override
    public void onMapMessage(Map map) {

    }

    @Override
    public void onStreamMessage(StreamMessage streamMessage) {

    }

    @Override
    public void onObjectMessage(ObjectMessage objectMessage) {

    }

    @Override
    public void onBytesMessage(BytesMessage bytesMessage) {

    }

    @Override
    public void onOtherMessage(Message message) {

    }
}

【配置文件】

[default]
url=tcp://192.168.71.155:61616
initialReconnectDelay=1000
username=l***
password=l***

【运行方式】

public static void main(String[] args) throws JMSException {
    JFinalActiveMQPlugin p = new JFinalActiveMQPlugin("failover://(tcp://192.168.71.155:61616)?initialReconnectDelay=1000","l****","l****");
    p.start();

    String subject = "test";
    
    /**
     * 以下实例化方式有两种
     * 1)注解方式,直接无参构造
     * 2)参数实例化方式
     */
    
    // 注解方式
    new TestSender();
    new TestReceiver();
    // 参数实例化方式
    //new TestSender("default",Destination.Queue, subject);
    //new TestReceiver("default",Destination.Queue, subject);
    
    
    for (int i = 0; i < 10; i++) {
        new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                JmsSender.sendText(TestSender.class,"测试" + new Date());
            }
        }.run();
    }
}


## 关于发送者和接受者的实例化,大家可以考虑在 服务启动后进行注解扫描启动【推荐】,或者在程序启动完成后的代码逻辑中实例化;不要在 插件 add 之后就立马实例化发送或者接受者,这个会有问题的,因为程序并未真正的执行了启动方法哦~


-----

2019/11/14 修改

发现之前版本一个bug,故修改了调用方式

JmsSender.sendText(TestSender.class,"测试" + new Date());


评论区

JFinal

2019-11-08 15:19

JFinal + ActiveMQ 的分享极为稀有,谢谢你的分享,赞

bluezqh

2019-11-08 22:24

正需要,非常感谢

prelove

2019-11-09 16:43

感谢!!!