使用JAVA API 解析ORC File
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.*;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.*;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public void readOrcFile(String fileName) throws SerDeException, IOException {
JobConf conf = new JobConf(hadoopConf);
Path orcFilePath = new Path(fileName);
StringBuilder allColumns = new StringBuilder();
StringBuilder allColumnTypes = new StringBuilder();
Properties p = new Properties();
p.setProperty("columns", "url,word,freq,weight");
p.setProperty("columns.types", "string:string:string:string");
OrcSerde serde = new OrcSerde();
serde.initialize(conf, p);
StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
OrcInputFormat in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, orcFilePath);
InputSplit[] splits = in.getSplits(conf, 1);
System.out.println("splits.length==" + splits.length);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
Long count = 0 L;
while (reader.next(key, value)) {
count ++;
}
reader.close();
}
org.apache.hadoop.mapred.InputFormat接口的getSplits方法定义如下:
InputSplit[] getSplits(JobConf job,
int numSplits)
throws IOException
其中numSplits参数的含义时期望得到分片数, 如上的例子中,期望输入文件的分片为1个,如果ORC文件有多个分片则会被合并成一个分片。但是hdfs的中设置的一个分片最大为256M,所以合并成1个分片就会少300-256=44M的数据,造成了上面的问题。
如果 numSplits 参数的值设置为小于0的负数,则会按照ORC File的正常的 stripe个数生成split。
InputSplit[] splits = in.getSplits(conf, -1) 得到的 splits 个数是6个,6个splits中记录数是预期
中的180条。
(二)使用 org.apache.hadoop.hive.ql.io.orc.Reader 类读取ORC文件
可以通过reader.getSchema(); // 获取ORC文件的schema文件。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.*;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.*;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public void readOrc(String INPUT) throws IOException {
Configuration conf = new Configuration();
Path file_in = new Path(INPUT);
Reader reader = OrcFile.createReader(FileSystem.getLocal(conf), file_in);
TypeDescription schema = reader.getSchema(); // 获取ORC文件的schema文件
System.out.println(schema.toJson());
System.out.println(schema.toString());
System.out.println("--------------------------------");
StructObjectInspector inspector = (StructObjectInspector) reader.getObjectInspector();
RecordReader records = reader.rows();
Object row = null;
Long count = 0L;
while (records.hasNext()) {
row = records.next(row);
// System.out.println(row.toString());
count++;
List value_lst = inspector.getStructFieldsDataAsList(row);
}
System.out.println("--------total line=" + count);
}