snapshot/store/groupcache_store.go (196 lines of code) (raw):

package store import ( "bytes" "fmt" "io/ioutil" "net/http" "time" log "github.com/sirupsen/logrus" "github.com/twitter/groupcache" "github.com/twitter/scoot/cloud/cluster" "github.com/twitter/scoot/common/stats" ) // Called periodically in a goroutine. Must include the current instance among the fetched nodes. type PeerFetcher interface { Fetch() ([]cluster.Node, error) } // TODO: we should consider extending contexts in groupcache lib further to: // 1) control hot cache population rate deterministically (currently hardcoded at 10%) // 2) add more advanced caching behaviors like propagation to peer caches, // or populate cache and skip underlying store write, etc. // Note: Endpoint is concatenated with Name in groupcache internals, and AddrSelf is expected as HOST:PORT. type GroupcacheConfig struct { Name string Memory_bytes int64 AddrSelf string Endpoint string NodeReqCh cluster.NodeReqChType } // Add in-memory caching to the given store. func MakeGroupcacheStore(underlying Store, cfg *GroupcacheConfig, ttlc *TTLConfig, stat stats.StatsReceiver) (Store, http.Handler, error) { if ttlc == nil { return nil, nil, fmt.Errorf("MakeGroupcacheStore requires a non-nil TTLConfig") } stat = stat.Scope("bundlestoreCache") go stats.StartUptimeReporting(stat, stats.BundlestoreUptime_ms, "", stats.DefaultStartupGaugeSpikeLen) // Create the cache which knows how to retrieve the underlying bundle data. var cache = groupcache.NewGroup( cfg.Name, cfg.Memory_bytes, groupcache.GetterFunc(func(ctx groupcache.Context, bundleName string, dest groupcache.Sink) (*time.Time, error) { log.Info("Not cached, try to fetch bundle and populate cache: ", bundleName) stat.Counter(stats.GroupcacheReadUnderlyingCounter).Inc(1) defer stat.Latency(stats.GroupcacheReadUnderlyingLatency_ms).Time().Stop() resource, err := underlying.OpenForRead(bundleName) if err != nil { return nil, err } defer resource.Close() data, err := ioutil.ReadAll(resource) if err != nil { return nil, err } var ttl *time.Time if resource.TTLValue != nil { ttl = &resource.TTLValue.TTL } return ttl, dest.SetBytes(data) }), groupcache.ContainerFunc(func(ctx groupcache.Context, bundleName string) (bool, error) { log.Info("Not cached, try to check bundle existence: ", bundleName) stat.Counter(stats.GroupcacheExistUnderlyingCounter).Inc(1) defer stat.Latency(stats.GroupcacheExistUnderlyingLatency_ms).Time().Stop() ok, err := underlying.Exists(bundleName) if err != nil { return false, err } return ok, nil }), groupcache.PutterFunc(func(ctx groupcache.Context, bundleName string, data []byte, ttl *time.Time) error { log.Info("New bundle, write and populate cache: ", bundleName) stat.Counter(stats.GroupcacheWriteUnderlyingCounter).Inc(1) defer stat.Latency(stats.GroupcacheWriteUnderlyingLatency_ms).Time().Stop() ttlv := &TTLValue{TTL: *ttl, TTLKey: ttlc.TTLKey} buf := ioutil.NopCloser(bytes.NewReader(data)) r := NewResource(buf, int64(len(data)), ttlv) err := underlying.Write(bundleName, r) if err != nil { return err } return nil }), ) // Create and initialize peer group. // The HTTPPool constructor will register as a global PeerPicker on our behalf. poolOpts := &groupcache.HTTPPoolOptions{BasePath: cfg.Endpoint} pool := groupcache.NewHTTPPoolOpts("http://"+cfg.AddrSelf, poolOpts) go loop(cfg.NodeReqCh, pool, cache, stat) return &groupcacheStore{underlying: underlying, cache: cache, stat: stat, ttlConfig: ttlc}, pool, nil } // Convert 'host:port' node ids to the format expected by groupcache peering, http URLs. func toPeers(nodes []cluster.Node, stat stats.StatsReceiver) []string { peers := []string{} for _, node := range nodes { peers = append(peers, "http://"+string(node.Id())) } log.Info("New groupcacheStore peers: ", peers) stat.Counter(stats.GroupcachePeerDiscoveryCounter).Inc(1) stat.Gauge(stats.GroupcachePeerCountGauge).Update(int64(len(peers))) return peers } // Loop will listen for cluster updates and create a list of peer addresses to update groupcache. // Cluster is expected to include the current node. // Also updates cache stats, every 1s for now to account for arbitrary stat latch time. func loop(nodesReqCh cluster.NodeReqChType, pool *groupcache.HTTPPool, cache *groupcache.Group, stat stats.StatsReceiver) { nodesCh := make(chan []cluster.Node) statsTicker := time.NewTicker(1 * time.Second) for { nodesReqCh <- nodesCh // send a request for the nodes pool.Set(toPeers(<-nodesCh, stat)...) // record stats every 1s select { case <-statsTicker.C: updateCacheStats(cache, stat) default: } } } // Implements snapshot/store.Store interface type groupcacheStore struct { underlying Store cache *groupcache.Group stat stats.StatsReceiver ttlConfig *TTLConfig } func (s *groupcacheStore) OpenForRead(name string) (*Resource, error) { log.Info("Read() checking for cached bundle: ", name) defer s.stat.Latency(stats.GroupcacheReadLatency_ms).Time().Stop() s.stat.Counter(stats.GroupcacheReadCounter).Inc(1) var data []byte ttl, err := s.cache.Get(nil, name, groupcache.AllocatingByteSliceSink(&data)) if err != nil { return nil, err } var ttlv *TTLValue = nil if ttl != nil { ttlv = &TTLValue{TTL: *ttl, TTLKey: s.ttlConfig.TTLKey} } rc := ioutil.NopCloser(bytes.NewReader(data)) s.stat.Counter(stats.GroupcacheReadOkCounter).Inc(1) return NewResource(rc, int64(len(data)), ttlv), nil } func (s *groupcacheStore) Exists(name string) (bool, error) { log.Info("Exists() checking for cached bundle: ", name) defer s.stat.Latency(stats.GroupcachExistsLatency_ms).Time().Stop() s.stat.Counter(stats.GroupcacheExistsCounter).Inc(1) ok, err := s.cache.Contain(nil, name) if err != nil { return false, err } if !ok { return false, nil } s.stat.Counter(stats.GroupcacheExistsOkCounter).Inc(1) return true, nil } func (s *groupcacheStore) Write(name string, resource *Resource) error { defer s.stat.Latency(stats.GroupcacheWriteLatency_ms).Time().Stop() s.stat.Counter(stats.GroupcacheWriteCounter).Inc(1) if resource == nil { log.Info("Writing nil resource is a no op.") return nil } // Read data into a []byte and make a right-sized copy, as ReadAll will reserve at 2x capacity b, err := ioutil.ReadAll(resource) if err != nil { return err } c := make([]byte, len(b)) copy(c, b) var ttl *time.Time if resource.TTLValue != nil { ttl = &resource.TTLValue.TTL } log.Infof("Write() bundle %s: length: %d ttl: %s", name, len(b), ttl) if err := s.cache.Put(nil, name, c, ttl); err != nil { return err } s.stat.Counter(stats.GroupcacheWriteOkCounter).Inc(1) return nil } func (s *groupcacheStore) Root() string { return s.underlying.Root() } // The groupcache lib updates its stats in the background - we need to convert those to our own stat representation. // Gauges are expected to fluctuate, counters are expected to only ever increase. func updateCacheStats(cache *groupcache.Group, stat stats.StatsReceiver) { stat.Gauge(stats.GroupcacheMainBytesGauge).Update(cache.CacheStats(groupcache.MainCache).Bytes) stat.Gauge(stats.GroupcacheMainItemsGauge).Update(cache.CacheStats(groupcache.MainCache).Items) stat.Counter(stats.GroupcacheMainGetsCounter).Update(cache.CacheStats(groupcache.MainCache).Gets) stat.Counter(stats.GroupcacheMainHitsCounter).Update(cache.CacheStats(groupcache.MainCache).Hits) stat.Counter(stats.GroupcacheMainEvictionsCounter).Update(cache.CacheStats(groupcache.MainCache).Evictions) stat.Gauge(stats.GroupcacheHotBytesGauge).Update(cache.CacheStats(groupcache.HotCache).Bytes) stat.Gauge(stats.GroupcacheHotItemsGauge).Update(cache.CacheStats(groupcache.HotCache).Items) stat.Counter(stats.GroupcacheHotGetsCounter).Update(cache.CacheStats(groupcache.HotCache).Gets) stat.Counter(stats.GroupcacheHotHitsCounter).Update(cache.CacheStats(groupcache.HotCache).Hits) stat.Counter(stats.GroupcacheHotEvictionsCounter).Update(cache.CacheStats(groupcache.HotCache).Evictions) stat.Counter(stats.GroupcacheGetCounter).Update(cache.Stats.Gets.Get()) stat.Counter(stats.GroupcacheContainCounter).Update(cache.Stats.Contains.Get()) stat.Counter(stats.GroupcachePutCounter).Update(cache.Stats.Puts.Get()) stat.Counter(stats.GroupcacheHitCounter).Update(cache.Stats.CacheHits.Get()) stat.Counter(stats.GroupcacheLoadCounter).Update(cache.Stats.Loads.Get()) stat.Counter(stats.GroupcacheCheckCounter).Update(cache.Stats.Checks.Get()) stat.Counter(stats.GroupcacheStoreCounter).Update(cache.Stats.Stores.Get()) stat.Counter(stats.GroupcachePeerGetsCounter).Update(cache.Stats.PeerLoads.Get()) stat.Counter(stats.GroupcachePeerChecksCounter).Update(cache.Stats.PeerChecks.Get()) stat.Counter(stats.GroupcachePeerPutsCounter).Update(cache.Stats.PeerStores.Get()) stat.Counter(stats.GroupcachPeerErrCounter).Update(cache.Stats.PeerErrors.Get()) stat.Counter(stats.GroupcacheLocalLoadCounter).Update(cache.Stats.LocalLoads.Get()) stat.Counter(stats.GroupcacheLocalLoadErrCounter).Update(cache.Stats.LocalLoadErrs.Get()) stat.Counter(stats.GroupcacheLocalCheckCounter).Update(cache.Stats.LocalChecks.Get()) stat.Counter(stats.GroupcacheLocalCheckErrCounter).Update(cache.Stats.LocalCheckErrs.Get()) stat.Counter(stats.GroupcacheLocalStoreCounter).Update(cache.Stats.LocalStores.Get()) stat.Counter(stats.GroupcacheLocalStoreErrCounter).Update(cache.Stats.LocalStoreErrs.Get()) stat.Counter(stats.GroupcacheIncomingRequestsCounter).Update(cache.Stats.ServerRequests.Get()) }