一.前言

1.1 为什么需要Singleflight?

一般情况下我们在写一写对外的服务的时候都会有一层 cache 作为缓存,用来减少底层数据库的压力,但是在遇到例如 redis 抖动或者其他情况可能会导致大量的 cache miss 出现。

1.2 使用场景

如下图所示,可能存在来自桌面端和移动端的用户有 1000 的并发请求,他们都访问的获取文章列表的接口,获取前 20 条信息,如果这个时候我们服务直接去访问 redis 出现 cache miss 那么我们就会去请求 1000 次数据库,这时可能会给数据库带来较大的压力(这里的 1000 只是一个例子,实际上可能远大于这个值)导致我们的服务异常或者超时。

十一. Go并发编程--singleflight-LMLPHP

这时候就可以使用singleflight 库了,直译过来就是 单飞.

这个库的主要作用就是: 将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。 如下图所示

十一. Go并发编程--singleflight-LMLPHP

二. slingleFligh 库(使用教程)

2.1 函数签名

主要是一个Group结构体, 结构体包含三个方法,相见代码注释

type Group
    // Do 执行函数, 对同一个 key 多次调用的时候,在第一次调用没有执行完的时候
	// 只会执行一次 fn 其他的调用会阻塞住等待这次调用返回
	// v, err 是传入的 fn 的返回值
	// shared 表示是否真正执行了 fn 返回的结果,还是返回的共享的结果
    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

	// DoChan 和 Do 类似,只是 DoChan 返回一个 channel,也就是同步与异步的区别
	func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

    // Forget 用于通知 Group 删除某个 key 这样后面继续这个 key 的调用的时候就不会在阻塞等待了
	func (g *Group) Forget(key string)

2.2 使用示例

先使用一个普通的例子,这时一个获取文章详情的函数,我们在函数里面使用一个 count 模拟不同并发下的耗时的不同,并发越多请求耗时越多。

func getArticle(id int) (article string, err error) {
	// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
	// 使用原子操作保证并发安全
	atomic.AddInt32(&count, 1)
	time.Sleep(time.Duration(count) * time.Millisecond)

	return fmt.Sprintf("article: %d", id), nil
}

使用 singlefight 的时候就只需要 new(singleflight.Group) 然后调用以下相对应的Do方法

func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {

	v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
		return getArticle(id)
	})

	return v.(string), err
}

3. 测试

写一个简单的测试代码,模拟启动1000个goroutine 去并发调用这两个方法

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

var count int32

func getArticle(id int) (article string, err error) {
	// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
	atomic.AddInt32(&count, 1)
	time.Sleep(time.Duration(count) * time.Millisecond)

	return fmt.Sprintf("article: %d", id), nil
}

func singleFlightGetArticle(sg *singleflight.Group, id int) (string, error) {
	v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
		return getArticle(id)
	})

	return v.(string), err
}

func main() {
	// 使用一个定时器,
	time.AfterFunc(1*time.Second, func() {
		atomic.AddInt32(&count, -count)
	})

	var (
		wg  sync.WaitGroup
		now = time.Now()
		n   = 1000
		sg  = &singleflight.Group{}
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			res, _ := singleFlightGetArticle(sg, 1)
			// res, _ := getArticle(1)
			if res != "article: 1" {
				panic("err")
			}
			wg.Done()
		}()
	}

	wg.Wait()
	fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}

调用 getArticle 方法的耗时,花费了 1s 多

go run demo.go
同时发起 1000 次请求,耗时: 1.0157721s

切换到singleflight方法测试发现花费20ms

go run demo.go
同时发起 1000 次请求,耗时: 21.1962ms

测试发现,使用singleflight这种方式的确在这种场景下能减少执行执行,提高效率。

3.实现原理

这是非内置库,仓库golang.org/x/sync/singleflight 源码解析如下

Group
是一个结构体

type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

Group 结构体由一个互斥锁和一个 map 组成,可以看到注释 map 是懒加载的,所以 Group 只要声明就可以使用,不用进行额外的初始化零值就可以直接使用。call 保存了当前调用对应的信息,map 的键就是我们调用 Do 方法传入的 key

call 也是一个结构体,结构体如下

type call struct {
	wg sync.WaitGroup

	// 函数的返回值,在 wg 返回前只会写入一次
	val interface{}
	err error

	// 使用调用了 Forgot 方法
	forgotten bool

    // 统计调用次数以及返回的 channel
	dups  int
	chans []chan<- Result
}

Do
Do方法用来执行传入函数

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()

    // 前面提到的懒加载
    if g.m == nil {
		g.m = make(map[string]*call)
	}

    // 会先去看 key 是否已经存在
	if c, ok := g.m[key]; ok {
       	// 如果存在就会解锁
		c.dups++
		g.mu.Unlock()

        // 然后等待 WaitGroup 执行完毕,只要一执行完,所有的 wait 都会被唤醒
		c.wg.Wait()

        // 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}

    // 如果我们没有找到这个 key 就 new call
	c := new(call)

    // 然后调用 waitgroup 这里只有第一次调用会 add 1,其他的都会调用 wait 阻塞掉
    // 所以这要这次调用返回,所有阻塞的调用都会被唤醒
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

    // 然后我们调用 doCall 去执行
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

doCall
这个方法种有个技巧值得学习,使用了两个 defer 巧妙的将 runtime 的错误和我们传入 function 的 panic 区别开来避免了由于传入的 function panic 导致的死锁。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

    // 第一个 defer 检查 runtime 错误
	defer func() {
        ...
	}()

    // 使用一个匿名函数来执行
	func() {
		defer func() {
			if !normalReturn {
                // 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误
                // 后面在上层重新 panic
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn()

        // 如果 fn 没有 panic 就会执行到这一步,如果 panic 了就不会执行到这一步
        // 所以可以通过这个变量来判断是否 panic 了
		normalReturn = true
	}()

    // 如果 normalReturn 为 false 就表示,我们的 fn panic 了
    // 如果执行到了这一步,也说明我们的 fn  recover 住了,不是直接 runtime exit
	if !normalReturn {
		recovered = true
	}
}

第一个defer函数如下

defer func() {
	// 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了
	if !normalReturn && !recovered {
		c.err = errGoexit
	}

	c.wg.Done()
	g.mu.Lock()
	defer g.mu.Unlock()

     // 如果已经 forgot 过了,就不要重复删除这个 key 了
	if !c.forgotten {
		delete(g.m, key)
	}

	if e, ok := c.err.(*panicError); ok {
		// 如果返回的是 panic 错误,为了避免 channel 死锁,我们需要确保这个 panic 无法被恢复
		if len(c.chans) > 0 {
			go panic(e)
			select {} // Keep this goroutine around so that it will appear in the crash dump.
		} else {
			panic(e)
		}
	} else if c.err == errGoexit {
		// 已经准备退出了,也就不用做其他操作了
	} else {
		// 正常情况下向 channel 写入数据
		for _, ch := range c.chans {
			ch <- Result{c.val, c.err, c.dups > 0}
		}
	}
}()

DoChan

Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上就是,如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)

	return ch
}

Forget
forget 用于手动释放某个 key 下次调用就不会阻塞等待了

func (g *Group) Forget(key string) {
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

三.注意事项

3.1 一个阻塞,全员等待

使用 singleflight 我们比较常见的是直接使用 Do 方法,但是这个极端情况下会导致整个程序 hang 住,如果我们的代码出点问题,有一个调用 hang 住了,那么会导致所有的请求都 hang 住

还是之前的例子, 加入一个 select 模拟阻塞

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

var count2 int32

func getArticle2(id int) (article string, err error) {
	// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
	atomic.AddInt32(&count2, 1)
	time.Sleep(time.Duration(count2) * time.Millisecond)

	return fmt.Sprintf("article: %d", id), nil
}

func singleFlightGetArticle2(sg *singleflight.Group, id int) (string, error) {
	v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
		// 模拟阻塞
		select {}
		return getArticle2(id)
	})

	return v.(string), err
}

func main() {
	// 使用一个定时器,
	time.AfterFunc(1*time.Second, func() {
		atomic.AddInt32(&count2, -count2)
	})

	var (
		wg  sync.WaitGroup
		now = time.Now()
		n   = 1000
		sg  = &singleflight.Group{}
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			res, _ := singleFlightGetArticle2(sg, 1)
			if res != "article: 1" {
				panic("err")
			}
			wg.Done()
		}()
	}

	wg.Wait()
	fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}

执行就会发现死锁

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:

这时候DoChan就能派上用场了,结合 select 做超时控制

func singleflightGetArticle(ctx context.Context, sg *singleflight.Group, id int) (string, error) {
	result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) {
		// 模拟出现问题,hang 住
		select {}
		return getArticle(id)
	})

	select {
	case r := <-result:
		return r.Val.(string), r.Err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

调用的时候传入一个含 超时的 context 即可

func main() {
	// 使用一个定时器,
	time.AfterFunc(1*time.Second, func() {
		atomic.AddInt32(&count2, -count2)
	})

	var (
		wg     sync.WaitGroup
		now    = time.Now()
		n      = 1000
		sg     = &singleflight.Group{}
		// 超时控制
		ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			res, _ := singleFlightGetArticle2(ctx, sg, 1)
			if res != "article: 1" {
				panic("err")
			}
			wg.Done()
		}()

	}

	wg.Wait()
	fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}

执行时就会返回超时错误.

❯ go run demo2.go
panic: context deadline exceeded

3.2 一个出错,全部出错

本身不是什么问题,因为singleflight就是这么设计的。 但是实际使用的时候,可能并不是想要这样的效果。 比如 如果一次调用要1s. 我们的数据库请求或者下游服务服务可以支撑10rps(即每秒可以支持10次的请求)的请求的时候会导致我们的错误阈值提高。 因为我们可以1s尝试10次,但是用了singleflight之后只能尝试一次。只要出错这段时间内的所有请求都会收到影响。 那这种情况该如何解决呢?

我们可以启动一个Goroutine定时forget一下,相当于将rps 从1rps提高到10rps

go func() {
       time.Sleep(100 * time.Millisecond)
       // logging
       g.Forget(key)
   }()

四. 使用场景

如开头场景所讲, singleflige 可以有效解决在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。

十一. Go并发编程--singleflight-LMLPHP

使用singleflight能有效解决这个问题,限制对同一个键值对的多次重复请求,减少对下游的瞬时流量

十一. Go并发编程--singleflight-LMLPHP

通过一段代码可以查看如何解决这种缓存击穿的问题


type service struct {
    requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
    // request.Hash就是传入的建,请求的哈希在业务上一般表示相同的请求,所以上述代码使用它作为请求的键
    v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }
    return Response{
        rows: rows,
    }, nil
}

五. 总结

golang/sync/singleflight.Group.Dogolang/sync/singleflight.Group.DoChan 分别提供了同步和异步的调用方式,这让我们使用起来也更加灵活。

当我们需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Group 来增加吞吐量和服务质量,不过在使用的过程中我们也需要注意以下的几个问题:

  • golang/sync/singleflight.Group.Dogolang/sync/singleflight.Group.DoChan 一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过 Channel 接收函数的返回值;
  • golang/sync/singleflight.Group.Forget 可以通知 golang/sync/singleflight.Group 在持有的映射表中删除某个键,接下来对该键的调用就不会等待前面的函数返回了;
  • 一旦调用的函数返回了错误,所有在等待的 Goroutine 也都会接收到同样的错误;

六.参考

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#singleflight
  2. https://lailin.xyz/post/go-training-week5-singleflight.html
  3. https://pkg.go.dev/golang.org/x/sync/singleflight
11-28 10:13