GroupCache 源码阅读
总述
项目地址:https://github.com/golang/groupcache
题外话,是memcached的作者写的,细节处还是很精彩的
groupcache已经在dl.Google.com、Blogger、Google Code、Google Fiber、Google生产监视系统等项目中投入使用
相对于go-cache而言,groupcache提供了分布式的缓存功能,功能复杂度降低(见制约),但是大规模的缓存能力提升。
框架灵活性强,可以灵活定制缓存策略,缓存源数据无关性
还满足多节点缓存的分布式一致性hash,以及本地缓存miss后向其他缓存查询的机制。
框架较为健壮,细节上对效率有考究,可以应对缓存击穿、缓存穿透问题以及缓存雪崩问题
Tips:看bradfitz大佬的这个源码,可以重点关注大段的注释和TODO,都是很值得思考的部分
制约
只能get、remove,不支持update,只能remove后再get
过期机制为限制cache队列长度,不能设置过期时间,只能通过内置lru淘汰过期数据;
经常变更的数据不适合groupcache作为缓存;
注意一点,groupcache的consistenhash实现只能增加节点,无法减少节点,若要实现该功能,要注意当减少节点时,如果有相同的虚拟节点映射到同一个位置,则要判断删除的数量。增加不存在问题,因为增加的时候后来的虚拟节点可以替换先设置的虚拟节点,不影响使用。
源码分析:
先说下大体文件结构:
因为groupcache是一个库性质的,所以没有main入口,但是看使用示例也知道,入口是groupcache.go的NewGroup, 所以应该从groupcache.go文件开始分析。
http.go实现了peers之间的http查询缓存的请求方法和响应服务,
groupcachepb提供了上述节点之间其消息序列化和反序列化协议,基于protobuf,在groupcache.proto中定义。
peers.go抽象了缓存节点,提供了注册、获取节点的机制
sinks.go抽象了数据容器, 可以以不同的方法初始化设置,最终以统一的字节切片读出
byteview.go封装了字节切片的多种操作方法
singleflight保证了多个相同key的请求下的加锁等待,避免了热点缓存击穿问题, waitGroup实现
lru提供了lru策略的实现,过期方式为队列长度超预期则删除。
consistenthash提供了分布式一致性hash的抽象,其本身并没有实现分布式一致性的hash,而是可以以指定的hash完成分布式一致性的key的定位和节点的增加,注意这里没有实现节点删除。
后面细说:
groupcache.go:
NewGroup提供了group的新增,内部完成了去重(重复注册则panic),并提供了两个位置的hook,一个是初次建立对等节点服务的hook(initPeerServer,其实不算hook,算是提供的初始化定制策略),另一个是新建缓存组的hook(newGroupHook)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// A Group is a cache namespace and associated data loaded spread over a group of 1 or more machines. type Group struct { name string getter Getter peersOnce sync.Once peers PeerPicker cacheBytes int64 // limit for sum of mainCache and hotCache size mainCache cache hotCache cache loadGroup flightGroup _ int32 // force Stats to be 8-byte aligned on 32-bit platforms Stats Stats } |
查看group定义,getter是传入的回调函数,给出了当本地miss同时缓存节点miss,或者本地miss,但该key属于本节点维护时该如何获取数据的函数。见load
里面有两个cache,一个是mainCache,其实就是本地的cache,hotCache是当本地miss,向其他缓存请求时本地也会缓存下来,防止反复向其他对等节点的请求。
flightGroup定义了接口,用以保证对同一个key的并发请求不会引起大量的peers间的请求,用waitGroup实现,具体实现文件在singleflight.
_ int32用来在32bit平台上强制8字节对齐的(这个较少遇到)
Stats是一些统计,比如缓存请求数,hit数,peer请求数等等
重点是Get和load函数的分析
1 2 3 4 5 6 7 8 9 10 11 12 13 |
func (g *Group) Get(ctx Context, key string, dest Sink) error { g.peersOnce.Do(g.initPeers) //首次运行,初始化对等节点 g.Stats.Gets.Add(1) //设置stats if dest == nil { //必须指定数据载体 return errors.New("groupcache: nil dest Sink") } value, cacheHit := g.lookupCache(key) //本地缓存查找 if cacheHit { //本地缓存命中 g.Stats.CacheHits.Add(1) return setSinkView(dest, value) } |
destPopulated := false //由于load中会对并发的请求做处理,只有最先到的请求会直接向对等节点请求,执行loadGroup.Do的传入参数二的闭包函数,而该函数调用了getLocally设置了dest,其他的只会等待,所以需要这个标志,对没有直接请求的另外执行setSinkView完成值的传递。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
value, destPopulated, err := g.load(ctx, key, dest) if err != nil { return err } if destPopulated { return nil } return setSinkView(dest, value) } // load loads key either by invoking the getter locally or by sending it to another machine. func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { g.Stats.Loads.Add(1) viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { //注意这里又调用了lookupCache, 这个很巧妙,brilliant!,防止了略有先后的两个请求可能导致两次重复对peer的访问,比如后来的请求在第一个请求还没有发起对等节点查询的时候发起 //等第一个请求还没完成loadGroup.Do(key,func)中的func()中对缓存的设置 getFromPeer,第二个请求完成了本地缓存的检索,则第二个缓存会重复进入load,并且此时可能loadGroup.Do已经返回, //waitGroup已经结束,会再一次向对等节点发起请求 if value, cacheHit := g.lookupCache(key); cacheHit { g.Stats.CacheHits.Add(1) return value, nil } g.Stats.LoadsDeduped.Add(1) var value ByteView var err error if peer, ok := g.peers.PickPeer(key); ok { //根据分布式一致性hash查找对应节点, ok为true表明不是本机 value, err = g.getFromPeer(ctx, peer, key) //向对应节点请求数据 if err == nil { g.Stats.PeerLoads.Add(1) return value, nil } g.Stats.PeerErrors.Add(1) } value, err = g.getLocally(ctx, key, dest) //peer未找到该节点,或者该key属于本节点管辖,应该请求向数据源获取数据,调用group初始化的getter回调函数 if err != nil { g.Stats.LocalLoadErrs.Add(1) return nil, err } g.Stats.LocalLoads.Add(1) destPopulated = true // only one caller of load gets this return value g.populateCache(key, value, &g.mainCache) // 再次从本地cache中加载 return value, nil }) if err == nil { value = viewi.(ByteView) } return } |
这两个函数之后的内容就好分析了很多,只单独说下populateCache,该函数将新增的(无论从peer获取,或从数据源getter获取)key\value存入对应的缓存,peer→hotCache, 数据源获取→mainCache,
并在该函数内实现了对cache的维护,当两个cache超过总的大小限制,则根据lru删除多余的内容,并保证hotCache大小小于mainCache/8
接下来看http.go和Peer.go部分:
1 2 3 4 5 6 7 8 9 10 11 |
type HTTPPool struct { Context func(*http.Request) Context Transport func(Context) http.RoundTripper // this peer's base URL, e.g. "https://example.net:8000" self string opts HTTPPoolOptions mu sync.Mutex // guards peers and httpGetters peers *consistenthash.Map httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008" } |
其中 Context可根据情况定制,主要用于服务端,将接收到的http请求转换成指定的context类型,便于进一步处理
Transport可根据情况定制,主要用于客户端,将一个向对等节点发起的请求context,转换成指定的Http访问方式,其中 RoundTripper为实现了
RoundTrip(*Request) (*Response, error)的接口,故而,HTTPPool可以实现访问方式的灵活定制
self为该节点本身的URL,为监听服务的地址
opts为HTTP池的默认选项
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
type HTTPPoolOptions struct { // http服务地址前缀,默认为 "/_groupcache/". BasePath string // 分布式一致性hash中虚拟节点数量,默认 50. Replicas int // 分布式一致性hash的hash算法,默认 crc32.ChecksumIEEE. HashFn consistenthash.Hash } //再往后看两个主要函数 func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { //实现了http服务的具体方法,这里简略说下 if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) // 判断URL前缀是否合法 parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2) // 分割URL,并从中提取group和key值,示例请求URL为:https://example.net:8000/_groupcache/groupname/key group := GetGroup(groupName) // 根据url中提取的groupname 获取group if p.Context != nil { ctx = p.Context(r) // 如Context不为空,说明需要使用定制的context } err := group.Get(ctx, key, AllocatingByteSliceSink(&value)) // 获取指定key对应的缓存 body, err := proto.Marshal(&pb.GetResponse{Value: value}) //序列化响应内容 w.Header().Set("Content-Type", "application/x-protobuf") // 设置http头 w.Write(body) //设置http body } //其次是 func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error { // 该方法根据需要向对等节点查询缓存 u := fmt.Sprintf( "%v%v/%v",h.baseURL ...... ) // 生成请求url req, err := http.NewRequest("GET", u, nil) // 新建Get请求 tr := http.DefaultTransport //获取transport方法 res, err := tr.RoundTrip(req) // 执行请求 b := bufferPool.Get().(*bytes.Buffer) // 这里用到了go 提供的 sync.Pool,对字节缓冲数组进行复用,避免了反复申请(缓存期为两次gc之间) b.Reset() //字节缓冲重置 _, err = io.Copy(b, res.Body) //字节缓冲填充 err = proto.Unmarshal(b.Bytes(), out) //反序列化字节数组 } |
其余的,通过实现PickPeer函数实现了接口PeerPicker,此函数用于根据key得到对应的对等节点peer,返回的peer的表现形式为实现了
ProtoGetter接口的对象,而该接口提供了Get方法,然后于getFromPeer中被调用,用以获取peer中对应key的value值
如此一来,peer.go中的最重要的两个接口PeerPicker\ProtoGetter也都介绍了,peer.go中就剩下两个Regist函数,注册一个可以根据groupName获取PeerPicker的函数,
还有一个getPeers就是根据groupName获取PeerPicker的函数,可以继续其他文件分析。
sinks.go 和 byteview.go:
byteview很好理解,就是定义了一个新的结构体类型
1 2 3 4 5 |
type ByteView struct { // If b is non-nil, b is used, else s is used. //最重要的就是这个注释,当b是nil 的时候,s作为存储内容的容器 b []byte s string } |
其余的都是针对该类型实现的方法,read write等基本方法
重头戏是sink,
1 2 3 4 5 6 7 8 9 10 11 |
type Sink interface { // SetString sets the value to s. SetString(s string) error // SetBytes sets the value to the contents of v. The caller retains ownership of v. SetBytes(v []byte) error // SetProto sets the value to the encoded version of m. The caller retains ownership of m. SetProto(m proto.Message) error // view returns a frozen view of the bytes for caching. view() (ByteView, error) } |
注释貌似说的很清楚,但是一看后面的实现,各种byte string sink byteview还是会有点凌乱,别急,不慌
Sink是一个接口,把它叫数据容器接口吧,这里面的方法分两部分,除了view,其余的设置方法都是将该接口对应的数据容器类型的内容用对应的类型进行设置,比如SetString就是用string进行初始化该sink
后面定义了实现该接口的多种类型
stringSink、byteViewSink、protoSink、allocBytesSink、truncBytesSink,顾名思义,不同的类型代表了其内部的存储方式的不同,
其每一种接口都实现了对应的初始化函数,比如以stringSink为例
1 2 3 4 |
// StringSink returns a Sink that populates the provided string pointer. func StringSink(sp *string) Sink { return &stringSink{sp: sp} } |
其实就是把该类型内部用于存储的数据容器进行初始化,以便后续SetXXX的时候可以正常设置不panic,
其中byteViewSink必须用non-nil的字节slice初始化,而allocBytesSink可以传入nil自行分配内存
此外这两种byteSink还提供了setView函数,以满足一个内部接口,函数setSinkView函数的内部接口viewSetter,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
func (s *byteViewSink) setView(v ByteView) error { *s.dst = v return nil } func setSinkView(s Sink, v ByteView) error { // A viewSetter is a Sink that can also receive its value from a ByteView. This is a fast path to minimize copies when the // item was already cached locally in memory (where it's cached as a ByteView) type viewSetter interface { setView(v ByteView) error } if vs, ok := s.(viewSetter); ok { return vs.setView(v) } if v.b != nil { return s.SetBytes(v.b) } return s.SetString(v.s) } |
可以看到通过这个内部接口可以进行sink类型的区分,如果该sink类型可以成功转换成viewSetter接口,则说明其支持直接用byteView进行内容填充,则可以避免写入和读出的两次内存拷贝。Smart!
这里提醒下golang 初学者,使用sink的地方,比如groupcache.go中的Getter接口,其方法
Get(ctx Context, key string, dest Sink),初学者会好奇,为啥sink传入的不是指针,那么如何能在函数作用域内修改其值而传递出来,其实Sink是interface,interface就包含了两个指针,
一个对象指针,一个运行时类型指针,参考https://stackoverflow.com/questions/44370277/type-is-pointer-to-interface-not-interface-confusion
所以传入的对象是可以被修改的,而且,golang 中是不提倡使用 *interface类型的
lru.go:
实现了lru策略的缓存,注意,没有锁机制,所以不是线程安全 的,需要根据应用场景,在调用层进行并发读写控制, 本项目中应该是考虑缓存的应用场景不涉及key对应的value的update,而key和value作为整体放在entry
类型中,最坏的情况是cache miss, 所以没有用锁,但值得思考的是,list没有加锁的情况下进行移动和节点增删,会导致怎样的问题, TODO 需要分析汇编码,maybe crash
1 2 3 4 5 6 7 8 9 |
type Cache struct { // MaxEntries is the maximum number of cache entries before an item is evicted. Zero means no limit. MaxEntries int // OnEvicted optionally specifies a callback function to be executed when an entry is purged from the cache. OnEvicted func(key Key, value interface{}) ll *list.List cache map[interface{}]*list.Element } |
其他的可以简单过一下了:
singleflight.go 核心两个结构体,其用Group管理着同一个group中遇到的对相同key的访问,该访问存于Group.m中,用mu加锁保护读写,当同时有并发的请求,Group.m中会查询到该key, 于是不执行请求,进入
call.wg.Wait(), 直到最先到达的那个完成call.wg.Done(),如此减少了并发key请求下对peer网络的压力。
1 2 3 4 5 6 7 8 9 10 |
type call struct { wg sync.WaitGroup val interface{} err error } type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized } |
groupcachepb就不说了,参见protobuf相关的内容
最后就剩一个consistenthash.go,要理解这个当然要先去看下分布式一致性hash的原理,再看代码会轻松很多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
type Map struct { hash Hash //hash 函数,[]byte--->uint32的hash replicas int //虚拟节点数 keys []int // Sorted //排好序的所有groupname及其虚拟节点的hash值 hashMap map[int]string //保存所有节点的hash对groupname的反向映射 } //核心就两个函数,一个是节点以及虚拟节点的创建 func (m *Map) Add(keys ...string) { for _, key := range keys { for i := 0; i < m.replicas; i++ { hash := int(m.hash([]byte(strconv.Itoa(i) + key))) m.keys = append(m.keys, hash) m.hashMap[hash] = key } } sort.Ints(m.keys) } |
另外是Get函数将指定key映射到指定group的过程
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
重点是groupname的映射和key的映射是使用相同的hash算法。
参考:
https://studygolang.com/articles/6209
https://www.jianshu.com/p/f69f3a3a9a78
https://www.jianshu.com/p/fc39399a27f1