本文节选不保证论文的完整性和理解的准确性
原始的MapReduce。分Map,Shuffle,Reduce。
Map里包含shards。
Shuffle理解为groupByKey的事情。Reduce里包含Combiner,能够定义Sharder来控制key怎么和Reducer worker相应起来。
核心抽象和基本原语
PCollection<T>是一个不可变的bag。能够是有序的(Sequence),也能够是无序的(Collection)。PCollection能够来自于内存里的Java PCollection对象。也能够读取自文件。
PTable<K, V>,能够看成PCollection<Pair<K, V>>,不可变无序multi-map。
第一个原语是parallelDo()。把PCollection<T>变成新的PCollection<S>,处理方式定义在DoFn<T, S>里。emitFn是call-back,传给用户的process(…),使用emitFn.emit(outElem)发射出去。parallelDo()能够在map或reduce中使用,DoFn不应该使用闭包外全局的变量,(inline function)纯操作自己的inputs。
第二个原语是groupByKey()。把PTable<K, V>转变成PTable<K,Collection<V>>,
第三个原语是combineValues(),接收input为PTable<K,Collection<V>>和一个V的符合结合律的方法,返回PTable<K, V>。
第四个原语是flatten(),接收一个PCollection<T>的list,返回一个PCollection<T>
衍生原语(Derived Operations)
count(),接收PCollection<T>,返回PTable<T, Integer>
实现方式为parallelDo()。groupByKey()和combineValues()
join(),接收PTable<K, V1>,PTable<K, V2>。返回PTable<K,Tuple2<Collection<V1>, Collection<V2>>
实现方式为,第一步,使用parallelDo()把每一个input PTable<K, Vi>变成通用的PTable<K, TaggedUnion2<V1,V2>>。第二步使用flattern来combine tables;第三步。使用groupByKey()作用于被扁平过了tables。产生PTable<K,Collection<TaggedUnion2<V1, V2>>>
top(),接收比較函数和N,
实现方式为parallelDo(),groupByKey()和combineValues()
延迟分析(Deffered Evaluation)
PCollection对象有两种状态,defferred或materialized。
FlumeJava.run()真正触发execution plan的物化/运行。
PObjects
PObject<T>用于存储Java对象,物化过了之后能够使用getValue()方法获得PObject的值。有点像Future。
operate()方法
优化器
parallelDoFusion(融合)
Producer-Consumer and Sibling Fusion,例如以下图
一些中间结果也能够不要。
MapShuffleCombineReduce(MSCR) Operation
FlumeJava优化器的核心在于把ParallelDo。GroupByKey,CombineValues和Flattern的组合转换成一个个单个的MapReduce。
MSCR是一个中间层的操作,有M个input channels(每一个能够进行map操作)。有R个Reduce channels(每一个能够进行shuffle,或combine,或reduce操作)。
单个input channal m,接收PCollection<T>作为输入,运行R路output输出的ParallelDo “map”操作。产生R个PTable<K, V> outputs。每一个output channel r flatterns它的M个inputs,然后
a) 进行一次GroupByKey的“shuffle”,或CombineValues的“combine”。或O-output的ParallelDo “reduce”,然后把结果写出到O-output PCollections
b) 把inputs直接写出为outputs
前者这种output channel称为”Grouping” channel,后者称为”pass-through” channel。”pass-through” channel同意map的output成为一个MSCR操作的输出。
Ø 同意多个reducers和combiners。
Ø 同意每一个reducer产生多个outputs;
Ø 消除了每一个reducer必须以同样的key为input来产出output的约束;
Ø 同意pass-through形式的outputs。
所以MSCR是优化器里非常好的一个中间操作目标。
MSCR Fusion
MSCR操作产生于一些相关的GroupByKey操作集合。相关的GroupByKey操作是指产生于同样的input(如Flattern操作),或被同一个parallelDo操作制造出来的input。
这部分比較晦涩难懂啊。可是是理解核心
优化要达到的效果是最后的运行计划里包含尽可能少的又高效的MSCR操作。
1. Sink Flatterns。把扁平操作下沉,如h(f(a)+f(b))=> h(f(a))+h(f(b)),即分配律。然后又能和parallelDo的融合特性结合起来,如(hof)(a)+(hog)(b)
2. Lift CombineValues。假设CombineValues紧跟着GroupByKey操作。
3. Insert fusion blocks。假设俩GroupByKey操作是由生产者-消费者的ParallelDo chain连起来的。ParallelDo要在GroupByKey里做上调和下移。
4. Fuse ParallelDos。
5. Fuse MSCRs。
针对这几个策略的实施。后面举了个样例而且描绘了详细的运行图。非常帮助理解
优化器没有分析用户写的方法,比方估算input和output数据量大小。
也没有改动用户的代码来做优化。
须要做一些分析避免运算的反复,及去除不必要或不合理的groupByKey。
Executor
优化运行结束后。
它现在支持batch该模式提交作业。
在操作。FlumeJava做到人性化发展、debug,创建自己主动删除文件。自己主动识别数据并行调整操作量,改变操作模式(remote)而这样的事情。
掌声 :)