In Java, I have to import some data from a TSV file (about 21 * 10^6 lines) into an HBase table using MapReduce.
Each line is:
XYZ | XZS YSY | SDS | XDA | JKX | SDS 0.XXXXXXXXX
The HTable has 5 column families: A, B, C, D, E.
The first group of each line of the file is my HBase row key.
The second group is made up of 5 pipe-separated components, which form the column qualifiers.
Last comes the value to insert into the cell.
I also have to aggregate, with a sum Σ, all the values that share the same qualifier (built from 1, 2, 3, 4 or 5 of those components); this will be part of my Reducer.
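For example, assuming the three groups are tab-separated and the qualifier components are separated by "|", a single line decomposes roughly as in this minimal sketch (the class name and the sample line are hypothetical; it is not part of the job):
public class LineFormatSketch {
    public static void main(String[] args) {
        // Hypothetical sample line: rowkey <TAB> 5 qualifier components <TAB> value
        String line = "XYZ|XZS\tYSY|SDS|XDA|JKX|SDS\t0.123456789";

        String[] fields = line.split("\t");             // three tab-separated groups
        String rowKey = fields[0];                      // first group  -> HBase row key
        String[] qualifiers = fields[1].split("\\|");   // second group -> five qualifier components
        float value = Float.parseFloat(fields[2]);      // third group  -> cell value, later summed per qualifier

        System.out.println(rowKey + " / " + qualifiers.length + " qualifiers / " + value);
    }
}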
This is my Driver:
public class Driver {

    private static final String COLUMN_FAMILY_1 = "A";
    private static final String COLUMN_FAMILY_2 = "B";
    private static final String COLUMN_FAMILY_3 = "C";
    private static final String COLUMN_FAMILY_4 = "D";
    private static final String COLUMN_FAMILY_5 = "E";
    private static final String TABLENAME = "abe:data";
    private static final String DATA_SEPARATOR = "\t";
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = HBaseConfiguration.create();
        // Configuration settings for the HBase table
        configuration.set("hbase.table.name", TABLENAME);
        configuration.set("colomn_family_1", COLUMN_FAMILY_1);
        configuration.set("colomn_family_2", COLUMN_FAMILY_2);
        configuration.set("colomn_family_3", COLUMN_FAMILY_3);
        configuration.set("colomn_family_4", COLUMN_FAMILY_4);
        configuration.set("colomn_family_5", COLUMN_FAMILY_5);
        configuration.set("data_separator", DATA_SEPARATOR);

        if (args.length != 2) {
            System.err.println("Usage: ");
            System.err.println("-\t args[0] -> HDFS input path");
            System.err.println("-\t args[1] -> HDFS output path");
            System.exit(1);
        }
        String inputPath = args[0];
        String outputPath = args[1];
        Path inputHdfsPath = new Path(inputPath);
        Path outputHdfsPath = new Path(outputPath);

        Job job = null;
        try {
            job = Job.getInstance(configuration);
        } catch (IOException e) {
            System.out.println("\n\t---> Exception: error while getting an instance of the Job. <---\n");
            e.printStackTrace();
        }
        job.setJobName("Bulk Loading HBase Table: " + "\"" + TABLENAME + "\" with aggregation.");
        job.setJarByClass(Driver.class);

        // MAPPER
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MappingClass.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);

        try {
            FileInputFormat.addInputPath(job, inputHdfsPath);
        } catch (IllegalArgumentException | IOException e) {
            System.out.println("Error setting inputPath in FileInputFormat");
            e.printStackTrace();
        }
        // Delete the output folder if it already exists
        try {
            FileSystem hdfs = FileSystem.get(configuration);
            if (hdfs.exists(outputHdfsPath)) {
                hdfs.delete(outputHdfsPath, true);
            }
        } catch (IllegalArgumentException | IOException e) {
            System.out.println("Error deleting existing output path");
            e.printStackTrace();
        }

        // Output path where the HFiles for the bulk import will be written
        FileOutputFormat.setOutputPath(job, outputHdfsPath);
        // Variables to access HBase
        Connection hbCon = ConnectionFactory.createConnection(configuration);
        Table hTable = hbCon.getTable(TableName.valueOf(TABLENAME));
        RegionLocator regionLocator = hbCon.getRegionLocator(TableName.valueOf(TABLENAME));
        Admin admin = hbCon.getAdmin();

        HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);

        // Wait for the HFiles to be created
        boolean result = job.waitForCompletion(true);

        LoadIncrementalHFiles loadFfiles = null;
        try {
            loadFfiles = new LoadIncrementalHFiles(configuration);
        } catch (Exception e) {
            System.out.println("Error configuring LoadIncrementalHFiles.");
            e.printStackTrace();
        }

        if (result) {
            loadFfiles.doBulkLoad(outputHdfsPath, admin, hTable, regionLocator);
            System.out.println("Bulk Import Completed.");
        } else {
            System.out.println("Error in completing job. No bulk import.");
        }
    }
}
My Mapper is:
public class MappingClass extends Mapper<LongWritable, Text, ImmutableBytesWritable, FloatWritable> {

    private String separator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        separator = configuration.get("data_separator");
    }

    @Override
    public void map(LongWritable key, Text line, Context context) {
        String[] values = line.toString().split(separator);
        String rowkey = values[0];
        String[] allQualifiers = values[1].split("\\|");
        String percentage = values[2];
        System.out.println(percentage);

        // Build the five qualifiers, from all 5 components down to the first component only
        String toQ1 = allQualifiers[0] + "|" + allQualifiers[1] + "|" + allQualifiers[2] + "|" + allQualifiers[3] + "|" + allQualifiers[4];
        String toQ2 = allQualifiers[0] + "|" + allQualifiers[1] + "|" + allQualifiers[2] + "|" + allQualifiers[3];
        String toQ3 = allQualifiers[0] + "|" + allQualifiers[1] + "|" + allQualifiers[2];
        String toQ4 = allQualifiers[0] + "|" + allQualifiers[1];
        String toQ5 = allQualifiers[0];

        ImmutableBytesWritable ibw = new ImmutableBytesWritable();
        FloatWritable valueOut = new FloatWritable(Float.parseFloat(percentage));

        // Emit one (rowkey_qualifier, value) pair per qualifier
        ibw.set(Bytes.toBytes(rowkey + "_" + toQ1));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(rowkey + "_" + toQ2));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(rowkey + "_" + toQ3));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(rowkey + "_" + toQ4));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(rowkey + "_" + toQ5));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
This is my Reducer:
public class ReducingClass extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, KeyValue> {

    private String columnFamily_1;
    private String columnFamily_2;
    private String columnFamily_3;
    private String columnFamily_4;
    private String columnFamily_5;
    private float sum;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        columnFamily_1 = configuration.get("colomn_family_1");
        columnFamily_2 = configuration.get("colomn_family_2");
        columnFamily_3 = configuration.get("colomn_family_3");
        columnFamily_4 = configuration.get("colomn_family_4");
        columnFamily_5 = configuration.get("colomn_family_5");
    }
    @Override
    public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) {
        // Decode the composite key (rowkey + "_" + qualifier) built by the mapper.
        // Note: ImmutableBytesWritable.toString() returns a hex dump, so convert the bytes explicitly.
        String[] rk_cq = Bytes.toString(key.get(), key.getOffset(), key.getLength()).split("_");
        String rowkey = rk_cq[0];
        String cq = rk_cq[1];
        String colFamily = this.getFamily(cq);

        sum = 0;
        for (FloatWritable fw : values)
            sum += fw.get();

        ImmutableBytesWritable ibw = new ImmutableBytesWritable(rowkey.getBytes());
        KeyValue kv = new KeyValue(rowkey.getBytes(), colFamily.getBytes(), cq.getBytes(), Float.toString(sum).getBytes());
        try {
            context.write(ibw, kv);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    private String getFamily(String cq) {
        String cf = new String();
        switch (cq.split("\\|").length) {
            case 1:
                cf = columnFamily_1;
                break;
            case 2:
                cf = columnFamily_2;
                break;
            case 3:
                cf = columnFamily_3;
                break;
            case 4:
                cf = columnFamily_4;
                break;
            case 5:
                cf = columnFamily_5;
                break;
            default:
                break;
        }
        return cf;
    }
}
Now the error:
Thanks for your help.
Best answer
I fixed it. In the Driver I had forgotten:
job.setReducerClass(ReducingClass.class);
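For reference, a minimal sketch of where that call can sit among the other job settings in the Driver above (the exact placement is my assumption; as far as I can tell, with a FloatWritable map output value HFileOutputFormat2.configureIncrementalLoad does not pick a reducer by itself, so the job otherwise falls back to the default identity Reducer and the per-qualifier sums are never computed):
// Sketch only: the surrounding calls are the ones already present in the Driver.
job.setMapperClass(MappingClass.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(FloatWritable.class);
job.setReducerClass(ReducingClass.class);   // <-- the line that was missing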