简介

groupcache是一个分布式kv缓存的library,能够实现对数据的set、get,而不能进行update和delete操作。groupcache采用的是P2P的架构,所有的节点都是同构的。当客户端程序查找某个值时,groupcache先在本地的cache中进行查找,如果不存在,则通过一致性哈希寻找该key-value所在的peer的地址,如果都不存在则从数据源(数据库)拉取。cache使用LRU策略淘汰最近不常用的数据。需要注意的是,如果很多请求都访问cache中不存在的值时,会导致groupcache向数据源请求数据,这时候需要阻塞请求,保证只有一个请求执行。
需要注意的是groupcache只是一个分布式的kv缓存库,我们需要将自己的规则填入,capotej 实现了一个了一个demo,下面将自顶向下分析整个库的源码。

Demo

客户端与groupcache之间通过RPC通信,而peer之间通过http进行数据的传输,而数据源通过延时模拟一个数据库的存取过程。

架构

Command line client 与前端之间通过RPC进行通信,而groupcache服务器之间通过http进行数据的传输,而DB Server作为数据源,当所有的cache中都不存在的时候,向数据源获取结果
groupcache源码解析-LMLPHP

var stringcache = groupcache.NewGroup("SlowDBCache", 64<<20, groupcache.GetterFunc(
		func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
			result := client.Get(key)
			fmt.Printf("asking for %s from dbserver\n", key)
			dest.SetBytes([]byte(result))
			return nil
		}))

上述代码指定了groupcache的名称,大小,以及获取源数据的方法,当客户端程序需要获取某个值时,就会调用Get函数,从groupcache集群中获取,代码如下

func (s *Frontend) Get(args *api.Load, reply *api.ValueResult) error {
	var data []byte
	fmt.Printf("cli asked for %s from groupcache\n", args.Key)
	err := s.cacheGroup.Get(nil, args.Key,
		groupcache.AllocatingByteSliceSink(&data))
	reply.Value = string(data)
	return err
}

LRU实现

代码中的cache可以基于mapcontainer/list实现,整个过程非常的简单,具体的实现过程如下:

  1. 如果一个k-v未在cache中出现,将其加入进listmap
  2. 如果这个k-v对已经出现过,则将其放在list的头部
  3. nums >maxEntries时,将它从listmap中移除出去

代码如下

type Cache struct {
	MaxEntries int
	// 当驱逐某个key时调用的函数,用于回调
	OnEvicted func(key Key, value interface{})
	ll	*list.List
	cache map[interface{}]*list.Element
}
type Key interface{}
type entry struct {
	key   Key
	value interface{}
}
func (c *Cache) Add(key Key, value interface{}) {
	if c.cache == nil {
		c.cache = make(map[interface{}]*list.Element)
		c.ll = list.New()
	}
	if ee, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ee)
		// ee.Value.(*entry)类型断言成*entry并赋值
		ee.Value.(*entry).value = value
		return
	}
	ele := c.ll.PushFront(&entry{key, value})
	c.cache[key] = ele
	if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
		c.RemoveOldest()
	}
}

删除元素的函数如下所示:首先,从list中删除,然后从map中移除,最后触发onEvicted()函数

func (c *Cache) removeElement(e *list.Element) {
	c.ll.Remove(e)
	kv := e.Value.(*entry)
	delete(c.cache, kv.key)
	if c.OnEvicted != nil {
		c.OnEvicted(kv.key, kv.value)
	}
}

一致性哈希

数据的查找使用的是一致性哈希,简单的说就是计算每个key的Hash值,将其存放在hash值大于它的hash值的第一个map中
但是在项目中看不到任何关于迁移的代码,一致性哈希一个重要的特性就是迁移的代价小,增减节点只需要将临近节点的数据迁移
过程如下:

  1. 首先根据传入的key(ip+port+id,id为replica的编号),根据hash函数(提供了默认的hash函数)计算hash值,hash值作为hashMap的key,key作为value存储下来,并对key数组排序,方便后续的二分查找
  2. 在查找某个key所在的server,通过hash值的数组,使用二分法查找第一个hash值大于或等于这个key的hash值的服务器,返回该服务器的ip+port

代码如下所示

type Hash func(data []byte) uint32
type Map struct {
	hash     Hash
	replicas int
	keys     []int // Sorted
	hashMap  map[int]string
}
func New(replicas int, fn Hash) *Map {
	m := &Map{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[int]string),
	}
	if m.hash == nil {
		m.hash = crc32.ChecksumIEEE
	}
	return m
}
// Returns true if there are no items available.
func (m *Map) IsEmpty() bool {
	return len(m.keys) == 0
}
// Adds some keys to the hash.
func (m *Map) Add(keys ...string) {
	for _, key := range keys {
		for i := 0; i < m.replicas; i++ {
			hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
			m.keys = append(m.keys, hash)
			m.hashMap[hash] = key
		}
	}
	sort.Ints(m.keys)
}
// Gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
	if m.IsEmpty() {
		return ""
	}
	hash := int(m.hash([]byte(key)))
		idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
	if idx == len(m.keys) {
		idx = 0
	}
	return m.hashMap[m.keys[idx]]
}

singleflight

这段请求控制的代码非常有意思,当请求的key在map中不存在时,说明未有该请求,先wg.Add(1),然后执行fn(),执行完成后的结果保存在call中并且wg.Done()
如果在执行fn()的过程中,出现了新的请求,会使用wg.Wait()将请求阻塞,直到wg.Done()后,直接从map中取出该key所对应的call并直接返回结果,不再执行fn()函数
代码如下:

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	c.val, c.err = fn()
	c.wg.Done()
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
	return c.val, c.err
}

groupcache模块

首先为Group结构体的定义,包括cache、peer的选择、并发多请求的解决

type Group struct {
	name       string // name of group
	getter     Getter //the function runs when getting data from source
	peersOnce  sync.Once //only run once
	peers      PeerPicker
	cacheBytes int64 // limit for sum of mainCache and hotCache size
	mainCache cache
	hotCache cache
	loadGroup flightGroup
	// Stats are statistics on the group.
	Stats Stats
}

Get为从groupcache分布式缓存中获取value的整个过程:

  • 首先从本地的cache中查找(hotcache、maincache),最终会由LRU进行查找
  • 如果没有则程序跳转至group.load从peers的cache中查找,使用http进行请求的发送和响应的接收
  • 如果还是没有,则调用group.getter.Get向源数据库中查找

注意!!!这段代码有点意思!
整个load过程又是由之前所说的flightgroup的Do进行控制的,确保getFromPeer以及getLocally在同时出现的针对同一个key的多个请求只执行一次。
代码如下:

func (g *Group) Get(ctx Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	//1、从cache中寻找
	value, cacheHit := g.lookupCache(key)
	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}
	destPopulated := false
	// 2、从其他的peer中或者数据源中获取
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// 1、再尝试从cache中查找一遍
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		//2、根据key选择其所在的peer,再根据peer的ip+port请求获取value
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
		}
		//3、如果上述方法都不成功只能从本地的数据源中获取数据
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
	err := g.getter.Get(ctx, key, dest)
	if err != nil {
		return ByteView{}, err
	}
	return dest.view()
}

func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
	req := &pb.GetRequest{
		Group: &g.name,
		Key:   &key,
	}
	res := &pb.GetResponse{}
	err := peer.Get(ctx, req, res)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: res.Value}
	if rand.Intn(10) == 0 {
		g.populateCache(key, value, &g.hotCache)
	}
	return value, nil
}

http包

首先定义了HTTPPool这个资源池,包括利用一致性哈希的索引表以及利用ip+port作为key的一个httpGetter的map,结构如下:

type HTTPPool struct {
	Context func(*http.Request) Context
	Transport func(Context) http.RoundTripper
	self string
	opts HTTPPoolOptions
	mu          sync.Mutex // guards peers and httpGetters
	peers       *consistenthash.Map
	httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}

当需要对HTTPPool进行设置,因此在外面套了一个NewHTTPPoolOpts用于对HTTPPool进行初始化,设置Replica以及初始化consistHash,最终返回HTTPPool对象

func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
	if httpPoolMade {
		panic("groupcache: NewHTTPPool must be called only once")
	}
	httpPoolMade = true
	p := &HTTPPool{
		self:        self,
		httpGetters: make(map[string]*httpGetter),
	}
	if o != nil {
		p.opts = *o
	}
	if p.opts.BasePath == "" {
		p.opts.BasePath = defaultBasePath
	}
	if p.opts.Replicas == 0 {
		p.opts.Replicas = defaultReplicas
	}
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
	RegisterPeerPicker(func() PeerPicker { return p })
	return p
}

同时,HTTPPool实现了PeerPicker接口,根据给定的key计算出哈希值,在哈希表中查找出该元素所在peer的GetterFunc,过程如下:

func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.peers.IsEmpty() {
		return nil, false
	}
	if peer := p.peers.Get(key); peer != p.self {
		return p.httpGetters[peer], true
	}
	return nil, false
}

在客户端程序中,需要指定其他的peer的ip地址以方便获取GetterFunc,并且后期不能改变,添加进入后并完善每个peer的httpGetter,内容包括一个transport和一个url,代码如下:

func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
	}
}

HTTPPool还实现了Handler用于接受请求,并将结果写入response中返回,过程为:解析、取值、压缩、返回,代码如下:

func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
		panic("HTTPPool serving unexpected path: " + r.URL.Path)
	}
	parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
	if len(parts) != 2 {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	groupName := parts[0]
	key := parts[1]
	group := GetGroup(groupName)
	if group == nil {
		http.Error(w, "no such group: "+groupName, http.StatusNotFound)
		return
	}
	var ctx Context
	if p.Context != nil {
		ctx = p.Context(r)
	}
	group.Stats.ServerRequests.Add(1)
	var value []byte
	err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	body, err := proto.Marshal(&pb.GetResponse{Value: value})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/x-protobuf")
	w.Write(body)
}

httpGetter定义了Get方法,将groupNamebaseURL以及key作为URL,向其他的peer发送请求,得到响应之后,将结果用buffer缓存并解析到response

func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(in.GetGroup()),
		url.QueryEscape(in.GetKey()),
	)
	req, err := http.NewRequest("GET", u, nil)
	if err != nil {
		return err
	}
	tr := http.DefaultTransport
	if h.transport != nil {
		tr = h.transport(context)
	}
	res, err := tr.RoundTrip(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("server returned: %v", res.Status)
	}
	b := bufferPool.Get().(*bytes.Buffer)
	b.Reset()
	defer bufferPool.Put(b)
	_, err = io.Copy(b, res.Body)
	if err != nil {
		return fmt.Errorf("reading response body: %v", err)
	}
	err = proto.Unmarshal(b.Bytes(), out)
	if err != nil {
		return fmt.Errorf("decoding response body: %v", err)
	}
	return nil
}

剩下在ByteView中定义了一个数据结构,包含一个string和一个byte数组,而Sink定义了接口用于对实现Set函数,方便多种格式的数据进行存取。

总结

看这个项目主要是为了学习一个Go语言的项目的架构,以及熟悉Go语言的语法。

  1. 良好的接口定义能够省事很多,但是也非常得难
  2. 可以使用注册的方式向包的内部传递自己的函数,简单的说就是包内部定义一个函数的变量,使用R egister函数对变量进行复制,所有的东西需要先注册后再创建
  3. 多抽象,使用接口代替变量或者结构体作为函数的形参(这样的表述好像有点问题),这样更容易拓展
10-06 14:46