hbase 数据获取方式

  1. 直接根据 rowkey 查找,速度最快
  2. scan,指定 startrowkey、endrowkey 和 limit获取数据,在 rowkey 设计良好的情况下,效率也不错
  3. 全表扫,强烈不推荐这种做法,效率极差,在线业务不用考虑这种方式

hbase 数据排序怎么做?

我觉得这个分两种情况,一是数据量比较少,业务上每次拉取所有的数据,可以在客户端做排序,二是数据比较多,需要分页,这种情况下客户端做显然不合适,因为要从服务器拉取所有数据,排序完成,获取某一页,剩余的数据全都不用,资源损耗比较严重,比较推荐做法是充分利用 hbase rowkey 的特性,数据是按照 rowkey 字典序排列的,如果排序字段是不变的,可以把排序字段加到 rowkey 里,这样吐出的数据自然就是有序的。

如何排序的字段是可以变的呢?

假设业务上有个查询,排序的字段是可以变得,这样放到 rowkey 里就不合适了,因为排序字段变,意味着 rowkey 也会变,不是推荐的做法。这时候考虑的一种实现就是使用 redis 维护一个zset 索引,score 是排序字段,value 是对应记录的 rowkey,每次排序的字段变了,就去更新 zset 对应的数据。查询的话就相当于先去 redis 查出 rowkey 列表,然后根据 rowkey 列表去 hbase 批量查。

产品分页的方式?

常见的两种方式,一种是更多这种按钮,不支持跳转到某一页,一种是可以选择某个特定的页进行跳转。这两种方式 hbase 在实现上会有区别,下面会分别介绍下。

更多方式的分页

这种分页不支持跳转到某一页,只能不断地下一页下一页,使用 hbase 可以以一种比较简单的方式实现。由服务端告诉 app 端或者 web 端下一页请求的参数,假设某一页获取20条数据,服务端去获取21条,第21条数据的 rowkey 就是下一次扫描的 startrowkey,把它加到返回给 web 或者 app 的参数里,这样就可以实现分页。有人可能会说,假设下一页的 startrowkey 返回给前端之后,这时候有新的数据插入,不是会有问题吗?这种情况其实还好,首先互联网应用大多是读多写少,你浏览某个列表时,列表内容更新的概率本来就小,就算真的发生,数据会按照排序方式插入到列表首,你不刷新首屏内容,仅仅也就是新加的内容没展现出来,不影响其他内容的展示,而只要你一刷新首屏,新的内容就出来了。

直接跳转到某一页

假设分页可以直接跳转到某一页呢?这个用 hbase 实现确认比较尴尬,hbase scan 扫描的时候本来就是根据 rowkey 范围和 limit 扫描,想到的实现方式依旧是 zset,score 排序字段,value 是 rowkey。也不一定非用 redis,反正就是要有一个地方维护查询索引。去 hbase 直接根据 rowkey 查询。

简单Hbase分页方案

某位仁兄发给的,不知道怎么样::::

网上大多数分页方案分为从服务端分页或者从客户端分页 服务端分页方式主要利用PageFilter过滤器,首先太复杂,其次针对集群的兼容性不是很好,作者利用服务端分页+客户端分页结合方式给出一种简单易行的中间方案。

Filter pageSize = new PageFilter(pageSize *pageNo);
            List<Result> resultList = new ArrayList<>();
            // 计算起始页和结束页
            Integer firstPage = (pageNo) *pageSize;
            Integer endPage = firstPage + pageSize;

            //客户端分页
            int i = 0;

            for (Result rs : scanner) {
                if (!rs.isEmpty() && i >= firstPage && i < endPage) {
                    resultList.add(rs);
                }
                if (resultList.size() == log.getPageSize()) {
                    break;
                }
                i++;
            }

    public static void mai(String[] args){
            Connection connection = HBaseClientUtil.getConnection();
            table = connection.getTable(TableName.valueOf(ProcessLogUtil.HB_TB_NAME));
            Scan scan = new Scan();
            Filter pageSizeFilter = new PageFilter(pageSize *pageNo);
            scan.setFilter(pageSizeFilter );
            ResultScanner scanner = table.getScanner(scan);
            List<Result> resultList = new ArrayList<>();
            // 计算起始页和结束页
            Integer firstPage = (pageNo) *pageSize;
            Integer endPage = firstPage + pageSize;

            //客户端分页
            int i = 0;

            for (Result rs : scanner) {
                if (!rs.isEmpty() && i >= firstPage && i < endPage) {
                    resultList.add(rs);
                }
                if (resultList.size() == log.getPageSize()) {
                    break;
                }
                i++;
            }
    }
           
package cp.app.service.impl;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;

import cp.app.batch.utils.ConfigUtil;
import cp.app.comm.CpConstants;
import cp.app.service.HBaseService;

/**
 * HBase查询与插入操作工具类
 *
 * @author author
 *
 */
 //采用注入方式,HBaseService为定义的查询接口,可不需要。 
@Service
public class HBaseServiceImpl implements HBaseService{

    private static Logger log = Logger.getLogger(HBaseServiceImpl.class.getName());

    static ConfigUtil util = new ConfigUtil("conf/zookeeper.properties");
    private static final String HBASE_ZOOKEEPER_QUORUM = util
            .getString("hbase_zookeeper_quorum");
    private static final String ZOOKEEPER_ZNODE_PARENT = util
            .getString("zookeeper_znode_parent");
    private static Configuration conf = HBaseConfiguration.create();
    static {
        conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM);
        conf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT);
    }

    /**
     * 创建表
     *
     * @param tableName
     *            表名
     * @param columnFamily
     *            列簇集合
     * @return 成功-true 失败-false
     */
    @SuppressWarnings("resource")
    public boolean createTable(String tableName, List<String> columnFamily) {
        try {
            if (StringUtils.isBlank(tableName) || columnFamily == null
                    || columnFamily.size() < 0) {
                log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
            }
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                return true;
            } else {
                HTableDescriptor tableDescriptor = new HTableDescriptor(
                        TableName.valueOf(tableName));
                for (String cf : columnFamily) {
                    tableDescriptor.addFamily(new HColumnDescriptor(cf));
                }
                admin.createTable(tableDescriptor);
                log.info("===Create Table " + tableName
                        + " Success!columnFamily:" + columnFamily.toString()
                        + "===");
            }
        } catch (MasterNotRunningException e) {
            // TODO Auto-generated catch block
            log.error(e);
            return false;
        } catch (ZooKeeperConnectionException e) {
            // TODO Auto-generated catch block
            log.error(e);
            return false;
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e);
            return false;
        }
        return true;
    }

    /**
     * 查询单条记录
     *
     * @param tableName
     *            表名
     * @param rowKey
     *            rowKey值
     * @return 返回单条记录
     */
    public List<Map<String, String>> selectOneByRowKey(String tableName,
            String rowKey) {
        if (StringUtils.isBlank(rowKey) || StringUtils.isBlank(tableName)) {
            log.error("===Parameters tableName|rowKey should not be blank,Please check!===");
            return null;
        }
        List<Map<String, String>> rowList = new ArrayList<Map<String, String>>();
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            HTableInterface hTable = getHTable(tableName);
            if (hTable != null) {
                Result result = hTable.get(get);
                Map<String, String> cellMap = getRowByResult(result);
                rowList.add(cellMap);
            }
            hTable.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e);
        }
        return rowList;
    }

    /**
     * 分页查询表数据
     *
     * @param tableName
     *            表名
     * @param ddate
     *            数据日期
     * @param pageSize
     *            页大小
     * @param lastrowKey
     *            起始rowkey值
     * @return 返回查询数据结果集
     */
    public List<Map<String, String>> selectAllByPage(String tableName,
            String ddate, int pageSize, String lastrowKey) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "")
                || StringUtils.isBlank(lastrowKey)) {
            log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank,Please check!===");
            return null;
        }
        HTable hTable = (HTable) getHTable(tableName);
        Scan scan = new Scan();
        FilterList filterList = new FilterList(
                FilterList.Operator.MUST_PASS_ALL);
        Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator(ddate));
        Filter pageFilter = new PageFilter(pageSize);
        filterList.addFilter(rowFilter1);
        filterList.addFilter(pageFilter);
        if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) {
            Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER,
                    new BinaryComparator(Bytes.toBytes(lastrowKey)));
            filterList.addFilter(rowFilter2);
        }
        scan.setFilter(filterList);
        List<Map<String, String>> lists = new ArrayList<Map<String, String>>();
        try {
            ResultScanner rs = hTable.getScanner(scan);
            for (Result result : rs) {
                lists.add(getRowByResult(result));
            }
            hTable.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e);
        }
        return lists;
    }

    /**
     * 根据状态分页查询表数据
     *
     * @param tableName
     *            表名
     * @param ddate
     *            数据日期
     * @param pageSize
     *            页大小
     * @param lastrowKey
     *            起始rowkey值
     * @param status
     *            发送状态
     * @return 返回查询数据结果集
     */
    public List<Map<String, String>> selectAllByPageStatus(String tableName,
            String ddate, int pageSize, String lastrowKey, String status) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "")
                || StringUtils.isBlank(lastrowKey)) {
            log.error("===Parameters tableName|ddate|pageSize|rowKey should not be blank,Please check!===");
            return null;
        }
        HTable hTable = (HTable) getHTable(tableName);
        Scan scan = new Scan();
        FilterList filterList = new FilterList(
                FilterList.Operator.MUST_PASS_ALL);
        filterList
                .addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),
                        Bytes.toBytes("status"), CompareOp.EQUAL, Bytes
                                .toBytes(status)));
        Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator(ddate));
        Filter pageFilter = new PageFilter(pageSize);
        filterList.addFilter(rowFilter1);
        filterList.addFilter(pageFilter);
        if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) {
            Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER,
                    new BinaryComparator(Bytes.toBytes(lastrowKey)));
            filterList.addFilter(rowFilter2);
        }
        scan.setFilter(filterList);
        List<Map<String, String>> lists = new ArrayList<Map<String, String>>();
        try {
            ResultScanner rs = hTable.getScanner(scan);
            for (Result result : rs) {
                lists.add(getRowByResult(result));
            }
            hTable.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e);
        }
        return lists;
    }

    /**
     * 获取页数
     *
     * @param tableName
     *            表名
     * @param ddate
     *            数据日期
     * @param pageSize
     *            分页大小
     * @return 返回页数
     */
    public int getPages(String tableName, String ddate, int pageSize) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "")) {
            log.error("===Parameters tableName|ddate|pageSize should not be blank,Please check!===");
            return 0;
        }
        enableAggregation(tableName);
        int total = 0;
        try {
            HTable hTable = (HTable) getHTable(tableName);
            Scan scan = new Scan();
            Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                    new SubstringComparator(ddate));
            scan.setFilter(rowFilter);
            AggregationClient aggregation = new AggregationClient(conf);
            Long count = aggregation.rowCount(hTable,
                    new LongColumnInterpreter(), scan);
            total = count.intValue();
            hTable.close();
        } catch (Throwable e) {
            // TODO Auto-generated catch block
            log.error(e);
        }
        return (total % pageSize == 0) ? total / pageSize
                : (total / pageSize) + 1;
    }

    /**
     * 根据发送状态获取页数
     *
     * @param tableName
     *            表名
     * @param ddate
     *            数据日期
     * @param pageSize
     *            分页大小
     * @param status
     *            发送状态
     * @return 返回页数
     */
    public int getPagesByStatus(String tableName, String ddate, int pageSize,
            String status) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
                || StringUtils.isBlank(pageSize + "")
                || StringUtils.isBlank(status)) {
            log.error("===Parameters tableName|ddate|pageSize|status should not be blank,Please check!===");
            return 0;
        }
        enableAggregation(tableName);
        int total = 0;
        try {
            HTable hTable = (HTable) getHTable(tableName);
            Scan scan = new Scan();
            FilterList filterList = new FilterList(
                    FilterList.Operator.MUST_PASS_ALL);
            Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                    new SubstringComparator(ddate));
            filterList.addFilter(rowFilter);
            filterList.addFilter(new SingleColumnValueFilter(Bytes
                    .toBytes("info"), Bytes.toBytes("status"), CompareOp.EQUAL,
                    Bytes.toBytes(status)));
            scan.setFilter(filterList);
            AggregationClient aggregation = new AggregationClient(conf);
            Long count = aggregation.rowCount(hTable,
                    new LongColumnInterpreter(), scan);
            total = count.intValue();
            hTable.close();
        } catch (Throwable e) {
            // TODO Auto-generated catch block
            log.error(e);
        }
        return (total % pageSize == 0) ? total / pageSize
                : (total / pageSize) + 1;
    }

    /**
     * 获取同一个rowkey下的记录集合
     *
     * @param result
     *            结果集
     * @return
     */
    private Map<String, String> getRowByResult(Result result) {
        if (result == null) {
            log.error("===Parameter |result| should not be null,Please check!===");
            return null;
        }
        Map<String, String> cellMap = new HashMap<String, String>();
        for (Cell cell : result.listCells()) {
            String rowkey = Bytes.toString(cell.getRowArray(),
                    cell.getRowOffset(), cell.getRowLength());
            String cf = Bytes.toString(cell.getFamilyArray(),
                    cell.getFamilyOffset(), cell.getFamilyLength());
            String qf = Bytes.toString(cell.getQualifierArray(),
                    cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(),
                    cell.getValueOffset(), cell.getValueLength());
            cellMap.put(CpConstants.HBASE_TABLE_PROP_ROWKEY, rowkey);
            cellMap.put(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY, cf);
            cellMap.put(qf, value);
        }
        return cellMap;
    }

    /**
     * 获取HTableInterface
     *
     * @param tableName
     *            表名
     * @return 返回HTableInterface实例
     */
    private HTableInterface getHTable(String tableName) {
        if (StringUtils.isBlank(tableName)) {
            log.error("===Parameter |tableName| should not be blank,Please check!===");
            return null;
        }
        HTableInterface hTable = null;
        try {
            HConnection conn = HConnectionManager.createConnection(conf);
            hTable = conn.getTable(Bytes.toBytes(tableName));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e);
            return null;
        }
        return hTable;
    }

    /**
     * 批量插入或更新
     *
     * @param tableName
     *            表名
     * @param paraList
     *            组装成json或xml后的参数
     * @return 成功-true 失败-false
     */
    public boolean batchPut(String tableName, List<Map<String, String>> paraList) {
        try {
            List<Put> puts = new ArrayList<Put>();
            for (Map<String, String> map : paraList) {
                Put put = getPutByMap(map);
                puts.add(put);
            }
            HTable hTable = (HTable) getHTable(tableName);
            hTable.put(puts);
            hTable.close();
        } catch (RetriesExhaustedWithDetailsException e) {
            // TODO Auto-generated catch block
            log.error(e);
            return false;
        } catch (InterruptedIOException e) {
            // TODO Auto-generated catch block
            log.error(e);
            return false;
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e);
            return false;
        }
        return true;
    }

    /**
     * 根据map返回put
     *
     * @param paraMap
     *            参数map
     * @return 返回put
     */
    private Put getPutByMap(Map<String, String> paraMap) {
        if (paraMap == null) {
            log.error("===Parameter |paraMap| should not be null,Please check!===");
            return null;
        }
        Set<Entry<String, String>> set = paraMap.entrySet();
        Iterator<Entry<String, String>> it = set.iterator();
        byte[] rowkey = Bytes.toBytes(paraMap
                .get(CpConstants.HBASE_TABLE_PROP_ROWKEY));
        byte[] columnfamily = Bytes.toBytes(paraMap
                .get(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY));
        Put put = new Put(rowkey);
        while (it.hasNext()) {
            Entry<String, String> entry = it.next();
            String key = entry.getKey();
            if (!CpConstants.HBASE_TABLE_PROP_ROWKEY.equals(key)
                    && !CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY.equals(key)) {
                String value = entry.getValue();
                put.add(columnfamily, Bytes.toBytes(key), Bytes.toBytes(value));
            }
        }
        return put;
    }

    /**
     * 使表具有聚合功能
     *
     * @param tableName
     *            表名
     */
    @SuppressWarnings("resource")
    private void enableAggregation(String tableName) {
        String coprocessorName = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            HTableDescriptor htd = admin.getTableDescriptor(Bytes
                    .toBytes(tableName));
            List<String> coprocessors = htd.getCoprocessors();
            if (coprocessors != null && coprocessors.size() > 0) {
                return;
            } else {
                admin.disableTable(tableName);
                htd.addCoprocessor(coprocessorName);
                admin.modifyTable(tableName, htd);
                admin.enableTable(tableName);
            }
        } catch (TableNotFoundException e) {
            // TODO Auto-generated catch block
            log.error(e);
        } catch (MasterNotRunningException e) {
            // TODO Auto-generated catch block
            log.error(e);
        } catch (ZooKeeperConnectionException e) {
            // TODO Auto-generated catch block
            log.error(e);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e);
        }
    }
}



02-11 03:40