前阵子发了一个ActiveMQ的插件,我们项目还使用了Kafka,所以抽个时间把spring版的转换了jFinal plugin出来给大家分享。
至于kafka是个什么东西,我这里就不描述了,节约jfinal的数据库^_^,你能看到这篇分享,说明你对kafka有一定了解。
首先定义一个消费者模版KafkaConsumerTemplate
package jfinal.plugin.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
 * 消费者模版
 *
 * @author 孙宇
 */
public class KafkaConsumerTemplate {
    private boolean runner = true;
    public KafkaConsumerTemplate(String servers,
                                 String keyDeserializer,
                                 String valueDeserializer,
                                 String group,
                                 String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", valueDeserializer);
        props.put("group.id", group);//不同ID 可以同时订阅消息
        /*
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");*/
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));//订阅TOPIC
        try {
            while (runner) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        //可以自定义Handler,处理对应的TOPIC消息(partitionRecords.key())
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    /*consumer.commitSync();//同步*/
                }
            }
        } finally {
            consumer.close();
        }
    }
    public void setRunner(boolean runner) {
        this.runner = runner;
    }
}
编写Kafka工具类
package jfinal.plugin.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
 * kafka工具类
 *
 * @author 孙宇
 */
public class Kafka {
    public static final String defaultName = "main";
    private static final Logger logger = LoggerFactory.getLogger(Kafka.class);
    public static final ConcurrentHashMap<String, Producer> producerMap = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, KafkaConsumerTemplate> consumerMap = new ConcurrentHashMap<>();
    public static void addProducer(String name,
                                   String servers,
                                   String keySerializer,
                                   String valueSerializer) {
        logger.info("添加生产者:{}", name);
        if (producerMap.containsKey(name)) {
            logger.error("{}已存在!", name);
        } else {
            Properties props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("key.serializer", keySerializer);
            props.put("value.serializer", valueSerializer);
            /*props.put("acks", "all");
            props.put("retries ", 1);
            props.put("buffer.memory", 33554432);
            */
            Producer<String, String> producer = new KafkaProducer<>(props);
            producerMap.put(name, producer);
        }
    }
    public Producer getProducer(String name) {
        return producerMap.get(name);
    }
    public static Future send(String producerName,
                              ProducerRecord record) {
        return producerMap.get(producerName).send(record);
    }
    public static void addConsumer(String name,
                                   KafkaConsumerTemplate consumer) {
        logger.info("添加消费者:{}", name);
        if (consumerMap.containsKey(name)) {
            logger.error("{}已存在!", name);
        } else {
            consumerMap.put(name, consumer);
        }
    }
    public KafkaConsumerTemplate getConsumer(String name) {
        return consumerMap.get(name);
    }
}
编写插件KafkaPlugin
package jfinal.plugin.kafka;
import com.jfinal.plugin.IPlugin;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * jFinal kafka插件
 *
 * @author 孙宇
 */
public class KafkaPlugin implements IPlugin {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    public KafkaPlugin(String name,
                       String servers,
                       String keySerializer,
                       String valueSerializer) {
        Kafka.addProducer(name, servers, keySerializer, valueSerializer);
    }
    @Override
    public boolean start() {
        return true;
    }
    @Override
    public boolean stop() {
        logger.info("销毁所有生产者和消费者开始");
        for (Map.Entry<String, Producer> entry : Kafka.producerMap.entrySet()) {
            entry.getValue().close();
        }
        for (Map.Entry<String, KafkaConsumerTemplate> entry : Kafka.consumerMap.entrySet()) {
            entry.getValue().setRunner(false);
        }
        logger.info("销毁所有生产者和消费者结束");
        return true;
    }
}
在使用的时候,首先
//添加生产者 KafkaPlugin p = new KafkaPlugin(Kafka.defaultName, "127.0.0.1:9092", "org.apache.kafka.common.serialization.StringSerializer", "org.apache.kafka.common.serialization.StringSerializer"); p.start();
有了生产者,也要有消费者
//添加消费者
KafkaConsumerTemplate consumer = new KafkaConsumerTemplate("127.0.0.1:9092", "org.apache.kafka.common.serialization.StringDeserializer", "org.apache.kafka.common.serialization.StringDeserializer", "test.group", "test.topic");
Kafka.addConsumer(Kafka.defaultName, consumer);
发送消息时,调用
Kafka.send(Kafka.defaultName, new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊"));
当然,如果你需要测试,即时看到消息,那么还可以
Future f = Kafka.send(Kafka.defaultName, new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊" + new Date()));
f.get();
项目结束的时候调用
p.stop();
我们来编写个测试吧
首先我们定义消费者类
package test.jfinal.plugin.kafka;
import jfinal.plugin.kafka.Kafka;
import jfinal.plugin.kafka.KafkaConsumerTemplate;
import java.util.concurrent.ExecutionException;
/**
 * kafka测试类
 *
 * @author 孙宇
 */
public class TestKafkaConsumer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //添加消费者
        KafkaConsumerTemplate consumer = new KafkaConsumerTemplate("127.0.0.1:9092", "org.apache.kafka.common.serialization.StringDeserializer", "org.apache.kafka.common.serialization.StringDeserializer", "test.group", "test.topic");
        Kafka.addConsumer(Kafka.defaultName, consumer);
    }
}
然后定义生产者和发送消息的类
package test.jfinal.plugin.kafka;
import jfinal.plugin.kafka.Kafka;
import jfinal.plugin.kafka.KafkaPlugin;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * kafka测试类
 *
 * @author 孙宇
 */
public class TestKafkaProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //添加生产者
        KafkaPlugin p = new KafkaPlugin(Kafka.defaultName, "127.0.0.1:9092", "org.apache.kafka.common.serialization.StringSerializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.start();
        for (int i = 0; i < 10; i++) {
            TimeUnit.SECONDS.sleep(1);
            //模拟发送消息
            Future f = Kafka.send(Kafka.defaultName, new ProducerRecord<String, String>("test.topic", "keykey", "msgmsg中文消息啊" + new Date()));
            f.get();
        }
        p.stop();
    }
}
然后就可以进行测试了,
注意::注意::注意::
启动是有顺序的,由于测试类我没有使用线程,所以你要先启动
TestKafkaConsumer
等TestKafkaConsumer启动完毕后,再启动TestKafkaProducer
然后切换到TestKafkaConsumer的Console,就可以看到生产者发送的消息了
一定要切换控制台哦。
只是在测试的时候有启动顺序, 你真正项目使用的时候,生产者和消费者初始化时没顺序要求的。
老样子,源码如下:
jfinal版的
https://git.oschina.net/sypro/jfinalplugin.git
spring版的
https://git.oschina.net/sypro/demo.git
要求jFinal支持markdown!!!这个编辑器太难用了,文章太长,调格式的时候,上下滑动,把编辑器的工具条固定置顶也行啊。。。。。
markdown 逐步添上,现在没有时间,试试这个编辑器的全屏编辑功能,上下滑动就大大减少了