简单了解

官方帮助文档

http://hbase.apache.org/book.html#cp

协处理器出现的原因

HBase作为列族数据库经常被人诟病的就是无法轻易建立“二级索引”难执行求和、计数、排序等操作。在0.92版本以前的HBase虽然在数据存储层集成了MapReduce,能够有效用于数据表的分布式计算,然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升。所以HBase在0.92之后引入了协处理器(coprocessors),添加了一些新的特性:能够轻易建立二次索引、复杂过了长期以及访问控制等

协处理器的分类

Observer

对数据进行前置或者后置的拦截操作

Endpoint

主要实现数据的一些统计功能,例如 COUNT,SUM,GROUP BY 等等

Phoenix

其实就是使用了大量的协处理器来实现的


协处理器的使用

加载方式

协处理器的加载方式有两种,静态加载方式(Static Load)动态加载方式(Dynamic Load)。 静态加载的协处理器称之为 System Coprocessor,动态加载的协处理器称之为 Table Coprocessor

静态加载

通过修改hbase-site.xml,然后重启hbase实现,这是对所有表都有效

cd /export/servers/hbase-1.2.0-cdh5.14.0/conf
vim hbase-site.xml
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
动态加载

动态加载可以对指定表生效

首先在禁用指定表

disable 'mytable'

然后添加aggregation

alter 'mytable', METHOD => 'table_att','coprocessor'=>
'|org.apache.Hadoop.hbase.coprocessor.AggregateImplementation||'

重启指定表

enable 'mytable'
协处理器的卸载

禁用表

disable 'test'

卸载协处理器

alter 'test',METHOD => 'table_att_unset',NAME => 'coprocessor$1'

启用表

enable 'test'

协处理器Observer应用实战

需求

通过协处理器Observer实现hbase当中一张表插入数据,然后通过协处理器,将数据复制一份保存到另外一张表当中去,但是只取当第一张表当中的部分列数据保存到第二张表当中去

步骤
一、HBase当中创建第一张表proc1和第二张表proc2
create 'proc1','info'
create 'proc2','info'
二、开发HBase的协处理器
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import java.io.IOException; public class ObserverProcessor extends BaseRegionObserver { @Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
//连接到HBase
Configuration configuration = HBaseConfiguration.create();
//设置连接配置
configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
//
Connection connection = ConnectionFactory.createConnection(configuration);
Cell nameCell = put.get("info".getBytes(), "name".getBytes()).get(0);
Put put1 = new Put(put.getRow());
put1.add(nameCell);
Table reverseuser = connection.getTable(TableName.valueOf("proc2"));
reverseuser.put(put1);
reverseuser.close();
}
}
三、将java打成Jar包,上传到HDFS

先将jar包上传到linux的/export/servers路径下,然后执行以下命令

mv original-day12_HBaseANDMapReduce-1.0-SNAPSHOT.jar  processor.jar
hdfs dfs -mkdir -p /processor
hdfs dfs -put processor.jar /processor
四、将jar包挂载到proc1表

hbase shell执行以下命令

describe 'proc1'

alter 'proc1',METHOD => 'table_att','Coprocessor'=>'hdfs://node01:8020/processor/processor.jar|cn.itcast.mr.demo7.ObserverProcessor|1001|'

describe 'proc1'

【HBase】协处理器是什么?又能干什么?怎么用?-LMLPHP

五、用JavaAPI想proc1表中添加数据
/**
*
*/
@Test
public void testPut() throws Exception{
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01,node02");
Connection connection = ConnectionFactory.createConnection(configuration);
Table user5 = connection.getTable(TableName.valueOf("proc1"));
Put put1 = new Put(Bytes.toBytes("hello_world"));
put1.addColumn(Bytes.toBytes("info"),"name".getBytes(),"helloworld".getBytes());
put1.addColumn(Bytes.toBytes("info"),"gender".getBytes(),"abc".getBytes());
put1.addColumn(Bytes.toBytes("info"),"nationality".getBytes(),"test".getBytes());
user5.put(put1);
byte[] row = put1.getRow();
System.out.println(Bytes.toString(row));
user5.close();
}
六、查看proc1和proc2表的数据

【HBase】协处理器是什么?又能干什么?怎么用?-LMLPHP

七、如果要卸载协处理器
disable 'proc1'
alter 'proc1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
enable 'proc1'
05-16 23:14