文章目录
- MIT 6.824 分布式系统 lab1:MapReduce
- Notes
- wordcount's MapReduce Model look like
- a simple sequential mapreduce implementation(mrsequential.go)
MIT 6.824 分布式系统 lab1:MapReduce
文档 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
MapReduce论文 https://pdos.csail.mit.edu/6.824/schedule.html
知乎笔记 zhuanlan.zhihu.com/p/54243727
博客 https://www.cnblogs.com/haoweizh/p/10395016.html
Notes
单词计数模块:We also provide you with a couple of MapReduce applications: word-count in mrapps/wc.go. a simple sequential mapreduce implementation
[kou@python mrapps]$ pwd
/home/kou/MIT/6.824-golabs-2020/src/mrapps
[kou@python mrapps]$ ls
wc.go
wordcount’s MapReduce Model look like
[kou@python mrapps]$ cat wc.go
package main// a word-count application "plugin" for MapReduce.
//
// go build -buildmode=plugin wc.goimport "../mr"
import "unicode"
import "strings"
import "strconv"// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {// function to detect word separators.ff := func(r rune) bool { return !unicode.IsLetter(r) }// split contents into an array of words.words := strings.FieldsFunc(contents, ff)kva := []mr.KeyValue{}for _, w := range words {kv := mr.KeyValue{w, "1"}kva = append(kva, kv)}return kva
}// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.func Reduce(key string, values []string) string {// return the number of occurrences of this word.return strconv.Itoa(len(values))
}
a simple sequential mapreduce implementation(mrsequential.go)
mrsequential.go is a imple sequential MapReduce and leaves its output in the file mr-out-0. The input is from the text files named pg-xxx.txt.
[kou@python main]$ cat mrsequential.go
package main// simple sequential MapReduce.// go run mrsequential.go wc.so pg*.txtimport "fmt"
import "../mr"
import "plugin"
import "os"
import "log"
import "io/ioutil"
import "sort"// for sorting by key.
type ByKey []mr.KeyValue// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }func main() {if len(os.Args) < 3 {fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")os.Exit(1)}mapf, reducef := loadPlugin(os.Args[1])// os.Args[1] wc.so//// read each input file,// pass it to Map,// accumulate the intermediate Map output.//intermediate := []mr.KeyValue{}for _, filename := range os.Args[2:] {//os.Args[2:] pg*.txtfile, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", filename)}file.Close()kva := mapf(filename, string(content))intermediate = append(intermediate, kva...)}//// a big difference from real MapReduce is that all the// intermediate data is in one place, intermediate[],// rather than being partitioned into NxM buckets.//sort.Sort(ByKey(intermediate))oname := "mr-out-0"ofile, _ := os.Create(oname)//// call Reduce on each distinct key in intermediate[],// and print the result to mr-out-0.//i := 0for i < len(intermediate) {j := i + 1for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, intermediate[k].Value)}output := reducef(intermediate[i].Key, values)// this is the correct format for each line of Reduce output.fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)i = j}ofile.Close()
}//
// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
//func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {p, err := plugin.Open(filename)if err != nil {log.Fatalf("cannot load plugin %v", filename)}xmapf, err := p.Lookup("Map")if err != nil {log.Fatalf("cannot find Map in %v", filename)}mapf := xmapf.(func(string, string) []mr.KeyValue)xreducef, err := p.Lookup("Reduce")if err != nil {log.Fatalf("cannot find Reduce in %v", filename)}reducef := xreducef.(func(string, []string) string)return mapf, reducef
}