本文介绍了操作hadoop中的行数据以添加缺失的列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有来自存储在hdfs中的IIS的日志文件,但由于web服务器配置,一些日志没有所有列或它们以不同的顺序出现。我希望生成具有公共模式的文件,以便我可以为它们定义一个Hive表。



示例良好日志:

  #Fields:date time s-ip cs-method cs -uri-stem useragent 
2013-07-16 00:00:00 10.1.15.8 GET / common / viewFile / 1232 Mozilla / 5.0 + Chrome / 27.0.1453.116

缺少列的日志示例(cs-方法和useragent缺失):

  #Fields:date time s-ip cs -uri-stem 
2013-07 -16 00:00:00 10.1.15.8 / common / viewFile / 1232

缺少列的日志需要映射到如下所示的完整模式:

  #Fields:date time s-ip cs-method cs-uri- stem useragent 
2013-07-16 00:00:00 10.1.15.8 null / common / viewFile / 1232 null

不良日志可以启用不同顺序的任何列组合。

如何映射可用列到完整模式accor ding到日志文件中的Fields行?

编辑:
通常我会通过将列架构定义为索引的字典映射列名来处理此问题。即:col ['date'] = 0 col ['time'] = 1等等然后我会从文件中读取#Fields行并解析出启用的列并生成头文件字典映射头文件名到文件中的列索引。然后,对于剩余的数据行,我通过索引知道它的标题,通过header =列名将其映射到我的列模式,然后按正确的顺序生成新行,并插入空数据的缺失列。我的问题是我不明白如何在hadoop中执行此操作,因为每个地图都是单独执行的,因此如何与每个地图共享#Fields信息?

您可以使用将标题应用于创建地图的列。从那里你可以使用一个UDF:

myudf.py



<$ p $
$输出格式('newM:map []')
def完整映射(M):
if M是无:
返回无
to_add = ['A','D','F']
to_add中的项目:
如果项目不在M中:
M [item] = None
return M

@outputSchema('A:chararray,B:chararray,C:chararray,D:chararray,E:chararray,F:chararray' )
def completemap_v2(M):
如果M是None:
return(None,
None,
None,
None,
无,
无)
返回(M.get('A',None),
M.get('B',None),
M.get('C ',None),
M.get('D',None),
M.get('E',None),
M.get('F',None))

要添加

示例输入:

  csv1。 in csv2.in 
------- ---------
A | B | CD | E | F
您好|此|请填写|工作| FOO
FOO | BAR | BING或|一切|将
BANG | BOSH BE | FOR | NAUGHT

示例脚本:

  A = LOAD'tests / csv'使用myudfs.ExampleCSVLoader('\\\ \\')AS(M:map []); 
B = FOREACH A GENERATE FLATTEN(myudf.completemap_v2(M));

输出:

  B:{null :: A:chararray,null :: B:chararray,null :: C:chararray,null :: D:chararray,null :: E:chararray,null :: F:chararray} 
(,,,,,)
(,,, PLEASE,WORK,FOO)
(,,,或,一切,将)
(,,, BE,FOR ,NAUGHT)
(,,,,,)
(你好,这是,,,)
(FOO,BAR,BING ,,,)
(BANG,BOSH ,,,,)


I have log files from IIS stored in hdfs, but due to webserver configuration some of the logs do not have all the columns or they appear in different order. I want to generate files that have a common schema so I can define a Hive table over them.

Example good log:

#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 GET /common/viewFile/1232 Mozilla/5.0+Chrome/27.0.1453.116

Example log with missing columns (cs-method and useragent missing):

#Fields: date time s-ip cs-uri-stem 
2013-07-16 00:00:00 10.1.15.8 /common/viewFile/1232

The log with missing columns needs to be mapped to the full schema like this:

#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 null /common/viewFile/1232 null

The bad logs can have any combination of columns enabled and in different order.

How can I map the available columns to the full schema according to the Fields row within the log file?

Edit:Normally I would approach this by defining my column schema as a dict mapping column name to index. ie: col['date']=0 col['time']=1 etc. Then I would read the #Fields row from the file and parse out the enabled columns and generate header dict mapping header name to column index in the file. Then for remaining rows of data I know its header by index, map that to my column schema by header=column name and generate new row in correct order inserting missing columns with null data. My issue is I do not understand how to do this within hadoop since each map executes alone and therefore how can I share the #Fields information with each map?

解决方案

You can use this to apply the header to the columns creating a map. From there you can use a UDF like:

myudf.py

#!/usr/bin/python

@outputSchema('newM:map[]')
def completemap(M):
    if M is None:
        return None
    to_add = ['A', 'D', 'F']
    for item in to_add:
        if item not in M:
            M[item] = None
    return M

@outputSchema('A:chararray, B:chararray, C:chararray, D:chararray, E:chararray, F:chararray')
def completemap_v2(M):
    if M is None:
        return (None,
                None,
                None,
                None,
                None,
                None)
    return (M.get('A', None),
            M.get('B', None),
            M.get('C', None),
            M.get('D', None),
            M.get('E', None),
            M.get('F', None))

To add in the missing tuples to the map.

Sample Input:

csv1.in             csv2.in
-------            ---------
A|B|C               D|E|F
Hello|This|is       PLEASE|WORK|FOO
FOO|BAR|BING        OR|EVERYTHING|WILL
BANG|BOSH           BE|FOR|NAUGHT

Sample Script:

A = LOAD 'tests/csv' USING myudfs.ExampleCSVLoader('\\|') AS (M:map[]); 
B = FOREACH A GENERATE FLATTEN(myudf.completemap_v2(M));

Output:

B: {null::A: chararray,null::B: chararray,null::C: chararray,null::D: chararray,null::E: chararray,null::F: chararray}
(,,,,,)
(,,,PLEASE,WORK,FOO)
(,,,OR,EVERYTHING,WILL)
(,,,BE,FOR,NAUGHT)
(,,,,,)
(Hello,This,is,,,)
(FOO,BAR,BING,,,)
(BANG,BOSH,,,,)

这篇关于操作hadoop中的行数据以添加缺失的列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 22:27