Problem description

I am trying to read a small RCFile (~200 rows of data) into a HashMap to do a map-side join, but I am having a lot of trouble getting the data in the file into a usable state.

Here is what I have so far, most of which is lifted from this example:

    public void configure(JobConf job)                                                                                                   
    {   
        try
        {                                                                                                                                
            FileSystem fs = FileSystem.get(job);                                                                                         
            RCFile.Reader rcFileReader = new RCFile.Reader(fs, new Path("/path/to/file"), job);          
            int counter = 1;   
            while (rcFileReader.next(new LongWritable(counter)))
            {
                System.out.println("Fetching data for row " + counter);                                                  
                BytesRefArrayWritable dataRead = new BytesRefArrayWritable();                                                            
                rcFileReader.getCurrentRow(dataRead);                                                                                    
                System.out.println("dataRead: " + dataRead + " dataRead.size(): " + dataRead.size());
                for (int i = 0; i < dataRead.size(); i++)                                                                                
                {
                    BytesRefWritable bytesRefRead = dataRead.get(i);                               
                    byte b1[] = bytesRefRead.getData();                                                                                  
                    Text returnData = new Text(b1);
                    System.out.println("READ-DATA = " + returnData.toString());                                                          
                }                                                        
                counter++;
            } 
        }
        catch (IOException e)
        {             
            throw new Error(e);
        }             
    }   

However, the output that I am getting has all of the data in each column concatenated together in the first row and no data in any of the other rows.

Fetching data for row 1
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@7f26d3df dataRead.size(): 5
READ-DATA = 191606656066860670
READ-DATA = United StatesAmerican SamoaGuamNorthern Mariana Islands
READ-DATA = USASGUMP
READ-DATA = USSouth PacificSouth PacificSouth Pacific
READ-DATA = 19888
Fetching data for row 2
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@1cb1a4e2 dataRead.size(): 0
Fetching data for row 3
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@52c00025 dataRead.size(): 0
Fetching data for row 4
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@3b49a794 dataRead.size(): 0

How do I read this data properly so that I can access one row at a time, e.g.

(191, United States, US, US, 19)?

Solution

After some more digging, I've found a solution. The key here is to not use RCFile.Reader but to use RCFileRecordReader.
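One plausible reading of the output in the question (not confirmed by the original answer) is that RCFile stores the values of a column for several rows back to back in a single buffer, and that `getData()` returns that whole buffer rather than a single row's value. `BytesRefWritable` also exposes `getStart()` and `getLength()`, which delimit the current value inside the buffer. The sketch below uses plain Java arrays instead of the Hive classes to show the difference between decoding the whole buffer and decoding one window per row. The class name, method names, and the per-row lengths are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ColumnBufferSketch {

    /** Decodes the entire buffer at once, which is what new Text(bytes) effectively does. */
    static String decodeWholeBuffer(byte[] buffer) {
        return new String(buffer, StandardCharsets.UTF_8);
    }

    /** Decodes one value per row by walking consecutive (start, length) windows. */
    static List<String> decodePerRow(byte[] buffer, int[] lengths) {
        List<String> values = new ArrayList<>();
        int start = 0;
        for (int length : lengths) {
            values.add(new String(buffer, start, length, StandardCharsets.UTF_8));
            start += length;
        }
        return values;
    }

    public static void main(String[] args) {
        // The country-code column exactly as printed in the question: four rows in one buffer.
        byte[] codes = "USASGUMP".getBytes(StandardCharsets.UTF_8);
        // Assumption for illustration: every code is two bytes long.
        int[] lengths = {2, 2, 2, 2};

        System.out.println(decodeWholeBuffer(codes));     // prints USASGUMP
        System.out.println(decodePerRow(codes, lengths)); // prints [US, AS, GU, MP]
    }
}
```

With the real classes, the equivalent of `decodePerRow` would pass `getData()`, `getStart()`, and `getLength()` to the decoder for each column value. Going through `RCFileRecordReader` and a SerDe avoids handling those offsets by hand.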

Here is what I ended up with, adapted to open multiple files as well:

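// Imports needed by the fragment below
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;

try
{
    FileSystem fs = FileSystem.get(job);
    FileStatus[] fileStatuses = fs.listStatus(new Path("/path/to/dir/"));
    // key and value are reused; the reader overwrites them on every call to next()
    LongWritable key = new LongWritable();
    BytesRefArrayWritable value = new BytesRefArrayWritable();
    int counter = 1;
    for (int i = 0; i < fileStatuses.length; i++)
    {
        FileStatus fileStatus = fileStatuses[i];
        // Skip subdirectories; only plain files are read
        if (!fileStatus.isDir())
        {
            System.out.println("File: " + fileStatus);
            // A single split that covers the whole file
            FileSplit split = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), job);
            RCFileRecordReader reader = new RCFileRecordReader(job, split);
            try
            {
                while (reader.next(key, value))
                {
                    System.out.println("Getting row " + counter);
                    AllCountriesRow acr = AllCountriesRow.valueOf(value);
                    System.out.println("ROW: " + acr);
                    counter++;
                }
            }
            finally
            {
                // Release the underlying file handle before moving on to the next file
                reader.close();
            }
        }
    }
}
catch (IOException e)
{
    throw new Error(e);
}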

And AllCountriesRow.valueOf:

(note that Column is an enum of the columns in the order that they appear in each row and serDe is an instance of ColumnarSerDe)

public static AllCountriesRow valueOf(BytesRefArrayWritable braw) throws IOException                                                     
{   
    try                                                                                                                                  
    {
        StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();                                                  
        Object row = serDe.deserialize(braw);                                                                                                                                                                                 
        List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();                                                                                                                                              

        Object fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.ID.ordinal()));                                                                  
        ObjectInspector oi = fieldRefs.get(Column.ID.ordinal()).getFieldObjectInspector();                                               
        int id = ((IntObjectInspector)oi).get(fieldData);                                                                                

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.NAME.ordinal()));                                                   
        oi = fieldRefs.get(Column.NAME.ordinal()).getFieldObjectInspector();                                                             
        String name = ((StringObjectInspector)oi).getPrimitiveJavaObject(fieldData);                                                     

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.CODE.ordinal()));                                                   
        oi = fieldRefs.get(Column.CODE.ordinal()).getFieldObjectInspector();
        String code = ((StringObjectInspector)oi).getPrimitiveJavaObject(fieldData);                                                     

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.REGION_NAME.ordinal()));                                            
        oi = fieldRefs.get(Column.REGION_NAME.ordinal()).getFieldObjectInspector();                                                      
        String regionName = ((StringObjectInspector)oi).getPrimitiveJavaObject(fieldData);                                               

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.CONTINENT_ID.ordinal()));                                           
        oi = fieldRefs.get(Column.CONTINENT_ID.ordinal()).getFieldObjectInspector();                                                     
        int continentId = ((IntObjectInspector)oi).get(fieldData);                                                                       

        return new AllCountriesRow(id, name, code, regionName, continentId);                                                             
    }               
    catch (SerDeException e)
    {               
        throw new IOException(e);                                                                                                        
    }                   
}                       
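The `Column` enum and the `serDe` instance used in `valueOf` are not shown in the answer. The following is a minimal sketch of what they might look like, using only the JDK. The enum constants and their int/string types are taken from the casts in `valueOf`. The lowercase column names, the `hiveType` field, and the class and method names are assumptions. `columns` and `columns.types` are the usual Hive table property keys for a schema.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.stream.Collectors;

public class ColumnSchemaSketch {

    /** Columns in the order they appear in each row; constant names come from valueOf. */
    enum Column {
        ID("int"), NAME("string"), CODE("string"), REGION_NAME("string"), CONTINENT_ID("int");

        final String hiveType; // hypothetical field, added for this sketch

        Column(String hiveType) {
            this.hiveType = hiveType;
        }
    }

    /** Builds schema properties of the kind a SerDe is typically initialized with. */
    static Properties schemaProperties() {
        Properties props = new Properties();
        // Assumed column names: the enum constant names in lower case.
        props.setProperty("columns", Arrays.stream(Column.values())
                .map(c -> c.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.joining(",")));
        props.setProperty("columns.types", Arrays.stream(Column.values())
                .map(c -> c.hiveType)
                .collect(Collectors.joining(":")));
        return props;
    }

    public static void main(String[] args) {
        Properties props = schemaProperties();
        System.out.println(props.getProperty("columns"));       // prints id,name,code,region_name,continent_id
        System.out.println(props.getProperty("columns.types")); // prints int:string:string:string:int
        // ordinal() is the index valueOf uses to pick the field reference.
        System.out.println(Column.REGION_NAME.ordinal());       // prints 3
    }
}
```

With Hive on the classpath, properties like these would presumably be handed to the `ColumnarSerDe` through its `initialize` method before the first call to `deserialize`. The exact signature depends on the Hive version, so check it against the release in use.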

This ends up with an AllCountriesRow object that has all the information of the relevant row in it.
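Since the original goal was to load the rows into a HashMap for a map-side join, the last step could look roughly like the sketch below. It is self-contained and does not use Hadoop. `AllCountriesRow` here is a stand-in whose fields follow the constructor call in `valueOf`, and keying the map by `id` is an assumption about the join key. The sample row is the one given in the question.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CountryLookupSketch {

    /** Stand-in for the answer's AllCountriesRow; fields mirror its constructor arguments. */
    static final class AllCountriesRow {
        final int id;
        final String name;
        final String code;
        final String regionName;
        final int continentId;

        AllCountriesRow(int id, String name, String code, String regionName, int continentId) {
            this.id = id;
            this.name = name;
            this.code = code;
            this.regionName = regionName;
            this.continentId = continentId;
        }

        @Override
        public String toString() {
            return "(" + id + ", " + name + ", " + code + ", " + regionName + ", " + continentId + ")";
        }
    }

    /** Indexes rows by id (assumed join key) so each map() call can do a constant-time lookup. */
    static Map<Integer, AllCountriesRow> indexById(Iterable<AllCountriesRow> rows) {
        Map<Integer, AllCountriesRow> byId = new HashMap<>();
        for (AllCountriesRow row : rows) {
            byId.put(row.id, row);
        }
        return byId;
    }

    public static void main(String[] args) {
        AllCountriesRow us = new AllCountriesRow(191, "United States", "US", "US", 19);
        Map<Integer, AllCountriesRow> byId = indexById(Collections.singletonList(us));

        System.out.println(byId.get(191));        // prints (191, United States, US, US, 19)
        System.out.println(byId.containsKey(42)); // prints false
    }
}
```

In the job itself, the map would be filled inside `configure`, calling `byId.put(acr.id, acr)` where the answer's loop prints each row, and then consulted from `map`.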


10-23 08:20