一、 已知的问题和不足

在上一个版本中,实现了使用HBase的协处理器将HBase的二级索引同步到Solr中,但是仍旧有几个缺陷:

  1. 写入Solr的Collection是写死在代码里面,且是唯一的。如果我们有一张表的数据希望将不同的字段同步到Solr中该如何做呢?
  2. 目前所有配置相关信息都是写死到了代码中的,是否可以添加外部配置文件。
  3. 原来的方法是每次都需要编译新的Jar文件单独运行,能否将所有的同步使用一段通用的代码完成?

二、解决思路

针对上面的三个主要问题,我们一一解决

  1. 通常一张表会对应多个SolrCollection以及不同的Column。我们可以使用Map[表名->List[(Collection1,List[Columns]),(Collection2,List[Columns])...]]这样的类型,根据表名获取所有的Collection和Column。
  2. 通过Typesafe Config读取外部配置文件,达到所有信息可配的目的。
  3. 所有的数据都只有Put和Delete,只要我们拦截到具体的消息之后判断当前的表名,然后根据问题一中的Collection和Column即可写入对应的SolrServer。在协处理器中获取表名的是e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()其中e是ObserverContext;

三、代码

3.1 读取config文件内容

使用typesafe的config组件读取morphlines.conf文件,将内容转换为 Map<String,List<HBaseIndexerMappin>>。具体代码如下

  1. public class ConfigManager {
  2. private static SourceConfig sourceConfig = new SourceConfig();
  3. public static Config config;
  4. static {
  5. sourceConfig.setConfigFiles("morphlines.conf");
  6. config = sourceConfig.getConfig();
  7. }
  8. public static Map<String,List<HBaseIndexerMappin>> getHBaseIndexerMappin(){
  9. Map<String,List<HBaseIndexerMappin>> mappin = new HashMap<String, List<HBaseIndexerMappin>>();
  10. Config mappinConf = config.getConfig("Mappin");
  11. List<String> tables = mappinConf.getStringList("HBaseTables");
  12. for (String table :tables){
  13. List<Config> confList = (List<Config>) mappinConf.getConfigList(table);
  14. List<HBaseIndexerMappin> maps = new LinkedList<HBaseIndexerMappin>();
  15. for(Config tmp :confList){
  16. HBaseIndexerMappin map = new HBaseIndexerMappin();
  17. map.solrConnetion = tmp.getString("SolrCollection");
  18. map.columns = tmp.getStringList("Columns");
  19. maps.add(map);
  20. }
  21. mappin.put(table,maps);
  22. }
  23. return mappin;
  24. }
  25. }

3.2 封装SolrServer的获取方式

因为目前我使用的环境是Solr和HBase公用的同一套Zookeeper,因此我们完全可以借助HBase的Zookeeper信息。HBase的协处理器是运行在HBase的环境中的,自然可以通过HBase的Configuration获取当前的Zookeeper节点和端口,然后轻松的获取到Solr的地址。

  1. public class SolrServerManager implements LogManager {
  2. static Configuration conf = HBaseConfiguration.create();
  3. public static String ZKHost = conf.get("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps2");
  4. public static String ZKPort = conf.get("hbase.zookeeper.property.clientPort","2181");
  5. public static String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr";
  6. public static int zkClientTimeout = 1800000;// 心跳
  7. public static int zkConnectTimeout = 1800000;// 连接时间
  8. public static CloudSolrServer create(String defaultCollection){
  9. log.info("Create SolrCloudeServer .This collection is " + defaultCollection);
  10. CloudSolrServer solrServer = new CloudSolrServer(SolrUrl);
  11. solrServer.setDefaultCollection(defaultCollection);
  12. solrServer.setZkClientTimeout(zkClientTimeout);
  13. solrServer.setZkConnectTimeout(zkConnectTimeout);
  14. return solrServer;
  15. }
  16. }

3.3 编写提交数据到Solr的代码

理想状态下,我们时时刻刻都需要提交数据到Solr中,但是事实上我们数据写入的时间是比较分散的,可能集中再每一天的某几个时间点。因此我们必须保证在高并发下能达到一定数据量自动提交,在低并发的情况下能隔一段时间写入一次。只有两种机制并存的情况下才能保证数据能即时写入。

  1. public class SolrCommitTimer extends TimerTask implements LogManager {
  2. public Map<String,List<SolrInputDocument>> putCache = new HashMap<String, List<SolrInputDocument>>();//Collection名字->更新(插入)操作缓存
  3. public Map<String,List<String>> deleteCache = new HashMap<String, List<String>>();//Collection名字->删除操作缓存
  4. Map<String,CloudSolrServer> solrServers = new HashMap<String, CloudSolrServer>();//Collection名字->SolrServers
  5. int maxCache = ConfigManager.config.getInt("MaxCommitSize");
  6. // 任何时候,保证只能有一个线程在提交索引,并清空集合
  7. final static Semaphore semp = new Semaphore(1);
  8. //添加Collection和SolrServer
  9. public void addCollecttion(String collection,CloudSolrServer server){
  10. this.solrServers.put(collection,server);
  11. }
  12. //往Solr添加(更新)数据
  13. public UpdateResponse put(CloudSolrServer server,SolrInputDocument doc) throws IOException, SolrServerException {
  14. server.add(doc);
  15. return server.commit(false, false);
  16. }
  17. //往Solr添加(更新)数据
  18. public UpdateResponse put(CloudSolrServer server,List<SolrInputDocument> docs) throws IOException, SolrServerException {
  19. server.add(docs);
  20. return server.commit(false, false);
  21. }
  22. //根据ID删除Solr数据
  23. public UpdateResponse delete(CloudSolrServer server,String rowkey) throws IOException, SolrServerException {
  24. server.deleteById(rowkey);
  25. return server.commit(false, false);
  26. }
  27. //根据ID删除Solr数据
  28. public UpdateResponse delete(CloudSolrServer server,List<String> rowkeys) throws IOException, SolrServerException {
  29. server.deleteById(rowkeys);
  30. return server.commit(false, false);
  31. }
  32. //将doc添加到缓存
  33. public void addPutDocToCache(String collection, SolrInputDocument doc) throws IOException, SolrServerException, InterruptedException {
  34. semp.acquire();
  35. log.debug("addPutDocToCache:" + "collection=" + collection + "data=" + doc.toString());
  36. if(!putCache.containsKey(collection)){
  37. List<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
  38. docs.add(doc);
  39. putCache.put(collection,docs);
  40. }else {
  41. List<SolrInputDocument> cache = putCache.get(collection);
  42. cache.add(doc);
  43. if (cache.size() >= maxCache) {
  44. try {
  45. this.put(solrServers.get(collection), cache);
  46. } finally {
  47. putCache.get(collection).clear();
  48. }
  49. }
  50. }
  51. semp.release();//释放信号量
  52. }
  53. //添加删除操作到缓存
  54. public void addDeleteIdCache(String collection,String rowkey) throws IOException, SolrServerException, InterruptedException {
  55. semp.acquire();
  56. log.debug("addDeleteIdCache:" + "collection=" + collection + "rowkey=" + rowkey);
  57. if(!deleteCache.containsKey(collection)){
  58. List<String> rowkeys = new LinkedList<String>();
  59. rowkeys.add(rowkey);
  60. deleteCache.put(collection,rowkeys);
  61. }else{
  62. List<String> cache = deleteCache.get(collection);
  63. cache.add(rowkey);
  64. if (cache.size() >= maxCache) {
  65. try{
  66. this.delete(solrServers.get(collection),cache);
  67. }finally {
  68. putCache.get(collection).clear();
  69. }
  70. }
  71. }
  72. semp.release();//释放信号量
  73. }
  74. @Override
  75. public void run() {
  76. try {
  77. semp.acquire();
  78. log.debug("开始插入....");
  79. Set<String> collections = solrServers.keySet();
  80. for(String collection:collections){
  81. if(putCache.containsKey(collection) && (!putCache.get(collection).isEmpty()) ){
  82. this.put(solrServers.get(collection),putCache.get(collection));
  83. putCache.get(collection).clear();
  84. }
  85. if(deleteCache.containsKey(collection) && (!deleteCache.get(collection).isEmpty())){
  86. this.delete(solrServers.get(collection),deleteCache.get(collection));
  87. deleteCache.get(collection).clear();
  88. }
  89. }
  90. } catch (InterruptedException e) {
  91. e.printStackTrace();
  92. } catch (Exception e) {
  93. log.error("Commit putCache to Solr error!Because :" + e.getMessage());
  94. }finally {
  95. semp.release();//释放信号量
  96. }
  97. }
  98. }

3.4 拦截HBase的Put和Delete操作信息

在每个prePut和preDelete中拦截操作信息,记录表名、列名、值。将这些信息根据表名和Collection名进行分类写入缓存。

  1. public class HBaseIndexerToSolrObserver extends BaseRegionObserver implements LogManager{
  2. Map<String,List<HBaseIndexerMappin>> mappins = ConfigManager.getHBaseIndexerMappin();
  3. Timer timer = new Timer();
  4. int maxCommitTime = ConfigManager.config.getInt("MaxCommitTime"); //最大提交时间,s
  5. SolrCommitTimer solrCommit = new SolrCommitTimer();
  6. public HBaseIndexerToSolrObserver(){
  7. log.info("Initialization HBaseIndexerToSolrObserver ...");
  8. for(Map.Entry<String,List<HBaseIndexerMappin>> entry : mappins.entrySet() ){
  9. List<HBaseIndexerMappin> solrmappin = entry.getValue();
  10. for(HBaseIndexerMappin map:solrmappin){
  11. String collection = map.solrConnetion;//获取Collection名字
  12. log.info("Create Solr Server connection .The collection is " + collection);
  13. CloudSolrServer solrserver = SolrServerManager.create(collection);//根据Collection初始化SolrServer连接
  14. solrCommit.addCollecttion(collection,solrserver);
  15. }
  16. }
  17. timer.schedule(solrCommit, 10 * 1000L, maxCommitTime * 1000L);
  18. }
  19. @Override
  20. public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
  21. Put put, WALEdit edit, Durability durability) throws IOException {
  22. String table = e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();//获取表名
  23. String rowkey= Bytes.toString(put.getRow());//获取主键
  24. SolrInputDocument doc = new SolrInputDocument();
  25. List<HBaseIndexerMappin> mappin = mappins.get(table);
  26. for(HBaseIndexerMappin mapp : mappin){
  27. for(String column : mapp.columns){
  28. String[] tmp = column.split(":");
  29. String cf = tmp[0];
  30. String cq = tmp[1];
  31. if(put.has(Bytes.toBytes(cf),Bytes.toBytes(cq))){
  32. Cell cell = put.get(Bytes.toBytes(cf),Bytes.toBytes(cq)).get(0);//获取制定列的数据
  33. Map<String, String > operation = new HashMap<String,String>();
  34. operation.put("set",Bytes.toString(CellUtil.cloneValue(cell)));
  35. doc.setField(cq,operation);//使用原子更新的方式将HBase二级索引写入Solr
  36. }
  37. }
  38. doc.addField("id",rowkey);
  39. try {
  40. solrCommit.addPutDocToCache(mapp.solrConnetion,doc);//添加doc到缓存
  41. } catch (SolrServerException e1) {
  42. e1.printStackTrace();
  43. } catch (InterruptedException e1) {
  44. e1.printStackTrace();
  45. }
  46. }
  47. }
  48. @Override
  49. public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
  50. Delete delete,
  51. WALEdit edit,
  52. Durability durability) throws IOException{
  53. String table = e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();
  54. String rowkey= Bytes.toString(delete.getRow());
  55. List<HBaseIndexerMappin> mappin = mappins.get(table);
  56. for(HBaseIndexerMappin mapp : mappin){
  57. try {
  58. solrCommit.addDeleteIdCache(mapp.solrConnetion,rowkey);//添加删除操作到缓存
  59. } catch (SolrServerException e1) {
  60. e1.printStackTrace();
  61. } catch (InterruptedException e1) {
  62. e1.printStackTrace();
  63. }
  64. }
  65. }
  66. }

四、 使用

首先需要添加morphlines.conf文件。里面包含了需要同步数据到Solr的HBase表名、对应的Solr Collection的名字、要同步的列、多久提交一次、最大批次容量的相关信息。具体配置如下:

  1. #最大提交时间(单位:秒)
  2. MaxCommitTime = 30
  3. #最大批次容量
  4. MaxCommitSize = 10000
  5. Mappin {
  6. HBaseTables: ["HBASE_OBSERVER_TEST"] #需要同步的HBase表名
  7. "HBASE_OBSERVER_TEST": [
  8. {
  9. SolrCollection: "bqjr" #Solr Collection名字
  10. Columns: [
  11. "cf1:test_age", #需要同步的列,格式<列族:列>
  12. "cf1:test_name"
  13. ]
  14. },
  15. ]
  16. }

该配置文件默认放在各个节点的/etc/hbase/conf/下。如果你希望将配置文件路径修改为其他路径,请修改com.bqjr.bigdata.HBaseObserver.comm.config.SourceConfig类中的configHome路径。

然后将代码打包,上传到HDFS中,将协处理器添加到对应的表中。

  1. #先禁用这张表
  2. disable 'HBASE_OBSERVER_TEST'
  3. #为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)
  4. alter 'HBASE_OBSERVER_TEST','coprocessor'=>'hdfs://hostname:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||'
  5. #启用这张表
  6. enable 'HBASE_OBSERVER_TEST'
  7. #删除某个协处理器,"$<bumber>"后面跟的ID号与desc里面的ID号相同
  8. alter 'HBASE_OBSERVER_TEST',METHOD=>'table_att_unset',NAME => 'coprocessor$1'

如果需要新增一张表同步到Solr。只需要修改morphlines.conf文件,分发倒各个节点。然后将协处理器添加到HBase表中,这样就不用再次修改代码了。

05-11 00:17