This is my first time writing an HBase MapReduce job, and I'm having trouble deleting rows in HBase (I'm trying to run it as a map-only job). The job succeeds and scans the HBase table, and the mapper reads the correct row keys from HBase (verified through sysout). However, the call to Delete delete = new Delete(row.get()) doesn't seem to actually do anything.
Below is the code I'm trying to run:
public class HBaseDelete {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "log_table");
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);
        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete> {
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        Delete delete = new Delete(row.get());
        context.write(row, delete);
    }
}
Is there something missing to 'commit' the deletion?
You're writing to the context, not to the table, so the Delete is only emitted as map output and is never applied to the table. Your mapper should look something like this:
public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {
    private HTable myTable;

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        myTable.delete(new Delete(row.get())); /* Delete the row from the table */
        //context.write(row, NullWritable.get()); /* Optional: emit the deleted row keys as output (skip this if you don't need them) */
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        myTable.close(); /* Close table */
    }
}
Note that delete operations don't use the write buffer, so this code issues one RPC per delete, which is inefficient for this type of job. To address that, you can build your own List<Delete> to batch them:
public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable> {
    private HTable myTable;
    private List<Delete> deleteList = new ArrayList<Delete>();
    final private int buffer = 10000; /* Buffer size, tune it as desired */

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        deleteList.add(new Delete(row.get())); /* Add delete to the batch */
        if (deleteList.size() >= buffer) {
            myTable.delete(deleteList); /* Submit batch */
            deleteList.clear(); /* Clear batch */
        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deleteList.size() > 0) {
            myTable.delete(deleteList); /* Submit remaining batch */
        }
        myTable.close(); /* Close table */
    }
}
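If you want to sanity-check the batching logic without a cluster, the buffer-and-flush pattern can be pulled out into a small standalone class. This is only a sketch: BatchingBuffer, its sink callback, and flushCount are hypothetical names made up for illustration, not part of HBase, and the sink stands in for a bulk call such as myTable.delete(deleteList).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects items and hands them to a sink in fixed-size batches. */
final class BatchingBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // stand-in for a bulk call (assumption, not an HBase API)
    private final List<T> pending = new ArrayList<>();
    private int flushCount = 0;

    BatchingBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Plays the role of map(): queue one item, flush once the batch is full. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Plays the role of cleanup(): submit whatever is still pending. */
    void flush() {
        if (pending.isEmpty()) {
            return; // nothing to send, so no bulk call is made
        }
        sink.accept(new ArrayList<>(pending)); // pass a copy so the sink may keep or modify it
        pending.clear();
        flushCount++;
    }

    /** Number of bulk calls made so far. */
    int flushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingBuffer<String> buffer =
                new BatchingBuffer<>(10, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 25; i++) {
            buffer.add("row-" + i);
        }
        buffer.flush(); // final partial batch, as in cleanup()
        System.out.println(batchSizes + " in " + buffer.flushCount() + " bulk calls");
        // prints: [10, 10, 5] in 3 bulk calls
    }
}
```

With a batch size of 10, 25 rows cost 3 bulk calls instead of 25 single ones, which is the saving the mapper above is after. The final flush() matters: without it, the last partial batch would be silently dropped.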