Consider the following log file format:
id v1 v2 v3
1 15 30 25
2 10 10 20
3 50 30 30
We want to use dumbo to compute the Average Value Frequency (AVF) of each data row on a Hadoop cluster. The AVF of a data point with m attributes is defined as:
avf = (1/m) * sum(frequencies of the values of attributes 1..m)
So for the first row, avf = (1/3) * (1 + 2 + 1) ≈ 1.33. Outliers are identified by a low AVF.
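For concreteness, here is a plain-Python sketch (not MapReduce yet) that computes the AVF of each row of the sample table above; the row ids and values are the ones from the example:

```python
from collections import Counter

# id -> attribute values (v1, v2, v3), from the sample log above
rows = {1: (15, 30, 25), 2: (10, 10, 20), 3: (50, 30, 30)}

# One frequency table per column: freqs[c][value] = occurrences in column c
freqs = [Counter(vals[c] for vals in rows.values()) for c in range(3)]

def avf(values):
    """Average frequency of a row's attribute values."""
    return sum(freqs[c][v] for c, v in enumerate(values)) / len(values)

for rid, vals in sorted(rows.items()):
    print(rid, round(avf(vals), 2))
```

Row 1 gives (1 + 2 + 1) / 3 ≈ 1.33 as in the example; row 2 scores 1.0, the lowest AVF, so it is the outlier candidate.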
The programming question
We have the following pseudo/Python code:
H = {} # stores attribute frequencies
map1(_, datapoint):
    for attr in datapoint.attrs:
        yield (attr, 1)
reduce1(attr, values):
    H[attr] = sum(values)
map2(_, datapoint):
    sum = 0
    m = len(datapoint.attrs)
    for attr in datapoint.attrs:
        sum += H[attr]
    yield (1/m)*sum, datapoint
reduce2(avf, datapoints): # identity reducer, only sorts datapoints on avf
    yield avf, datapoints
The question is: how do we feed our set of data points into map1 and map2, and how do we use the intermediate hash H in map2? Defining H globally as above seems to violate the MapReduce model.

Best answer
As I understand it, the first step is to compute a histogram:
[attr, value] => frequency
where frequency is the number of times value occurs in the attr column. The next step is to take the histogram table and the original data, compute the AVF of each row, and sort.
I would do this in two passes: one map-reduce pass to compute the histogram, and a second map-reduce pass to use the histogram to find the AVFs. I would avoid a shared in-memory hash, because getting the histogram values and the cell values into the same place would otherwise be a mess. (For example: have map1 emit [attr val] with [attr val id] as the key; have reduce1 accumulate all records for each key, count them, and emit [id attr val count]; the second pass then regroups on id as the key and averages each row.)
To compute the histogram, think of the intermediate step as a "group" rather than a "sort". It works like this: since reduce input arrives sorted by key, the reducer accumulates all records for a given key and emits the count as soon as it sees a different key. Wukong, the Ruby equivalent of dumbo, has an Accumulator for this, and I assume dumbo does too. (See below for working code.) That gives you:
attr1 val1a frequency
attr1 val1b frequency
attr2 val2a frequency
...
attrN valNz frequency
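To make the "group, not sort" idea concrete in the asker's language, here is a minimal Python simulation of the histogram pass: the mapper emits one (attr, value) pair per cell, a sort stands in for the shuffle, and itertools.groupby plays the role of the accumulating reducer. The attribute names and rows are taken from the sample table above:

```python
import itertools

ATTR_NAMES = ["v1", "v2", "v3"]
rows = [(1, 15, 30, 25), (2, 10, 10, 20), (3, 50, 30, 30)]

def histogram_map(row):
    rid, *values = row
    for attr, val in zip(ATTR_NAMES, values):
        yield (attr, val)

# Simulated shuffle: sort the mapper output so equal keys are adjacent,
# then the "reducer" just counts each run of identical keys.
shuffled = sorted(kv for row in rows for kv in histogram_map(row))
histogram = {key: sum(1 for _ in group)
             for key, group in itertools.groupby(shuffled)}

print(histogram[("v2", 30)])  # 30 occurs twice in column v2 -> 2
```

The dictionary comprehension never holds more than one key's run at a time, which is exactly what an accumulating reducer does over a sorted stream.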
In the next pass, I load the histogram into a hash table (a plain Hash (dictionary) if it fits in memory, otherwise a fast key-value store), then compute each record's AVF exactly as you have it. Here is working Ruby code for computing the AVF; see http://github.com/mrflip/wukong/blob/master/examples/stats/avg_value_frequency.rb
First pass
module AverageValueFrequency
# Names for each column's attribute, in order
ATTR_NAMES = %w[length width height]
class HistogramMapper < Wukong::Streamer::RecordStreamer
def process id, *values
ATTR_NAMES.zip(values).each{|attr, val| yield [attr, val] }
end
end
#
# For an accumulator, you define a key that is used to group records
#
# The Accumulator calls #start! on the first record for that group,
# then calls #accumulate on all records (including the first).
# Finally, it calls #finalize to emit a result for the group.
#
class HistogramReducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :count
# use the attr and val as the key
def get_key attr, val, *_
[attr, val]
end
# start the sum with 0 for each key
def start! *_
self.count = 0
end
# ... and count the number of records for this key
def accumulate *_
self.count += 1
end
# emit [attr, val, count]
def finalize
yield [key, count].flatten
end
end
end
Wukong::Script.new(AverageValueFrequency::HistogramMapper, AverageValueFrequency::HistogramReducer).run
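In dumbo itself the same histogram pass would be a pair of generator functions; this is only a sketch (histogram_mapper and histogram_reducer are names I made up, and the dumbo wiring is shown in a comment), followed by a local check that runs without Hadoop:

```python
from collections import defaultdict

def histogram_mapper(key, value):
    # value is one line of the log: "id v1 v2 v3"
    fields = value.split()
    for attr, val in zip(["v1", "v2", "v3"], fields[1:]):
        yield (attr, val), 1

def histogram_reducer(key, counts):
    # counts iterates over the 1s emitted for this (attr, val) key
    yield key, sum(counts)

# With dumbo this would be wired up roughly as:
#   import dumbo
#   dumbo.run(histogram_mapper, histogram_reducer)

# Local check without Hadoop: tally mapper output by key by hand,
# standing in for the shuffle, then feed each group to the reducer.
lines = ["1 15 30 25", "2 10 10 20", "3 50 30 30"]
tally = defaultdict(list)
for k, v in (kv for line in lines for kv in histogram_mapper(None, line)):
    tally[k].append(v)
result = dict(kv for k in tally for kv in histogram_reducer(k, tally[k]))
print(result[("v2", "30")])  # -> 2
```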
Second pass
module AverageValueFrequency
class AvfRecordMapper < Wukong::Streamer::RecordStreamer
# average the frequency of each value
def process id, *values
sum = 0.0
ATTR_NAMES.zip(values).each do |attr, val|
sum += histogram[ [attr, val] ].to_i
end
avf = sum / ATTR_NAMES.length.to_f
yield [id, avf, *values]
end
# Load the histogram from a tab-separated file with
# attr val freq
def histogram
return @histogram if @histogram
@histogram = { }
File.open(options[:histogram_file]).each do |line|
attr, val, freq = line.chomp.split("\t")
@histogram[ [attr, val] ] = freq
end
@histogram
end
end
end
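A Python sketch of the same second pass, mirroring AvfRecordMapper above (load_histogram and avf_record are hypothetical names; the input lines use the tab-separated attr/val/freq format produced by the first pass):

```python
ATTR_NAMES = ["length", "width", "height"]

def load_histogram(lines):
    # Each line of the first pass's output: "attr<TAB>val<TAB>freq"
    hist = {}
    for line in lines:
        attr, val, freq = line.rstrip("\n").split("\t")
        hist[(attr, val)] = int(freq)
    return hist

def avf_record(hist, rid, *values):
    # Average the frequencies of this record's attribute values;
    # an unseen (attr, val) pair counts as frequency 0.
    total = sum(hist.get((attr, val), 0)
                for attr, val in zip(ATTR_NAMES, values))
    return rid, total / len(ATTR_NAMES), values

hist = load_histogram(["length\t10\t2\n", "width\t4\t1\n", "height\t9\t3\n"])
print(avf_record(hist, "1", "10", "4", "9"))  # ('1', 2.0, ('10', '4', '9'))
```

As in the Ruby version, the histogram fits in a plain dictionary here; if it did not, hist would be replaced by a fast key-value store with the same get interface.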