Lecture 02 Infrastructure: RPC & threads

一、多线程挑战

  • 共享数据: 使用互斥信号量、或者避免共享
  • 线程间协作: 使用channels 或者 waitgroup 来等待所有map线程结束
  • 并发粒度:
    • 粗粒度: 简单,但是并发性不高
    • 细粒度: 更多的并发,但是处理复杂,可能会有更多的冲突和死锁

以下这段代码就能说明并发的粒度问题:

    constructTaskArgs := func(phase jobPhase, task int) DoTaskArgs {
        debug("task: %d\n", task)
        var taskArgs DoTaskArgs
        taskArgs.Phase = phase
        taskArgs.JobName = jobName
        taskArgs.NumOtherPhase = n_other
        taskArgs.TaskNumber = task
        if phase == mapPhase {
            taskArgs.File = mapFiles[task]
        }
        return taskArgs
    }

    tasks := make(chan int) // act as task queue
    go func() {
        for i := 0; i < ntasks; i++ {
            tasks <- i
        }
    }()
    successTasks := 0
    success := make(chan int)

loop:
    for {
        select {
        case task := <-tasks:
            go func() {
                worker := <-registerChan
                status := call(worker, "Worker.DoTask", constructTaskArgs(phase, task), nil)
                if status {
                    success <- 1
                    go func() { registerChan <- worker }()
                } else {
                    tasks <- task
                }
            }()
        case <-success:
            successTasks += 1
        default:
            if successTasks == ntasks {
                break loop
            }
        }
    }

里面不仅使用了task的channel, 还使用了success (channel) 来控制 successTask 的共享。

二、爬虫并发的问题

网络是一个有环的图,但是我们设计爬虫需要避免环。

  • 一方面是因为重复遍历url,没有任何意义
  • 另一方面只访问一次url可以减轻目标服务器负担

单线程爬虫:

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
    if fetched[url] {
        return
    }
    fetched[url] = true
    urls, err := fetcher.Fetch(url)
    if err != nil {
        return
    }
    for _, u := range urls {
        Serial(u, fetcher, fetched)
    }
    return
}

2.1 并发互斥爬虫

因此需要维护一张visited表来记录是否遍历过url,这里就会出现并发问题。

当T1 检查visited[url] , T2也检查visited[url] 两个线程都会认为没有访问过该url,这时候就会发生冲突,发生WW(write + write) 。解决办法是,维护一个Mutex 互斥信号量来访问visited这张表。

  • 判断线程结束

使用sync.WaitGroup来保证线程执行完成

type fetchState struct {
    mu      sync.Mutex
    fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
    f.mu.Lock()
    if f.fetched[url] {
        f.mu.Unlock()
        return
    }
    f.fetched[url] = true
    f.mu.Unlock()

    urls, err := fetcher.Fetch(url)
    if err != nil {
        return
    }
    var done sync.WaitGroup
    for _, u := range urls {
        done.Add(1)
        go func(u string) {
            defer done.Done()
            ConcurrentMutex(u, fetcher, f)
        }(u)
    }
    done.Wait()
    return
}

func makeState() *fetchState {
    f := &fetchState{}
    f.fetched = make(map[string]bool)
    return f
}

2.2 并发通道爬虫

master启动worker去爬取url, worker将url送到同一个通道里面, master从通道获取url去爬取内容

共享的数据:

  • 通道
  • 发送到 通道的 slices 和 字符串
  • 从master发送到worker的参数
//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
    urls, err := fetcher.Fetch(url)
    if err != nil {
        ch <- []string{}
    } else {
        ch <- urls
    }
}

func master(ch chan []string, fetcher Fetcher) {
    n := 1
    fetched := make(map[string]bool)
    for urls := range ch {
        for _, u := range urls {
            if fetched[u] == false {
                fetched[u] = true
                n += 1
                go worker(u, ch, fetcher)
            }
        }
        n -= 1
        if n == 0 {
            break
        }
    }
}

func ConcurrentChannel(url string, fetcher Fetcher) {
    ch := make(chan []string)
    go func() {
        ch <- []string{url}
    }()
    master(ch, fetcher)
}

三、什么时候使用共享空间和锁 vs 通道

state -- 共享空间和锁

communication -- 通道

waiting for events -- 通道

使用go 的 race dector

四、Remote Procedure Call(RPC)

4.1 软件架构:

客户端 handlers

stubs dispatcher(调度器)

rpc lib rpc lib


网络 ----- 网络

4.2 rpc过程:

  • 首先双方定义发送的参数, 和返回的结构体
  • 客户端 Dial()创建tcp连接请求 call() 调用rpc库来执行远程调用
  • 服务器 声明一个带返回方法的对象 作为rpc处理器, 然后使用rpc库的Register函数来注册服务, rpc库:
    • 读取每一个请求
    • 为每一个请求创建一个goroutine
    • 反序列化请求
    • 调用目标函数
    • 序列化返回值
    • 将返回值通过tcp连接返回

4.3rpc 示例

源码

client:

//
// Client
//

func connect() *rpc.Client {
    client, err := rpc.Dial("tcp", ":1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    return client
}

func get(key string) string {
    client := connect()
    args := GetArgs{"subject"}
    reply := GetReply{}
    err := client.Call("KV.Get", &args, &reply)
    if err != nil {
        log.Fatal("error:", err)
    }
    client.Close()
    return reply.Value
}

func put(key string, val string) {
    client := connect()
    args := PutArgs{"subject", "6.824"}
    reply := PutReply{}
    err := client.Call("KV.Put", &args, &reply)
    if err != nil {
        log.Fatal("error:", err)
    }
    client.Close()
}

server

//
// Server
//

type KV struct {
    mu   sync.Mutex
    data map[string]string
}

func server() {
    kv := new(KV)
    kv.data = map[string]string{}
    rpcs := rpc.NewServer()
    rpcs.Register(kv)
    l, e := net.Listen("tcp", ":1234")
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go func() {
        for {
            conn, err := l.Accept()
            if err == nil {
                go rpcs.ServeConn(conn)
            } else {
                break
            }
        }
        l.Close()
    }()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
    kv.mu.Lock()
    defer kv.mu.Unlock()

    val, ok := kv.data[args.Key]
    if ok {
        reply.Err = OK
        reply.Value = val
    } else {
        reply.Err = ErrNoKey
        reply.Value = ""
    }
    return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
    kv.mu.Lock()
    defer kv.mu.Unlock()

    kv.data[args.Key] = args.Value
    reply.Err = OK
    return nil
}

4.3 rpc怎么处理失败

问题:

  • 网络延迟
  • 丢包
  • 服务器慢或者崩溃

处理办法:

  • best effort:
    • client调用call( ) 等待响应, 如果过了一会没收到响应那就再发送一个call( )
    • 这个过程重复几次,然后放弃并且返回一个错误
  • at most once:
    • 针对服务端说的:当服务端收到相同的请求时
      • 根据xid(client id 判断)如果收到相同请求 返回之前的处理结果
      • xid 怎么保证唯一性
  • exactly once:
    • 无限重试
    • 冗余检查
    • 容错服务
12-16 23:35
查看更多