In Java, I have to import some data from a tsv file (about 21 * 10^6 rows) into an HBase table using MapReduce.
Each line is:
XYZ | XZS YSY | SDS | XDA | JKX | SDS 0.XXXXXXXXX
The HTable has 5 column families: A, B, C, D, E.

The first group of each line of the file is my HBase row key.

The second group provides the qualifiers for the 5 column families:

  • YSY | SDS | XDA | JKX | SDS -> for column family A
  • YSY | SDS | XDA | JKX -> for column family B
  • YSY | SDS | XDA -> for column family C
  • YSY | SDS -> for column family D
  • YSY -> for column family E

The last group is the value to insert into the cell.
I also have to aggregate, with a sum Σ, all the values that share the same qualifier (whether it has 1, 2, 3, 4 or 5 tokens); that will be my Reducer's job. A small sketch of how one line expands is shown below.
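To make the expansion concrete, here is a minimal, standalone sketch (no Hadoop involved; the sample line and class name are made up for illustration) of how one input line turns into the five composite keys that the mapper emits:

    public class LineExpansionSketch {

        public static void main(String[] args) {
            //Hypothetical input line: rowkey \t qualifiers \t value
            String line = "XYZ | XZS\tYSY|SDS|XDA|JKX|SDS\t0.123456789";
            String[] fields = line.split("\t");
            String rowkey = fields[0];
            String[] tokens = fields[1].split("\\|");
            String value = fields[2];

            //Each prefix of the qualifier list becomes one emitted record;
            //the reducer later derives the column family from the prefix length
            StringBuilder qualifier = new StringBuilder();
            for (int i = 0; i < tokens.length; i++) {
                if (i > 0) qualifier.append("|");
                qualifier.append(tokens[i]);
                System.out.println(rowkey + "_" + qualifier + " -> " + value);
            }
        }
    }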

Here is my driver:
    public class Driver {
    
        private static final String COLUMN_FAMILY_1 = "A";
        private static final String COLUMN_FAMILY_2 = "B";
        private static final String COLUMN_FAMILY_3 = "C";
        private static final String COLUMN_FAMILY_4 = "D";
        private static final String COLUMN_FAMILY_5 = "E";
        private static final String TABLENAME = "abe:data";
        private static final String DATA_SEPARATOR = "\t";
    
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = HBaseConfiguration.create();
    
            //Configuration settings for the HBase table
            configuration.set("hbase.table.name", TABLENAME);
            configuration.set("column_family_1", COLUMN_FAMILY_1);
            configuration.set("column_family_2", COLUMN_FAMILY_2);
            configuration.set("column_family_3", COLUMN_FAMILY_3);
            configuration.set("column_family_4", COLUMN_FAMILY_4);
            configuration.set("column_family_5", COLUMN_FAMILY_5);
            configuration.set("data_separator", DATA_SEPARATOR);
    
    
            if (args.length != 2) {
                System.err.println("Usage:");
                System.err.println("-\t args[0] -> HDFS input path");
                System.err.println("-\t args[1] -> HDFS output path");
                System.exit(1);
            }
    
            String inputPath = args[0];
            String outputPath = args[1];
            Path inputHdfsPath = new Path(inputPath);
            Path outputHdfsPath = new Path(outputPath);
    
            Job job = null;
    
            try {
                job = Job.getInstance(configuration);
            } catch (IOException e) {
                System.out.println("\n\t--->Exception: Error getting Job instance.<---\n");
                e.printStackTrace();
            }
    
            job.setJobName("Bulk loading HBase table \"" + TABLENAME + "\" with aggregation.");
            job.setJarByClass(Driver.class);
    
            //MAPPER
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(MappingClass.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(FloatWritable.class);
    
            try {
    
                FileInputFormat.addInputPath(job, inputHdfsPath);
    
            } catch (IllegalArgumentException | IOException e) {
                System.out.println("Error setting inputPath in FileInputFormat");
                e.printStackTrace();
            }
    
            //Delete the output folder if it already exists
            try {

                FileSystem hdfs = FileSystem.get(configuration);
                if (hdfs.exists(outputHdfsPath)) {
                    hdfs.delete(outputHdfsPath, true); //Delete existing directory
                }

            } catch (IllegalArgumentException | IOException e) {
                System.out.println("Error deleting existing output path.");
                e.printStackTrace();
            }

            //Setting the output path where the HFiles for the bulk import will be written
            FileOutputFormat.setOutputPath(job, outputHdfsPath);
    
    
            //Variables to access to HBase
            Connection hbCon = ConnectionFactory.createConnection(configuration);
            Table hTable = hbCon.getTable(TableName.valueOf(TABLENAME));
            RegionLocator regionLocator = hbCon.getRegionLocator(TableName.valueOf(TABLENAME));
            Admin admin = hbCon.getAdmin();
            HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);
    
            // Wait for the HFile creation to complete
            boolean result = job.waitForCompletion(true);
            LoadIncrementalHFiles loadHFiles = null;

            try {
                loadHFiles = new LoadIncrementalHFiles(configuration);
            } catch (Exception e) {
                System.out.println("Error configuring LoadIncrementalHFiles.");
                e.printStackTrace();
            }

            if (result) {
                loadHFiles.doBulkLoad(outputHdfsPath, admin, hTable, regionLocator);
                System.out.println("Bulk Import Completed.");
            } else {
                System.out.println("Error completing job. No bulk import performed.");
            }
    
        }
    
    }
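
For completeness (this is not part of the original post), here is a rough sketch of how the target table with its five column families could be created with the HBase 1.x client API, assuming the "abe" namespace already exists:

    public class CreateTableSketch {

        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();

            try (Connection connection = ConnectionFactory.createConnection(configuration);
                 Admin admin = connection.getAdmin()) {

                //Table "abe:data" with the five column families used by the job
                HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("abe:data"));
                for (String family : new String[] {"A", "B", "C", "D", "E"}) {
                    descriptor.addFamily(new HColumnDescriptor(family));
                }
                admin.createTable(descriptor);
            }
        }
    }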
    

Here is my mapper:
        public class MappingClass extends Mapper<LongWritable,Text,ImmutableBytesWritable,FloatWritable>{
            private String separator;
    
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                Configuration configuration = context.getConfiguration();
                separator = configuration.get("data_separator");
            }
    
            @Override
            public void map(LongWritable key, Text line, Context context){

                String[] values = line.toString().split(separator);
                String rowkey = values[0];
                String[] allQualifiers = values[1].split("\\|");
                String percentage = values[2];

                ImmutableBytesWritable ibw = new ImmutableBytesWritable();
                FloatWritable valueOut = new FloatWritable(Float.parseFloat(percentage));

                //Build the five qualifier prefixes ("YSY", "YSY|SDS", ... up to the
                //full five-token string) and emit one (rowkey_qualifier, value) pair
                //for each of them
                StringBuilder qualifier = new StringBuilder();
                for (int i = 0; i < allQualifiers.length; i++) {
                    if (i > 0) {
                        qualifier.append("|");
                    }
                    qualifier.append(allQualifiers[i]);

                    ibw.set(Bytes.toBytes(rowkey + "_" + qualifier));

                    try {
                        context.write(ibw, valueOut);
                    } catch (IOException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
    
        }
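
Since the aggregation is a plain sum, a combiner could pre-aggregate the map output and cut shuffle volume for the ~21 million input lines. This is not in the original post; a sketch under that assumption (the class name is mine):

    public class SummingCombiner
            extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, FloatWritable> {

        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            float sum = 0;
            for (FloatWritable fw : values) {
                sum += fw.get();
            }
            //Emit the partial sum; the real reducer adds the partials together
            context.write(key, new FloatWritable(sum));
        }
    }

It would be registered in the driver with job.setCombinerClass(SummingCombiner.class);.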
    

Here is my reducer:
     public class ReducingClass extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, KeyValue> {
            private String columnFamily_1;
            private String columnFamily_2;
            private String columnFamily_3;
            private String columnFamily_4;
            private String columnFamily_5;
            private float sum;
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                Configuration configuration = context.getConfiguration();
    
            columnFamily_1 = configuration.get("column_family_1");
            columnFamily_2 = configuration.get("column_family_2");
            columnFamily_3 = configuration.get("column_family_3");
            columnFamily_4 = configuration.get("column_family_4");
            columnFamily_5 = configuration.get("column_family_5");
            }
            @Override
            public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context){
                //Decode the composite key back to "rowkey_qualifier";
                //ImmutableBytesWritable.toString() would not return the original string
                String compositeKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
                String[] rk_cq = compositeKey.split("_");
                String rowkey = rk_cq[0];
                String cq = rk_cq[1];
                String colFamily = this.getFamily(cq);
                sum = 0;

                for (FloatWritable fw : values)
                    sum += fw.get();

                ImmutableBytesWritable ibw = new ImmutableBytesWritable(Bytes.toBytes(rowkey));
                KeyValue kv = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(colFamily),
                        Bytes.toBytes(cq), Bytes.toBytes(Float.toString(sum)));

                try {
                    context.write(ibw, kv);
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }

            }
    
            private String getFamily(String cq){
                String cf = "";
    
                switch (cq.split("\\|").length) {
                case 1:
                    cf = columnFamily_1;
                    break;
    
                case 2:
                    cf = columnFamily_2;
                    break;
    
                case 3:
                    cf = columnFamily_3;
                    break;
    
                case 4:
                    cf = columnFamily_4;
                    break;
    
                case 5:
                    cf = columnFamily_5;
                    break;
    
                default:
                    break;
                }
    
                return cf;
            }
    
        }
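
As a quick sanity check of the token-count logic above (with a hypothetical qualifier):

    String cq = "YSY|SDS|XDA";              //3 |-separated tokens
    int n = cq.split("\\|").length;         //n == 3, so getFamily returns columnFamily_3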
    

Now, the error:



Thanks for your help.

Best answer

I fixed it. In the driver, I had forgotten:

    job.setReducerClass(ReducingClass.class);
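
In context, the driver's mapper/reducer wiring then becomes (only the relevant lines shown):

    //MAPPER
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(MappingClass.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(FloatWritable.class);

    //REDUCER (this registration was missing)
    job.setReducerClass(ReducingClass.class);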
    
