in snapshot/store/groupcache_store.go [37:102]
// MakeGroupcacheStore wraps underlying in a groupcache-backed Store.
//
// Cache misses are delegated to underlying: reads go through OpenForRead,
// existence checks through Exists, and writes through Write. Every delegated
// call is counted and timed on stat, scoped under "bundlestoreCache".
//
// Parameters:
//   - underlying: the Store consulted when the cache cannot answer.
//   - cfg: groupcache settings (Name, Memory_bytes, Endpoint, AddrSelf,
//     NodeReqCh are read here); must be non-nil.
//   - ttlc: supplies the TTLKey attached to written resources; must be non-nil.
//   - stat: receiver for counters/latencies emitted by the cache callbacks.
//
// Returns the caching Store, the groupcache HTTP pool as an http.Handler
// (the caller is presumably expected to mount it at cfg.Endpoint so peers can
// reach this node — confirm against callers), and an error if cfg or ttlc is
// nil.
//
// Side effects: starts an uptime-reporting goroutine and a goroutine running
// loop(cfg.NodeReqCh, pool, cache, stat); neither is stopped by this function.
func MakeGroupcacheStore(underlying Store, cfg *GroupcacheConfig, ttlc *TTLConfig, stat stats.StatsReceiver) (Store, http.Handler, error) {
	if ttlc == nil {
		return nil, nil, fmt.Errorf("MakeGroupcacheStore requires a non-nil TTLConfig")
	}
	// Validate cfg up front, before any goroutine is started, so a nil config
	// yields an error rather than a nil-pointer panic on cfg.Name below.
	if cfg == nil {
		return nil, nil, fmt.Errorf("MakeGroupcacheStore requires a non-nil GroupcacheConfig")
	}
	stat = stat.Scope("bundlestoreCache")
	go stats.StartUptimeReporting(stat, stats.BundlestoreUptime_ms, "", stats.DefaultStartupGaugeSpikeLen)

	// Create the cache which knows how to retrieve the underlying bundle data.
	var cache = groupcache.NewGroup(
		cfg.Name,
		cfg.Memory_bytes,
		// Getter: on a cache miss, read the whole bundle from the underlying
		// store into dest and report its TTL (nil when the resource has none).
		groupcache.GetterFunc(func(ctx groupcache.Context, bundleName string, dest groupcache.Sink) (*time.Time, error) {
			log.Info("Not cached, try to fetch bundle and populate cache: ", bundleName)
			stat.Counter(stats.GroupcacheReadUnderlyingCounter).Inc(1)
			defer stat.Latency(stats.GroupcacheReadUnderlyingLatency_ms).Time().Stop()

			resource, err := underlying.OpenForRead(bundleName)
			if err != nil {
				return nil, err
			}
			defer resource.Close()

			data, err := ioutil.ReadAll(resource)
			if err != nil {
				return nil, err
			}
			var ttl *time.Time
			if resource.TTLValue != nil {
				ttl = &resource.TTLValue.TTL
			}
			return ttl, dest.SetBytes(data)
		}),
		// Container: on a cache miss, ask the underlying store whether the
		// bundle exists.
		groupcache.ContainerFunc(func(ctx groupcache.Context, bundleName string) (bool, error) {
			log.Info("Not cached, try to check bundle existence: ", bundleName)
			stat.Counter(stats.GroupcacheExistUnderlyingCounter).Inc(1)
			defer stat.Latency(stats.GroupcacheExistUnderlyingLatency_ms).Time().Stop()

			ok, err := underlying.Exists(bundleName)
			if err != nil {
				return false, err
			}
			return ok, nil
		}),
		// Putter: write a new bundle through to the underlying store.
		groupcache.PutterFunc(func(ctx groupcache.Context, bundleName string, data []byte, ttl *time.Time) error {
			log.Info("New bundle, write and populate cache: ", bundleName)
			stat.Counter(stats.GroupcacheWriteUnderlyingCounter).Inc(1)
			defer stat.Latency(stats.GroupcacheWriteUnderlyingLatency_ms).Time().Stop()

			// ttl may be nil (the getter above itself returns nil when a
			// resource carries no TTLValue), so only dereference it when set.
			// A nil TTLValue is passed through otherwise.
			// NOTE(review): assumes underlying.Write accepts a resource with a
			// nil TTLValue, as the getter's nil check suggests — confirm.
			var ttlv *TTLValue
			if ttl != nil {
				ttlv = &TTLValue{TTL: *ttl, TTLKey: ttlc.TTLKey}
			}
			buf := ioutil.NopCloser(bytes.NewReader(data))
			r := NewResource(buf, int64(len(data)), ttlv)
			return underlying.Write(bundleName, r)
		}),
	)

	// Create and initialize peer group.
	// The HTTPPool constructor will register as a global PeerPicker on our behalf.
	poolOpts := &groupcache.HTTPPoolOptions{BasePath: cfg.Endpoint}
	pool := groupcache.NewHTTPPoolOpts("http://"+cfg.AddrSelf, poolOpts)
	go loop(cfg.NodeReqCh, pool, cache, stat)

	return &groupcacheStore{underlying: underlying, cache: cache, stat: stat, ttlConfig: ttlc}, pool, nil
}