snapshot/git/gitdb/stream.go (72 lines of code) (raw):

package gitdb

import (
	"errors"
	"fmt"
	"strings"

	"github.com/twitter/scoot/common/stats"
	snap "github.com/twitter/scoot/snapshot"
	"github.com/twitter/scoot/snapshot/git/repo"
)

// A Stream is a sequence of GitCommitSnapshots that updates.
// The backend is a git refspec that can be fetched from a Git remote.

// StreamConfig describes the single stream a streamBackend follows.
type StreamConfig struct {
	// Name is used in IDs, so it should be short,
	// e.g. sm for a Stream following Source (repo)'s Master (branch).
	// A stream name found in an ID may additionally carry a ref string that
	// overrides the refspec configured for 'sm'; the full ID might look like
	// "stream-gc-sm:<branch>-<sha>" for a given branch and sha.
	Name string

	// Remote to fetch from (e.g. https://github.com/twitter/scoot).
	Remote string

	// RefSpec is the name of the ref to follow in the data repo
	// (e.g. refs/remotes/upstream/master).
	RefSpec string
}

const (
	// streamIDText is the leading token of every stream snapshot ID.
	streamIDText = "stream"
	// streamIDFmt lays out an ID as <streamIDText>-<kind>-<streamName>-<sha>.
	streamIDFmt = "%s-%s-%s-%s"
	// streamNameShaSuffix is not referenced in this file.
	// NOTE(review): presumably consumed elsewhere in the package; confirm before removing.
	streamNameShaSuffix = ":sha"
)

// streamBackend resolves and refreshes snapshots that live in a configured stream.
type streamBackend struct {
	cfg  *StreamConfig
	stat stats.StatsReceiver
}

// parseID builds a streamSnapshot from the tokens that follow the backend and
// kind tokens of a snapshot ID.
//
// extraParts must hold at least two tokens: the last one is the sha and
// everything before it is re-joined with '-' to form the stream name (which
// may therefore itself contain '-'). id is only used for error reporting.
//
// It returns an error if the backend has no config, if there are too few
// tokens, or if validSha rejects the sha.
func (b *streamBackend) parseID(id snap.ID, kind SnapshotKind, extraParts []string) (*streamSnapshot, error) {
	if b.cfg == nil {
		return nil, errors.New("Stream backend not initialized.")
	}
	if len(extraParts) < 2 {
		return nil, fmt.Errorf("cannot parse snapshot ID: expected at least 2 extraParts in stream id: %s", id)
	}

	// The last token is the sha, anything before that is a stream name that may include '-'.
	last := len(extraParts) - 1
	streamName := strings.Join(extraParts[:last], "-")
	sha := extraParts[last]
	if err := validSha(sha); err != nil {
		return nil, err
	}

	return &streamSnapshot{streamName: streamName, kind: kind, sha: sha}, nil
}

// streamSnapshot represents a Snapshot that lives in a Stream.
type streamSnapshot struct {
	sha        string
	kind       SnapshotKind
	streamName string
}

// ID renders the snapshot as "stream-<kind>-<streamName>-<sha>".
func (s *streamSnapshot) ID() snap.ID {
	return snap.ID(fmt.Sprintf(streamIDFmt, streamIDText, s.kind, s.streamName, s.sha))
}

// Kind returns the snapshot kind recorded when the ID was parsed.
func (s *streamSnapshot) Kind() SnapshotKind { return s.kind }

// SHA returns the sha this snapshot points at.
func (s *streamSnapshot) SHA() string { return s.sha }

// Download makes sure the snapshot's sha is available in db.
//
// If db.shaPresent already reports success nothing is fetched. Otherwise the
// snapshot's stream is updated once and the result of a second presence check
// is returned. It fails when db has no stream backend configured.
func (s *streamSnapshot) Download(db *DB) error {
	if err := db.shaPresent(s.SHA()); err == nil {
		// Already present!
		return nil
	}

	// TODO(dbentley): what if we've already fetched recently? We should figure out some way to
	// prevent that

	if db.stream == nil {
		return fmt.Errorf("cannot download snapshot %s: no streams configured", s.ID())
	}

	if err := db.stream.updateStream(s.streamName, db); err != nil {
		return err
	}

	return db.shaPresent(s.SHA())
}

// DownloadTempRepo is not supported for stream snapshots and always returns an error.
func (s *streamSnapshot) DownloadTempRepo(_ *DB) (*repo.Repository, error) {
	return nil, errors.New("DownloadTempRepo unimplemented in streamSnapshot")
}

// updateStream updates the named stream by running "fetch" against the
// configured remote in db's data repo.
//
// The stream name is used to make sure we're operating on the right remote:
// it must start with the configured stream name. When the name has the form
// "<configured name>:<ref>", <ref> is passed to fetch as an extra argument so
// that it overrides the default refspec; otherwise fetch runs with the remote
// only.
//
// It returns an error if the backend has no config, if the name does not match
// the configured stream, if the ref override could be mistaken for a
// command-line option, or if the fetch itself fails.
func (b *streamBackend) updateStream(name string, db *DB) error {
	// Mirror parseID: report an unconfigured backend instead of panicking on b.cfg.
	if b.cfg == nil {
		return errors.New("Stream backend not initialized.")
	}

	b.stat.Counter(stats.GitStreamUpdateFetches).Inc(1)

	// NOTE(review): this is a plain prefix match, so a name such as "<Name>x"
	// is accepted and fetched with the default refspec — confirm that is intended.
	if !strings.HasPrefix(name, b.cfg.Name) {
		return fmt.Errorf("cannot update stream %s: does not match stream %s", name, b.cfg.Name)
	}

	args := []string{"fetch", b.cfg.Remote}
	refPrefix := b.cfg.Name + ":"
	if strings.HasPrefix(name, refPrefix) {
		// If the stream name includes a ref then fetch will override the default refspec.
		ref := strings.TrimPrefix(name, refPrefix)
		// The ref originates from a snapshot ID; never let it be parsed as an
		// option (e.g. --upload-pack=...) by the fetch command.
		if strings.HasPrefix(ref, "-") {
			return fmt.Errorf("cannot update stream %s: ref %q looks like a command-line option", name, ref)
		}
		args = append(args, ref)
	}

	_, err := db.dataRepo.Run(args...)
	return err
}