6.824分布式lab1准备工作


go race检测

例如一个全局变量,一个项目下来,可能完全不知道是不是引起了多个协程或者线程竞争。

demo:

func main() {
    a := 1
    go func() {
        a = 2
    }()
    a = 3
    fmt.Printf("a: %v\n", a)
    time.Sleep(time.Second * 3)
    // a: 3, 并没有发现或者提示go func修改了a的值
}

加了-race之后:

$ go run -race main.go
a: 3
==================
WARNING: DATA RACE
Write at 0x00c0000180a8 by goroutine 7:
  main.main.func1()
      /home/hqinglau/6.824/src/run/main.go:11 +0x30

Previous write at 0x00c0000180a8 by main goroutine:
  main.main()
      /home/hqinglau/6.824/src/run/main.go:13 +0xb8

Goroutine 7 (running) created at:
  main.main()
      /home/hqinglau/6.824/src/run/main.go:10 +0xae
==================
Found 1 data race(s)
exit status 66

以便在测试环境发现问题。

go Plugin

go 1.8出的一个插件功能。

demo:

package main

// pk必须是main

import "fmt"

// 插件模块加载的时候自动初始化
func init() {
    fmt.Println("hi")
}

// 调用的
func Hello() {
    fmt.Println("hello")
}

之后编译:

go build --buildmode=plugin testplugin.go

生成了.so文件:

image-20220525160135884

可以控制插件的版本:

go build -o testplugin_v1.so -buildmode=plugin testplugin.go

使用

package main

import (
    "plugin"
)

func main() {
    p, err := plugin.Open("testplugin.so")
    if err != nil {
        panic(err)
    }
    s, err2 := p.Lookup("Hello")
    if err2 != nil {
        panic(err2)
    }
    s.(func())()

    // $ go run main.go 
    // hi
    // hello
}

在执行函数之前,执行了init代码。

lab1准备工作

排序:自带串行化代码。首先wc.go生成wc.so文件,然后运行mrsequential.go

$ go build -race -buildmode=plugin ../mrapps/wc.go
$ go run -race mrsequential.go wc.so pg*.txt
$ more mr-out-0 
A 509
ABOUT 2
ACT 8
ACTRESS 1
ACTUAL 8
ADLER 1
ADVENTURE 12
ADVENTURES 7
AFTER 2
AGREE 16
AGREEMENT 8
AK 8
ALABAMA 1
AN 1
AND 46
ANY 24
ANYTHING 8
ARAT 1
ARTHUR 1
AS 10
ASCII 17
ASHPUTTEL 2
AT 2

wc.go

单词计数用的插件。

package main

//
// a word-count application "plugin" for MapReduce.
//
// go build -buildmode=plugin wc.go
//

import "6.824/mr"
import "unicode"
import "strings"
import "strconv"

// 每个文件调用一次map函数,第一个参数是文件名,第二个参数是全部内容。
// 文件名暂时用不到,忽略。返回值是键值对切片。
func Map(filename string, contents string) []mr.KeyValue {
    // function to detect word separators.
    ff := func(r rune) bool { return !unicode.IsLetter(r) }

    // split contents into an array of words.
    words := strings.FieldsFunc(contents, ff)

    kva := []mr.KeyValue{}
    for _, w := range words {
        kv := mr.KeyValue{w, "1"}
        kva = append(kva, kv)
    }
    return kva
}

//type KeyValue struct {
//    Key   string
//    Value string
//}

//
// map产生的每个键,调用一次reduce, values是这个键的所有值
func Reduce(key string, values []string) string {
    // return the number of occurrences of this word.
    return strconv.Itoa(len(values))
}

至此,只是两个函数,具体的调用过程要看mrsequential.go

mrsequential.go

package main

//
// simple sequential MapReduce.
//
// go run mrsequential.go wc.so pg*.txt
//

import "fmt"
import "6.824/mr"
import "plugin"
import "os"
import "log"
import "io/ioutil"
import "sort"

// for sorting by key.
type ByKey []mr.KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func main() {
    if len(os.Args) < 3 {
        fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")
        os.Exit(1)
    }
    
    // 得到我们插件里定义的map和reduce函数,实现动态加载
    mapf, reducef := loadPlugin(os.Args[1])

    //
    // read each input file,
    // pass it to Map,
    // accumulate the intermediate Map output.
    //
    
    // 读取每一个文件,内容放入mapf里面,也就是map函数里面,得到【单词:1】
    // 这样的序列,都放入一个intermediate切片中
    intermediate := []mr.KeyValue{}
    for _, filename := range os.Args[2:] {
        file, err := os.Open(filename)
        if err != nil {
            log.Fatalf("cannot open %v", filename)
        }
        content, err := ioutil.ReadAll(file)
        if err != nil {
            log.Fatalf("cannot read %v", filename)
        }
        file.Close()
        kva := mapf(filename, string(content))
        intermediate = append(intermediate, kva...)
    }

    //
    // a big difference from real MapReduce is that all the
    // intermediate data is in one place, intermediate[],
    // rather than being partitioned into NxM buckets.
    //
    // 根据单词进行排序
    sort.Sort(ByKey(intermediate))

    oname := "mr-out-0"
    ofile, _ := os.Create(oname)

    //
    // call Reduce on each distinct key in intermediate[],
    // and print the result to mr-out-0.
    //
    i := 0
    for i < len(intermediate) {
        j := i + 1
        for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
            j++
        }
        // 排序后查找,遍历一遍,计数,都放入values里面了,其实这里
        // 实现的话都是1,所以reduce查看len就可以了
        values := []string{}
        for k := i; k < j; k++ {
            values = append(values, intermediate[k].Value)
        }
        // 把这些属于同一个单词的计数放入reduce函数,得到len。
        output := reducef(intermediate[i].Key, values)

        // 保存到文件
        // this is the correct format for each line of Reduce output.
        fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

        i = j
    }

    ofile.Close()
}

// 加载插件并验证函数的存在,返回函数类型
// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
//
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
    p, err := plugin.Open(filename)
    if err != nil {
        log.Fatalf("cannot load plugin %v", filename)
    }
    xmapf, err := p.Lookup("Map")
    if err != nil {
        log.Fatalf("cannot find Map in %v", filename)
    }
    mapf := xmapf.(func(string, string) []mr.KeyValue)
    xreducef, err := p.Lookup("Reduce")
    if err != nil {
        log.Fatalf("cannot find Reduce in %v", filename)
    }
    reducef := xreducef.(func(string, []string) string)

    return mapf, reducef
}

按着代码理一遍还是比较清晰的。问题:

go run -race mrsequential.go wc.so pg*.txt

有个通配符啊,咋读的?

答:命令行的通配符机制,不需要做额外的处理,具体咋做看例子。

func main() {
    // 测试的话
    // $ go build main.go
    // $ ./main t*
    for _, v := range os.Args[0] {
        fmt.Println(v)
    }
    // 46
    // 47
    // 109
    // 97
    // 105
    // 110
    for _, v := range os.Args[0:] {
        fmt.Println(v)
    }
    // testplugin.go
    // testplugin.so
}

总结

OK,到此为止,计数就比较清晰了。目前是串行,下一步就是并行化。