snapshot/git/gitdb/bundlestore.go (215 lines of code) (raw):
package gitdb
import (
e "errors"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
"strings"
log "github.com/sirupsen/logrus"
"github.com/twitter/scoot/common/errors"
snap "github.com/twitter/scoot/snapshot"
"github.com/twitter/scoot/snapshot/git/repo"
"github.com/twitter/scoot/snapshot/store"
)
// BundlestoreConfig defines how to talk to Bundlestore
// AllowStreamUpdate controls whether bundlestore download failures can fall back on stream updates
type BundlestoreConfig struct {
Store store.Store
AllowStreamUpdate bool
}
type bundlestoreBackend struct {
cfg *BundlestoreConfig
}
const bundlestoreIDText = "bs"
// "bs-gc-<bundle>-<stream>-<sha>"
func (b *bundlestoreBackend) parseID(id snap.ID, kind SnapshotKind, extraParts []string) (snapshot, error) {
if b.cfg == nil {
return nil, e.New("Bundlestore backend not initialized.")
}
if len(extraParts) != 3 {
return nil, fmt.Errorf("cannot parse snapshot ID: expected 5 parts in bundlestore id: %s", id)
}
bundleKey, streamName, sha := extraParts[0], extraParts[1], extraParts[2]
if err := validSha(sha); err != nil {
return nil, err
}
return &bundlestoreSnapshot{kind: kind, sha: sha, bundleKey: bundleKey, streamName: streamName}, nil
}
func (b *bundlestoreBackend) upload(s snapshot, db *DB) (snapshot, error) {
// We only have to upload a localSnapshot
switch s := s.(type) {
case *tagsSnapshot:
return s, nil
case *streamSnapshot:
// TODO(dbentley): we should upload to bundlestore if this commit is so recent
// it might not be on every worker already.
return s, nil
case *bundlestoreSnapshot:
return s, nil
case *localSnapshot:
return b.uploadLocalSnapshot(s, db)
default:
return nil, fmt.Errorf("cannot upload %v: unknown type %T", s, s)
}
}
// git bundle create takes a rev list; it requires that it include a ref
// so we can't just do:
// git bundle create 545c88d71d40a49ebdfb1d268c724110793330d2..3060a3a519888957e13df75ffd09ea50f97dd03b
// instead, we have to write a temporary ref
// (nb: the subtracted revisions can be a commit, not a ref, so you can do:
// git bundle create 545c88d71d40a49ebdfb1d268c724110793330d2..master
// )
const bundlestoreTempRef = "reserved_scoot/bundlestore/__temp_for_writing"
func (b *bundlestoreBackend) uploadLocalSnapshot(s *localSnapshot, db *DB) (sn snapshot, err error) {
// the sha of the commit we're going to use as the ref
commitSha := s.sha
// the revList to create the bundle
// unless we can find a merge base, we'll just include the commit
revList := bundlestoreTempRef
// the name of the stream that this bundle requires
streamName := ""
switch s.kind {
case KindGitCommitSnapshot:
// For a git commit, we want a bundle that has just the diff compared to the stream
// so find the merge base with our stream
// The generated bundle will require either no prereqs or a commit that is in the stream
if db.stream.cfg != nil && db.stream.cfg.RefSpec != "" {
streamHead, err := db.dataRepo.RunSha("rev-parse", db.stream.cfg.RefSpec)
if err != nil {
return nil, err
}
mergeBase, err := db.dataRepo.RunSha("merge-base", streamHead, commitSha)
if mergeBase == commitSha {
// we were asked to ingest a sha that's on the stream,
// e.g., the user just created a branch and just wants to test
// the baseline, and hasn't modified anything yet.
// so we don't have to upload it, just return that snapshot
// if we don't do this, then our git bundle create will die
// because the bundle would be empty.
return &streamSnapshot{sha: commitSha, kind: KindGitCommitSnapshot, streamName: db.stream.cfg.Name}, nil
}
// if err != nil, it just means we don't have a merge-base
if err == nil {
revList = fmt.Sprintf("%s..%s", mergeBase, bundlestoreTempRef)
streamName = db.stream.cfg.Name
}
}
case KindFSSnapshot:
// For an FSSnapshot (which is stored as a git tree), create a git commit
// with no parent.
// (Eventually we could get smarter, e.g., if it's storing the output of running
// cmd foo, we could try to find another run of foo and use that as a parent
// to reduce the delta)
// The generated bundle will require no prereqs.
commitSha, err = db.dataRepo.RunSha("commit-tree", commitSha, "-m",
"commit to distribute GitDB FSSnapshot via bundlestore")
default:
return nil, fmt.Errorf("unknown Snapshot kind: %v", s.kind)
}
// update the ref
if _, err := db.dataRepo.Run("update-ref", bundlestoreTempRef, commitSha); err != nil {
return nil, err
}
d, err := ioutil.TempDir(db.tmp, "bundle-")
if err != nil {
return nil, err
}
bundleName := makeBundleName(s.sha)
// we can't use tmpDir.TempFile() because we need the file to not exist
bundleFilename := path.Join(d, bundleName)
// create the bundle
// -c core.packobjectedgesonlyshallow=0 is because our internal git
// has a bug that shows up as:
// $ git bundle create /tmp/mybundle.pack HEAD^..HEAD
// fatal: expected sha1, got garbage:
// ^a7c35c3e44ca591d7dd98860ce49601dbc20a22c
//
// error: pack-objects died
// so we pass it, but hope to remove it once the bug is fixed
if _, err := db.dataRepo.Run("-c", "core.packobjectedgesonlyshallow=0", "bundle", "create", bundleFilename, revList); err != nil {
return nil, err
}
_, err = b.uploadFile(bundleFilename, nil)
if err != nil {
return nil, err
}
// For now, our bundle key is always the sha of the object we are uploading.
// We might eventually want to upload multiple objects in one bundle. E.g.,
// for code review you might want to have both before and after snapshots. In that case,
// we could upload once and return two IDs that have the same bundleKey but different
// sha's.
return &bundlestoreSnapshot{sha: s.sha, kind: s.Kind(), bundleKey: s.sha, streamName: streamName}, nil
}
func CreateBundlestoreSnapshot(sha string, kind SnapshotKind, bundleKey, streamName string) snapshot {
return &bundlestoreSnapshot{sha: sha, kind: kind, bundleKey: bundleKey, streamName: streamName}
}
type bundlestoreSnapshot struct {
sha string
kind SnapshotKind
// the (shortened) key to use in our key value store
// we actually store it as a name with a bit more around it,
// e.g., bundleKey is the sha, and then the name we send on the wire
// (generated by makeBundleName) is bs-<sha>.bundle
bundleKey string
streamName string
}
func (s *bundlestoreSnapshot) ID() snap.ID {
return snap.ID(strings.Join([]string{bundlestoreIDText, string(s.kind), s.bundleKey, s.streamName, s.sha}, "-"))
}
func (s *bundlestoreSnapshot) Kind() SnapshotKind { return s.kind }
func (s *bundlestoreSnapshot) SHA() string { return s.sha }
// Update's db's repo with this bundlestoreSnapshot's SHA.
// The snapshot must be a git 'bundle' file.
// Do this by getting the git bundle file from an underlying Store and
// unbundling it into the dataRepo. In cases where db.bundles.cfg.AllowStreamUpdate
// is true, will attempt to update the db's stream if initial unbundle attempt fails.
// Returns nil if the SHA ended up in the repo, or an error.
func (s *bundlestoreSnapshot) Download(db *DB) error {
log.Infof("Downloading sha: %s", s.SHA())
if err := db.shaPresent(s.SHA()); err == nil {
log.Infof("We already have sha: %s, returning from Download()", s.SHA())
return nil
}
dlDir, filename, err := s.downloadBundle(db)
if dlDir != "" {
defer os.RemoveAll(dlDir)
}
if err != nil {
log.Info("Unable to download bundle: ", err)
return err
}
// unbundle optimistically
// this will succeed if we have all of the prerequisite objects
if _, err = db.dataRepo.Run("bundle", "unbundle", filename); err == nil {
log.Infof("Unbundling got the sha: %s, returning from Download()", s.SHA())
return db.shaPresent(s.sha)
}
// we couldn't unbundle
// see if it's because we're missing prereqs
lacksCommitsStr := "error: Repository lacks these prerequisite commits:"
exitError, ok := err.(*exec.ExitError)
if ok && exitError == nil || ok && !strings.Contains(string(exitError.Stderr), lacksCommitsStr) {
log.Info("Can't find sha: ", s.SHA(), " and prereqs aren't the problem, returning err: ", err.Error())
return err
} else if !ok {
log.Info("Error of unexpected type while unbundling, returning err:", err.Error())
return err
}
if db.bundles.cfg.AllowStreamUpdate {
// we are missing prereqs, so let's try updating the stream that's the basis of the bundle
// this likely happened because:
// we're in a worker that started at time T1, when master pointed at commit C1
// at time T2, a commit C2 was created in our stream
// at time T3, a user ingested a git commit C3 whose ancestor is C2
// GitDB in their scoot-snapshot-db picked a merge-base of C2, because T3-T2 was sufficiently
// large (say, a half hour) that it's reasonable to assume its easy to get.
// Now we've got the bundle for C3, which depends on C2, but we only have C1, so we have to
// update our stream.
if err := db.stream.updateStream(s.streamName, db); err != nil {
log.Infof("Couldn't download sha: %s, updateStream returned error: %s", s.SHA(), err.Error())
return err
}
if _, err := db.dataRepo.Run("bundle", "unbundle", filename); err != nil {
// if we still can't unbundle, then the bundle might be corrupt or the
// prereqs might not be in the stream, or maybe the git server is serving us
// stale data.
log.Infof("Couldn't download sha: %s, the final unbundling attempt returned error: %s", s.SHA(), err.Error())
return err
}
}
return db.shaPresent(s.sha)
}
// TODO separate the use cases that need git/repo/stream semantics from things that can be passed
// around as binary bundles, and simplify the use cases accordingly.
// Downloads the snapshot's SHA locally similar to Download, but into a
// temp repository located under tmp and not db's persistent dataRepo.
// Returns the temp repo and nil if the sha can be found in it, or an error.
func (s *bundlestoreSnapshot) DownloadTempRepo(db *DB) (*repo.Repository, error) {
log.Infof("Downloading sha: %s", s.SHA())
tmpRepoIniter := &TmpRepoIniter{tmp: db.tmp}
tmpRepo, err := tmpRepoIniter.Init()
if err != nil {
return nil, err
}
dlDir, filename, err := s.downloadBundle(db)
if dlDir != "" {
defer os.RemoveAll(dlDir)
}
if err != nil {
log.Info("Unable to download bundle: ", err)
return tmpRepo, err
}
if _, err = tmpRepo.Run("bundle", "unbundle", filename); err != nil {
return tmpRepo, err
}
log.Infof("Unbundling got the sha: %s, returning from DownloadTempRepo()", s.SHA())
return tmpRepo, tmpRepo.ShaPresent(s.sha)
}
// Fetch a bundle file via the underlying Store configured in the DB's bundlestore config
// Downloads into a temp dir, the path of which is included in the return (for cleanup purposes)
func (s *bundlestoreSnapshot) downloadBundle(db *DB) (string, string, error) {
d, err := ioutil.TempDir(db.tmp, "bundle-")
if err != nil {
return "", "", err
}
bundleName := makeBundleName(s.bundleKey)
bundleFilename := path.Join(d, bundleName)
f, err := os.Create(bundleFilename)
if err != nil {
return d, "", err
}
defer f.Close()
r, err := db.bundles.cfg.Store.OpenForRead(bundleName)
if err != nil {
return d, "", err
}
defer r.Close()
if _, err := io.Copy(f, r); err != nil {
return d, "", err
}
return d, f.Name(), nil
}
func makeBundleName(key string) string {
return fmt.Sprintf("bs-%s.bundle", key)
}
func (b *bundlestoreBackend) uploadFile(filePath string, ttl *store.TTLValue) (string, error) {
f, err := os.Open(filePath)
if err != nil {
return "", errors.NewError(err, errors.BundleUploadFailureExitCode)
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return "", err
}
name := path.Base(filePath)
if name == "." || name == "/" {
return "", errors.NewError(fmt.Errorf("Invalid path %v, base parsed to %v", filePath, name), errors.BundleUploadFailureExitCode)
}
resource := store.NewResource(f, fi.Size(), ttl)
if err := b.cfg.Store.Write(name, resource); err != nil {
return "", errors.NewError(err, errors.BundleUploadFailureExitCode)
}
return b.cfg.Store.Root() + name, nil
}