MapReduce是Google在2004年发表的论文《MapReduce: Simplified Data Processing on Large Clusters》中提出的一个用于分布式的用于大规模数据处理的编程模型。

原理

MapReduce将数据的处理分成了两个步骤,Map和Reduce。Map将输入的数据集拆分成一批KV对并输出,对于每一个<k1, v1>,Map将输出一批<k2, v2>;Reduce将Map对Map中产生的结果进行汇总,对于每一个<k2, list(v2)>list(v2)是所有key为k2的value),Reduce将输出结果<k3, v3>

以单词出现次数统计程序为例,map对文档中每个单词都输出<word, 1>,reduce则会统计每个单词对应的list的长度,输出<word, n>

map(String key, String value):
 // key: document name
 // value: document contents
 for each word w in value:
   EmitIntermediate(w, “1″);

reduce(String key, Iterator values):
 // key: a word
 // values: a list of counts
 int result = 0;
 for each v in values:
   result += ParseInt(v);
 Emit(AsString(result));

流程

MapReduce的流程如下:

  1. 将输入拆分成M个段,产生M个Map任务和R个Reduce任务。
  2. 创建1个master和n个worker,master会将Map和Reduce分派给worker执行。
  3. 被分配了Map任务的worker从输入中读取解析出KV对,传递给用户提供的Map函数,得到中间的一批KV对。
  4. 将中间的KV对使用分区函数分配到R个区域上,并保存到磁盘中,当Map任务执行完成后将保存的位置返回给master。
  5. Reduce worker根据master传递的参数从文件系统中读取数据,解析出KV对,并对具有相同key的value进行聚合,产生<k2, list(v2)>。如果无法在内存中进行排序,就需要使用外部排序。
  6. 对于每一个唯一的key,将<k2, list(v2)>传递给用户提供的Reduce函数,将函数的返回值追加到输出文件中。
  7. 当所有任务都完成后,MapReduce程序返回

MapReduce的整个流程并不复杂,就是将数据分片后提交给map执行,执行产生的中间结果经过处理后再交给reduce执行,产生最终结果。

容错

当worker发生故障时,可以通过心跳等方法进行检测,当检测到故障之后就可以将任务重新分派给其他worker重新执行。

当master发生故障时,可以通过检查点(checkpoint)的方法来进行恢复。然而由于master只有一个,比较难进行恢复,因此可以让用户检测并重新执行任务。

对于输出文件来说,需要保证仍在写入中的文件不被读取,即保证操作的原子性。可以通过文件系统重命名操作的原子性来实现,先将结果保存在临时文件中,当执行完成后再进行重命名。使用这种方法就可以将有副作用的write变为幂等(总是产生相同结果的运算,如a = 2就是幂等的,而a += 2则不是)的重命名。

落伍者

影响任务的总执行时间的重要因素就是落伍者:在运算中某个机器用了很长时间才完成了最后的几个任务,从而增加了总的执行时间。对于这种情况,可以在任务即将完成时,将剩余的任务交给备用者进程来执行,无论是最初的worker完成了任务还是备用者完成了,都可以将任务标记为完成。

分区函数

对于map产生的结果,通过分区函数来将相同key的KV对分配给同一个reduce来执行。默认的分区函数是hash(key) % R,但在某些情况下也可以选择其他分区函数。如key为URL时,希望相同主机的结果在同一个输出中,那么就可以用hash(hostname(key)) % R作为分区函数。

实现

实现部分是基于MIT 6.824的实验完成的。

type Coordinator struct {
	mapJobs      []Job
	reduceJobs   []Job
	status       int
	nMap         int
	remainMap    int
	nReduce      int
	remainReduce int
	lock         sync.Mutex
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	c.status = MAP
	c.nMap = len(files)
	c.remainMap = c.nMap
	c.nReduce = nReduce
	c.remainReduce = c.nReduce
	c.mapJobs = make([]Job, len(files))
	c.reduceJobs = make([]Job, nReduce)
	for idx, file := range files {
		c.mapJobs[idx] = Job{[]string{file}, WAITTING, idx}
	}
	for idx := range c.reduceJobs {
		c.reduceJobs[idx] = Job{[]string{}, WAITTING, idx}
	}
	c.server()
	return &c
}

func (c *Coordinator) timer(status *int) {
	time.Sleep(time.Second * 10)

	c.lock.Lock()
	if *status == RUNNING {
		log.Printf("timeout\n")
		*status = WAITTING
	}
	c.lock.Unlock()
}

func (c *Coordinator) AcquireJob(args *AcquireJobArgs, reply *AcquireJobReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	fmt.Printf("Acquire: %+v\n", args)
	if args.CommitJob.Index >= 0 {
		if args.Status == MAP {
			if c.mapJobs[args.CommitJob.Index].Status == RUNNING {
				c.mapJobs[args.CommitJob.Index].Status = FINISHED
				for idx, file := range args.CommitJob.Files {
					c.reduceJobs[idx].Files = append(c.reduceJobs[idx].Files, file)
				}
				c.remainMap--
			}
			if c.remainMap == 0 {
				c.status = REDUCE
			}
		} else {
			if c.reduceJobs[args.CommitJob.Index].Status == RUNNING {
				c.reduceJobs[args.CommitJob.Index].Status = FINISHED
				c.remainReduce--
			}
			if c.remainReduce == 0 {
				c.status = FINISH
			}
		}
	}
	if c.status == MAP {
		for idx := range c.mapJobs {
			if c.mapJobs[idx].Status == WAITTING {
				reply.NOther = c.nReduce
				reply.Status = MAP
				reply.Job = c.mapJobs[idx]
				c.mapJobs[idx].Status = RUNNING
				go c.timer(&c.mapJobs[idx].Status)
				return nil
			}
		}
		reply.NOther = c.nReduce
		reply.Status = MAP
		reply.Job = Job{Files: make([]string, 0), Index: -1}
	} else if c.status == REDUCE {
		for idx := range c.reduceJobs {
			if c.reduceJobs[idx].Status == WAITTING {
				reply.NOther = c.nMap
				reply.Status = REDUCE
				reply.Job = c.reduceJobs[idx]
				c.reduceJobs[idx].Status = RUNNING
				go c.timer(&c.reduceJobs[idx].Status)
				return nil
			}
		}
		reply.NOther = c.nMap
		reply.Status = REDUCE
		reply.Job = Job{Files: make([]string, 0), Index: -1}
	} else {
		reply.Status = FINISH
	}
	return nil
}

Coordinator中保存所有的任务信息以及执行状态,worker通过AcquireJob来提交和申请任务,要等待所有map任务完成后才能执行reduce任务。这里就简单的将每一个文件都作为一个任务。

func doMap(mapf func(string, string) []KeyValue, job *Job, nReduce int) (files []string) {
	outFiles := make([]*os.File, nReduce)
	for idx := range outFiles {
		outFile, err := ioutil.TempFile("./", "mr-tmp-*")
		if err != nil {
			log.Fatalf("create tmp file failed: %v", err)
		}
		defer outFile.Close()
		outFiles[idx] = outFile
	}
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		kva := mapf(filename, string(content))
		for _, kv := range kva {
			hash := ihash(kv.Key) % nReduce
			js, _ := json.Marshal(kv)
			outFiles[hash].Write(js)
			outFiles[hash].WriteString("\n")
		}
	}
	for idx := range outFiles {
		filename := fmt.Sprintf("mr-%d-%d", job.Index, idx)
		os.Rename(outFiles[idx].Name(), filename)
		files = append(files, filename)
	}
	return
}

func doReduce(reducef func(string, []string) string, job *Job, nMap int) {
	log.Printf("Start reduce %d", job.Index)
	outFile, err := ioutil.TempFile("./", "mr-out-tmp-*")
	defer outFile.Close()
	if err != nil {
		log.Fatalf("create tmp file failed: %v", err)
	}
	m := make(map[string][]string)
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			kv := KeyValue{}
			if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil {
				log.Fatalf("read kv failed: %v", err)
			}
			m[kv.Key] = append(m[kv.Key], kv.Value)
		}
		if err := scanner.Err(); err != nil {
			log.Fatal(err)
		}
		file.Close()
	}
	for key, value := range m {
		output := reducef(key, value)
		fmt.Fprintf(outFile, "%v %v\n", key, output)
	}
	os.Rename(outFile.Name(), fmt.Sprintf("mr-out-%d", job.Index))
	log.Printf("End reduce %d", job.Index)
}

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	CallExample()
	var status int = MAP
	args := AcquireJobArgs{Job{Index: -1}, MAP}
	for {
		args.Status = status
		reply := AcquireJobReply{}
		call("Coordinator.AcquireJob", &args, &reply)
		fmt.Printf("AcReply: %+v\n", reply)
		if reply.Status == FINISH {
			break
		}
		status = reply.Status
		if reply.Job.Index >= 0 {
			// get a job, do it
			commitJob := reply.Job
			if status == MAP {
				commitJob.Files = doMap(mapf, &reply.Job, reply.NOther)
			} else {
				doReduce(reducef, &reply.Job, reply.NOther)
				commitJob.Files = make([]string, 0)
			}
			// job finished
			args = AcquireJobArgs{commitJob, status}
		} else {
			// no job, sleep to wait
			time.Sleep(time.Second)
			args = AcquireJobArgs{Job{Index: -1}, status}
		}
	}
}

worker通过RPC调用向Coordinator.AcquireJob申请和提交任务,之后根据任务类型执行doMapdoReduce

doMap函数读取目标文件并将<filename, content>传递给map函数,之后将返回值根据hash(key) % R写入到目标中间文件中去。

doReduce函数则从目标文件中读取KV对并加载到内存中,对相同的key进行合并(这里我是用map来做的,但是之后看论文发现是用排序来做的,这样可以保证在每个输出文件中的key是有序的)。合并之后就将<key, list(value)>交给reduce函数处理,最后把返回值写入到结果文件中去。

02-22 06:33