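I am trying to read a small RCFile (about 200 rows of data) into a HashMap for a map-side join, but I am having trouble getting the data in the file into a usable state.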

Here is what I have so far, most of which was taken from this example:

    public void configure(JobConf job)
    {
        try
        {
            FileSystem fs = FileSystem.get(job);
            RCFile.Reader rcFileReader = new RCFile.Reader(fs, new Path("/path/to/file"), job);
            int counter = 1;
            while (rcFileReader.next(new LongWritable(counter)))
            {
                System.out.println("Fetching data for row " + counter);
                BytesRefArrayWritable dataRead = new BytesRefArrayWritable();
                rcFileReader.getCurrentRow(dataRead);
                System.out.println("dataRead: " + dataRead + " dataRead.size(): " + dataRead.size());
                for (int i = 0; i < dataRead.size(); i++)
                {
                    BytesRefWritable bytesRefRead = dataRead.get(i);
                    byte b1[] = bytesRefRead.getData();
                    Text returnData = new Text(b1);
                    System.out.println("READ-DATA = " + returnData.toString());
                }
                counter++;
            }
        }
        catch (IOException e)
        {
            throw new Error(e);
        }
    }

However, in the output I get, the first row contains all of the data for each column concatenated together, and none of the other rows contain any data:

    Fetching data for row 1
    dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@7f26d3df dataRead.size(): 5
    READ-DATA = 191606656066860670
    READ-DATA = United StatesAmerican SamoaGuamNorthern Mariana Islands
    READ-DATA = USASGUMP
    READ-DATA = USSouth PacificSouth PacificSouth Pacific
    READ-DATA = 19888
    Fetching data for row 2
    dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@1cb1a4e2 dataRead.size(): 0
    Fetching data for row 3
    dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@52c00025 dataRead.size(): 0
    Fetching data for row 4
    dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@3b49a794 dataRead.size(): 0

How do I read this data properly so that I can access one row at a time, i.e. (191, United States, US, US, 19)?

Best answer

After some more digging, I found a solution. The key here is to use RCFileRecordReader instead of RCFile.Reader.
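A likely reason the first attempt printed whole columns: as far as I can tell, each BytesRefWritable refers to a slice of a shared column buffer, and getData() returns the entire backing array, while getStart() and getLength() mark the part that belongs to the current row. The sketch below illustrates that idea with plain byte arrays only; it does not use the Hive classes, ColumnSliceDemo and its methods are hypothetical names, and it does not explain why the later rows came back empty.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a column buffer: every cell of one column is
// packed back to back into a single byte array.
final class ColumnSliceDemo {

    // Decode only the bytes [start, start + length) that belong to one cell.
    static String cell(byte[] buffer, int start, int length) {
        return new String(buffer, start, length, StandardCharsets.UTF_8);
    }

    // Walk the shared buffer using per-cell lengths and return one string per row.
    static List<String> cells(byte[] buffer, int[] lengths) {
        List<String> out = new ArrayList<>();
        int start = 0;
        for (int length : lengths) {
            out.add(cell(buffer, start, length));
            start += length;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] buffer = "United StatesAmerican SamoaGuam".getBytes(StandardCharsets.UTF_8);
        // Decoding the whole array gives the values run together, as in the question.
        System.out.println(new String(buffer, StandardCharsets.UTF_8));
        // Decoding slice by slice yields one value per row.
        System.out.println(cells(buffer, new int[] {13, 14, 4}));
    }
}
```

If that is indeed the cause, building the Text from the slice (data, start, length) rather than from the whole array should give one value per cell, although going through RCFileRecordReader and the SerDe avoids handling raw bytes altogether.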

Here is what I ended up with; it also works for opening multiple files:

    try
    {
        FileSystem fs = FileSystem.get(job);
        // List every entry in the directory so that multiple RCFiles can be read.
        FileStatus[] fileStatuses = fs.listStatus(new Path("/path/to/dir/"));
        // key and value are reused across rows; the reader refills them on each call to next().
        LongWritable key = new LongWritable();
        BytesRefArrayWritable value = new BytesRefArrayWritable();
        int counter = 1;
        for (int i = 0; i < fileStatuses.length; i++)
        {
            FileStatus fileStatus = fileStatuses[i];
            if (!fileStatus.isDir())
            {
                System.out.println("File: " + fileStatus);
                // A single split that covers the whole file.
                FileSplit split = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), job);
                RCFileRecordReader reader = new RCFileRecordReader(job, split);
                while (reader.next(key, value))
                {
                    System.out.println("Getting row " + counter);
                    AllCountriesRow acr = AllCountriesRow.valueOf(value);
                    System.out.println("ROW: " + acr);
                    counter++;
                }
                // Release the underlying file handle before moving to the next file.
                reader.close();
            }
        }
    }
    catch (IOException e)
    {
        throw new Error(e);
    }

And AllCountriesRow.valueOf:

(Note that Column is an enum of the columns, in the order in which they appear in each row, and serDe is an instance of ColumnarSerDe.)

    public static AllCountriesRow valueOf(BytesRefArrayWritable braw) throws IOException
    {
        try
        {
            StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
            Object row = serDe.deserialize(braw);
            List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();

            Object fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.ID.ordinal()));
            ObjectInspector oi = fieldRefs.get(Column.ID.ordinal()).getFieldObjectInspector();
            int id = ((IntObjectInspector)oi).get(fieldData);

            fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.NAME.ordinal()));
            oi = fieldRefs.get(Column.NAME.ordinal()).getFieldObjectInspector();
            String name = ((StringObjectInspector)oi).getPrimitiveJavaObject(fieldData);

            fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.CODE.ordinal()));
            oi = fieldRefs.get(Column.CODE.ordinal()).getFieldObjectInspector();
            String code = ((StringObjectInspector)oi).getPrimitiveJavaObject(fieldData);

            fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.REGION_NAME.ordinal()));
            oi = fieldRefs.get(Column.REGION_NAME.ordinal()).getFieldObjectInspector();
            String regionName = ((StringObjectInspector)oi).getPrimitiveJavaObject(fieldData);

            fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.CONTINENT_ID.ordinal()));
            oi = fieldRefs.get(Column.CONTINENT_ID.ordinal()).getFieldObjectInspector();
            int continentId = ((IntObjectInspector)oi).get(fieldData);

            return new AllCountriesRow(id, name, code, regionName, continentId);
        }
        catch (SerDeException e)
        {
            throw new IOException(e);
        }
    }

In the end you have an AllCountriesRow object that contains all of the information for the row in question.
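The original goal was a HashMap for a map-side join, so here is a minimal sketch of how the deserialized rows might be indexed. It contains no Hadoop or Hive code. The Column enum and the AllCountriesRow fields are assumptions inferred from the names used in valueOf above, and CountryJoinTable with its index method is a hypothetical helper, not part of the original solution.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Assumed shape of the Column enum: constants are declared in the order the
// columns appear in each row, so ordinal() doubles as the column index.
enum Column { ID, NAME, CODE, REGION_NAME, CONTINENT_ID }

// Assumed value class holding one deserialized row.
final class AllCountriesRow {
    final int id;
    final String name;
    final String code;
    final String regionName;
    final int continentId;

    AllCountriesRow(int id, String name, String code, String regionName, int continentId) {
        this.id = id;
        this.name = name;
        this.code = code;
        this.regionName = regionName;
        this.continentId = continentId;
    }

    @Override
    public String toString() {
        return "(" + id + ", " + name + ", " + code + ", " + regionName + ", " + continentId + ")";
    }
}

// Hypothetical helper that builds the in-memory lookup table.
final class CountryJoinTable {
    // Index rows by ID so a mapper can look up a country by key during the join.
    static Map<Integer, AllCountriesRow> index(List<AllCountriesRow> rows) {
        Map<Integer, AllCountriesRow> byId = new HashMap<>();
        for (AllCountriesRow row : rows) {
            byId.put(row.id, row);
        }
        return byId;
    }
}
```

In configure(), each row returned by AllCountriesRow.valueOf(value) could be collected into a list (or put into the map directly), and map() would then call byId.get(someId) for each input record. Keying on the ID column is an assumption; use whichever column your join is on.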

Regarding "java - How to read an RCFile", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25416114/
