func MakeGroupcacheStore()

in snapshot/store/groupcache_store.go [37:102]


func MakeGroupcacheStore(underlying Store, cfg *GroupcacheConfig, ttlc *TTLConfig, stat stats.StatsReceiver) (Store, http.Handler, error) {
	if ttlc == nil {
		return nil, nil, fmt.Errorf("MakeGroupcacheStore requires a non-nil TTLConfig")
	}
	stat = stat.Scope("bundlestoreCache")
	go stats.StartUptimeReporting(stat, stats.BundlestoreUptime_ms, "", stats.DefaultStartupGaugeSpikeLen)

	// Create the cache which knows how to retrieve the underlying bundle data.
	var cache = groupcache.NewGroup(
		cfg.Name,
		cfg.Memory_bytes,
		groupcache.GetterFunc(func(ctx groupcache.Context, bundleName string, dest groupcache.Sink) (*time.Time, error) {
			log.Info("Not cached, try to fetch bundle and populate cache: ", bundleName)
			stat.Counter(stats.GroupcacheReadUnderlyingCounter).Inc(1)
			defer stat.Latency(stats.GroupcacheReadUnderlyingLatency_ms).Time().Stop()

			resource, err := underlying.OpenForRead(bundleName)
			if err != nil {
				return nil, err
			}
			defer resource.Close()
			data, err := ioutil.ReadAll(resource)
			if err != nil {
				return nil, err
			}
			var ttl *time.Time
			if resource.TTLValue != nil {
				ttl = &resource.TTLValue.TTL
			}
			return ttl, dest.SetBytes(data)
		}),
		groupcache.ContainerFunc(func(ctx groupcache.Context, bundleName string) (bool, error) {
			log.Info("Not cached, try to check bundle existence: ", bundleName)
			stat.Counter(stats.GroupcacheExistUnderlyingCounter).Inc(1)
			defer stat.Latency(stats.GroupcacheExistUnderlyingLatency_ms).Time().Stop()

			ok, err := underlying.Exists(bundleName)
			if err != nil {
				return false, err
			}
			return ok, nil
		}),
		groupcache.PutterFunc(func(ctx groupcache.Context, bundleName string, data []byte, ttl *time.Time) error {
			log.Info("New bundle, write and populate cache: ", bundleName)
			stat.Counter(stats.GroupcacheWriteUnderlyingCounter).Inc(1)
			defer stat.Latency(stats.GroupcacheWriteUnderlyingLatency_ms).Time().Stop()

			ttlv := &TTLValue{TTL: *ttl, TTLKey: ttlc.TTLKey}
			buf := ioutil.NopCloser(bytes.NewReader(data))
			r := NewResource(buf, int64(len(data)), ttlv)
			err := underlying.Write(bundleName, r)
			if err != nil {
				return err
			}
			return nil
		}),
	)

	// Create and initialize peer group.
	// The HTTPPool constructor will register as a global PeerPicker on our behalf.
	poolOpts := &groupcache.HTTPPoolOptions{BasePath: cfg.Endpoint}
	pool := groupcache.NewHTTPPoolOpts("http://"+cfg.AddrSelf, poolOpts)
	go loop(cfg.NodeReqCh, pool, cache, stat)

	return &groupcacheStore{underlying: underlying, cache: cache, stat: stat, ttlConfig: ttlc}, pool, nil
}