简介
groupcache是一个分布式kv缓存的library,能够实现对数据的set、get,而不能进行update和delete操作。groupcache采用的是P2P的架构,所有的节点都是同构的。当客户端程序查找某个值时,groupcache先在本地的cache中进行查找,如果不存在,则通过一致性哈希寻找该key-value所在的peer的地址,如果都不存在则从数据源(数据库)拉取。cache使用LRU策略淘汰最近不常用的数据。需要注意的是,如果很多请求都访问cache中不存在的值时,会导致groupcache向数据源请求数据,这时候需要阻塞请求,保证只有一个请求执行。
需要注意的是groupcache只是一个分布式的kv缓存库,我们需要将自己的规则填入,capotej 实现了一个了一个demo,下面将自顶向下分析整个库的源码。
Demo
客户端与groupcache之间通过RPC通信,而peer之间通过http进行数据的传输,而数据源通过延时模拟一个数据库的存取过程。
架构
Command line client 与前端之间通过RPC进行通信,而groupcache服务器之间通过http进行数据的传输,而DB Server作为数据源,当所有的cache中都不存在的时候,向数据源获取结果
var stringcache = groupcache.NewGroup("SlowDBCache", 64<<20, groupcache.GetterFunc(
func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
result := client.Get(key)
fmt.Printf("asking for %s from dbserver\n", key)
dest.SetBytes([]byte(result))
return nil
}))
上述代码指定了groupcache的名称,大小,以及获取源数据的方法,当客户端程序需要获取某个值时,就会调用Get函数,从groupcache集群中获取,代码如下
func (s *Frontend) Get(args *api.Load, reply *api.ValueResult) error {
var data []byte
fmt.Printf("cli asked for %s from groupcache\n", args.Key)
err := s.cacheGroup.Get(nil, args.Key,
groupcache.AllocatingByteSliceSink(&data))
reply.Value = string(data)
return err
}
LRU实现
代码中的cache可以基于map
和container/list
实现,整个过程非常的简单,具体的实现过程如下:
- 如果一个k-v未在cache中出现,将其加入进
list
和map
中 - 如果这个k-v对已经出现过,则将其放在
list
的头部 - 当
nums >maxEntries
时,将它从list
和map
中移除出去
代码如下
type Cache struct {
MaxEntries int
// 当驱逐某个key时调用的函数,用于回调
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
}
type Key interface{}
type entry struct {
key Key
value interface{}
}
func (c *Cache) Add(key Key, value interface{}) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
// ee.Value.(*entry)类型断言成*entry并赋值
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
删除元素的函数如下所示:首先,从list中删除,然后从map中移除,最后触发onEvicted()
函数
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
一致性哈希
数据的查找使用的是一致性哈希,简单的说就是计算每个key的Hash值,将其存放在hash值大于它的hash值的第一个map中
但是在项目中看不到任何关于迁移的代码,一致性哈希一个重要的特性就是迁移的代价小,增减节点只需要将临近节点的数据迁移
过程如下:
- 首先根据传入的key(
ip+port+id
,id为replica的编号),根据hash函数(提供了默认的hash函数)计算hash值,hash值作为hashMap
的key,key作为value
存储下来,并对key数组排序,方便后续的二分查找 - 在查找某个key所在的server,通过hash值的数组,使用二分法查找第一个hash值大于或等于这个key的hash值的服务器,返回该服务器的
ip+port
代码如下所示
type Hash func(data []byte) uint32
type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
// Returns true if there are no items available.
func (m *Map) IsEmpty() bool {
return len(m.keys) == 0
}
// Adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// Gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}
hash := int(m.hash([]byte(key)))
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
if idx == len(m.keys) {
idx = 0
}
return m.hashMap[m.keys[idx]]
}
singleflight
这段请求控制的代码非常有意思,当请求的key在map中不存在时,说明未有该请求,先wg.Add(1)
,然后执行fn()
,执行完成后的结果保存在call
中并且wg.Done()
如果在执行fn()
的过程中,出现了新的请求,会使用wg.Wait()
将请求阻塞,直到wg.Done()
后,直接从map中取出该key所对应的call并直接返回结果,不再执行fn()
函数
代码如下:
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
groupcache模块
首先为Group结构体的定义,包括cache、peer的选择、并发多请求的解决
type Group struct {
name string // name of group
getter Getter //the function runs when getting data from source
peersOnce sync.Once //only run once
peers PeerPicker
cacheBytes int64 // limit for sum of mainCache and hotCache size
mainCache cache
hotCache cache
loadGroup flightGroup
// Stats are statistics on the group.
Stats Stats
}
Get
为从groupcache分布式缓存中获取value
的整个过程:
- 首先从本地的cache中查找(hotcache、maincache),最终会由LRU进行查找
- 如果没有则程序跳转至
group.load
从peers的cache中查找,使用http进行请求的发送和响应的接收 - 如果还是没有,则调用
group.getter.Get
向源数据库中查找
注意!!!这段代码有点意思!
整个load
过程又是由之前所说的flightgroup
的Do进行控制的,确保getFromPeer
以及getLocally
在同时出现的针对同一个key
的多个请求只执行一次。
代码如下:
func (g *Group) Get(ctx Context, key string, dest Sink) error {
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
if dest == nil {
return errors.New("groupcache: nil dest Sink")
}
//1、从cache中寻找
value, cacheHit := g.lookupCache(key)
if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
destPopulated := false
// 2、从其他的peer中或者数据源中获取
value, destPopulated, err := g.load(ctx, key, dest)
if err != nil {
return err
}
if destPopulated {
return nil
}
return setSinkView(dest, value)
}
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// 1、再尝试从cache中查找一遍
if value, cacheHit := g.lookupCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
g.Stats.LoadsDeduped.Add(1)
var value ByteView
var err error
//2、根据key选择其所在的peer,再根据peer的ip+port请求获取value
if peer, ok := g.peers.PickPeer(key); ok {
value, err = g.getFromPeer(ctx, peer, key)
if err == nil {
g.Stats.PeerLoads.Add(1)
return value, nil
}
g.Stats.PeerErrors.Add(1)
}
//3、如果上述方法都不成功只能从本地的数据源中获取数据
value, err = g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.LocalLoadErrs.Add(1)
return nil, err
}
g.Stats.LocalLoads.Add(1)
destPopulated = true // only one caller of load gets this return value
g.populateCache(key, value, &g.mainCache)
return value, nil
})
if err == nil {
value = viewi.(ByteView)
}
return
}
func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
err := g.getter.Get(ctx, key, dest)
if err != nil {
return ByteView{}, err
}
return dest.view()
}
func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
}
res := &pb.GetResponse{}
err := peer.Get(ctx, req, res)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: res.Value}
if rand.Intn(10) == 0 {
g.populateCache(key, value, &g.hotCache)
}
return value, nil
}
http包
首先定义了HTTPPool
这个资源池,包括利用一致性哈希的索引表以及利用ip+port作为key的一个httpGetter
的map,结构如下:
type HTTPPool struct {
Context func(*http.Request) Context
Transport func(Context) http.RoundTripper
self string
opts HTTPPoolOptions
mu sync.Mutex // guards peers and httpGetters
peers *consistenthash.Map
httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}
当需要对HTTPPool
进行设置,因此在外面套了一个NewHTTPPoolOpts
用于对HTTPPool
进行初始化,设置Replica
以及初始化consistHash
,最终返回HTTPPool
对象
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
if httpPoolMade {
panic("groupcache: NewHTTPPool must be called only once")
}
httpPoolMade = true
p := &HTTPPool{
self: self,
httpGetters: make(map[string]*httpGetter),
}
if o != nil {
p.opts = *o
}
if p.opts.BasePath == "" {
p.opts.BasePath = defaultBasePath
}
if p.opts.Replicas == 0 {
p.opts.Replicas = defaultReplicas
}
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
RegisterPeerPicker(func() PeerPicker { return p })
return p
}
同时,HTTPPool
实现了PeerPicker
接口,根据给定的key
计算出哈希值,在哈希表中查找出该元素所在peer的GetterFunc
,过程如下:
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.peers.IsEmpty() {
return nil, false
}
if peer := p.peers.Get(key); peer != p.self {
return p.httpGetters[peer], true
}
return nil, false
}
在客户端程序中,需要指定其他的peer的ip地址以方便获取GetterFunc
,并且后期不能改变,添加进入后并完善每个peer的httpGetter
,内容包括一个transport和一个url,代码如下:
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
}
}
HTTPPool
还实现了Handler
用于接受请求,并将结果写入response
中返回,过程为:解析、取值、压缩、返回,代码如下:
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
var ctx Context
if p.Context != nil {
ctx = p.Context(r)
}
group.Stats.ServerRequests.Add(1)
var value []byte
err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
body, err := proto.Marshal(&pb.GetResponse{Value: value})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
w.Write(body)
}
httpGetter
定义了Get
方法,将groupName
和baseURL
以及key
作为URL,向其他的peer
发送请求,得到响应之后,将结果用buffer缓存并解析到response
中
func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(in.GetKey()),
)
req, err := http.NewRequest("GET", u, nil)
if err != nil {
return err
}
tr := http.DefaultTransport
if h.transport != nil {
tr = h.transport(context)
}
res, err := tr.RoundTrip(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("server returned: %v", res.Status)
}
b := bufferPool.Get().(*bytes.Buffer)
b.Reset()
defer bufferPool.Put(b)
_, err = io.Copy(b, res.Body)
if err != nil {
return fmt.Errorf("reading response body: %v", err)
}
err = proto.Unmarshal(b.Bytes(), out)
if err != nil {
return fmt.Errorf("decoding response body: %v", err)
}
return nil
}
剩下在ByteView
中定义了一个数据结构,包含一个string和一个byte数组,而Sink
定义了接口用于对实现Set
函数,方便多种格式的数据进行存取。
总结
看这个项目主要是为了学习一个Go语言的项目的架构,以及熟悉Go语言的语法。
- 良好的接口定义能够省事很多,但是也非常得难
- 可以使用注册的方式向包的内部传递自己的函数,简单的说就是包内部定义一个函数的变量,使用
R egister
函数对变量进行复制,所有的东西需要先注册后再创建 - 多抽象,使用接口代替变量或者结构体作为函数的形参(这样的表述好像有点问题),这样更容易拓展