大数据的时代,没有hadoop hbase怎么可以,还是老样子,从我公司的spring版本的项目中抽取出来,做成jfinal插件分享给大家。
插件很粗糙,大家可以自己修改
一个很简单的封装,大部分都是原生操作,配置是最简配置,大家可以自己扩展Configuration里面的set属性
最新版本的hbase-client已经实现了内部连接池,所以connection只需要在项目启动的时候获得,项目关闭的时候close即可。
 
首先加入依赖
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.3</version> </dependency>
添加HbasePlugin
package jfinal.plugin.hbase;
import com.jfinal.plugin.IPlugin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * jfinal hbase插件
 *
 * @author 孙宇
 */
public class HbasePlugin implements IPlugin {
    private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
    private String quorum;
    private String znode = "/hbase";
    private String encoding = "UTF-8";
    public HbasePlugin(String quorum) {
        this.quorum = quorum;
    }
    @Override
    public boolean start() {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", quorum);
        config.set("hbase.zookeeper.znode.parent", znode);
        config.set("hbase.encoding", encoding);
        try {
            Hbase.connection = ConnectionFactory.createConnection(config);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Hbase.aggregationClient = new AggregationClient(config);
        return true;
    }
    @Override
    public boolean stop() {
        if (!Hbase.connection.isClosed()) {
            try {
                Hbase.connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            Hbase.aggregationClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }
}
添加Hbase工具类
package jfinal.plugin.hbase;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * hbase原生java操作工具类
 *
 * @author 孙宇
 */
public class Hbase {
    private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
    private static final String aggregateImplementationCoprocessor = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
    public static Connection connection;
    public static AggregationClient aggregationClient;
    public static LongColumnInterpreter longColumnInterpreter = new LongColumnInterpreter();
    private static Admin getAdmin(Connection connection) {
        try {
            return connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static void createTable(String tableName,
                                   String[] familyNames) {
        Admin admin = getAdmin(connection);
        try {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            hTableDescriptor.addCoprocessor(aggregateImplementationCoprocessor);
            for (String familyName : familyNames) {
                HColumnDescriptor family = new HColumnDescriptor(familyName);
                hTableDescriptor.addFamily(family);
            }
            admin.createTable(hTableDescriptor);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void addCoprocessor(String tableName,
                                      String className) {
        Admin admin = getAdmin(connection);
        try {
            admin.disableTable(TableName.valueOf(tableName));
            HTableDescriptor hTableDescriptor = admin.getTableDescriptor(TableName.valueOf(tableName));
            hTableDescriptor.addCoprocessor(className);
            admin.modifyTable(TableName.valueOf(tableName), hTableDescriptor);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.enableTable(TableName.valueOf(tableName));
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void deleteTable(String tableName) {
        Admin admin = getAdmin(connection);
        try {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static HTableDescriptor tableDescriptor(String tableName) {
        Admin admin = getAdmin(connection);
        HTableDescriptor desc = null;
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            desc = table.getTableDescriptor();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return desc;
    }
    public static void put(String tableName,
                           Put put) {
        put(tableName, Arrays.asList(put));
    }
    public static void put(String tableName,
                           List<Put> puts) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            table.put(puts);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static List<Result> scan(String tableName) {
        return scan(tableName, new Scan());
    }
    public static List<Result> scan(String tableName,
                                    Integer pageSize) {
        return scan(tableName, new Scan(), pageSize);
    }
    public static List<Result> scan(String tableName,
                                    Scan scan) {
        return scan(tableName, scan, null);
    }
    public static List<Result> scan(String tableName,
                                    Scan scan,
                                    Integer pageSize) {
        List<Result> results = new ArrayList<>();
        ResultScanner resultScanner = null;
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            resultScanner = table.getScanner(scan);
            if (pageSize != null) {
                Result[] rs = resultScanner.next(pageSize);
                for (Result result : rs) {
                    results.add(result);
                }
            } else {
                for (Result result : resultScanner) {
                    results.add(result);
                }
            }
        } catch (IOException e) {
            logger.error(ExceptionUtils.getStackTrace(e));
        } finally {
            if (resultScanner != null) {
                resultScanner.close();
            }
            try {
                table.close();
            } catch (IOException e) {
                logger.error(ExceptionUtils.getStackTrace(e));
            }
        }
        return results;
    }
    public static void delete(String tableName,
                              Delete del) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            table.delete(del);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static Result get(String tableName,
                             Get g) {
        Result[] result = get(tableName, Arrays.asList(g));
        if (result != null && result.length > 0) {
            return result[0];
        }
        return null;
    }
    public static Result[] get(String tableName,
                               List<Get> gets) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            return table.get(gets);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
    public static long rowCount(String tableName,
                                Scan scan) {
        try {
            return aggregationClient.rowCount(TableName.valueOf(tableName), longColumnInterpreter, scan);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return 0;
    }
}
再来一个测试类吧,都是原生操作,很容易,就不写注释了,英文都很简单
package test.jfinal.plugin.hbase;
import jfinal.plugin.hbase.Hbase;
import jfinal.plugin.hbase.HbasePlugin;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
/**
 * @author 孙宇
 */
public class TestHbase {
    private static HbasePlugin p;
    @BeforeClass
    public static void beforeClass() {
        // p = new HbasePlugin("127.0.0.1:2181");
        p = new HbasePlugin("node67:2181,node68:2181,node69:2181");
        p.start();
    }
    @AfterClass
    public static void afterClass() {
        p.stop();
    }
    @Test    public void t1() {
        Hbase.createTable("sunyutable", new String[]{"f1", "f2"});
    }
    @Test    @Ignore    public void t2() {
        Hbase.deleteTable("sunyutable");
    }
    @Test    public void t3() {
        HTableDescriptor desc = Hbase.tableDescriptor("sunyutable");
        System.out.println(desc);
    }
    @Test    public void t4() {
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("f1列族q1列的值111"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q2"), Bytes.toBytes("f1列族q2列的值111"));
        Hbase.put("sunyutable", put);
    }
    @Test    public void t5() {
        List<Result> results = Hbase.scan("sunyutable");
        for (Result result : results) {
            byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
            System.out.println(Bytes.toString(value));
        }
    }
    @Test    public void t7() {
        Delete del = new Delete(Bytes.toBytes("row1"));
        Hbase.delete("sunyutable", del);
    }
    @Test    public void t8() {
        Get g = new Get(Bytes.toBytes("row1"));
        Result result = Hbase.get("sunyutable", g);
        byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
        System.out.println(Bytes.toString(value));
    }
    @Test    @Ignore    public void t9() {
        Hbase.addCoprocessor("sunyutable", "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
    }
    @Test    public void t10() {
        System.out.println(Hbase.rowCount("sunyutable", new Scan()));
    }
}
更多的API调用与操作,请看这里
https://hbase.apache.org/book.html
源码的话,请看这里
https://git.oschina.net/sypro/jfinalplugin.git
spring版本的话,请看这里
https://git.oschina.net/sypro/demo.git
 
 
 
 
 
 
 
