snapshot/git/gitdb/db.go

package gitdb

import (
    "fmt"
    "sync"
    "time"

    "github.com/twitter/scoot/common/errors"
    "github.com/twitter/scoot/common/stats"
    snap "github.com/twitter/scoot/snapshot"
    "github.com/twitter/scoot/snapshot/git/repo"
    "github.com/twitter/scoot/snapshot/store"

    log "github.com/sirupsen/logrus"
)

// TODO: The interfaces and functionality here should be refactored to only use git when necessary.
// Many operations relying on git are less efficient than they need to be:
// we could be using memory buffers/streams instead of using git cmds on disk to transform data.

// MakeDBFromRepo makes a gitdb.DB that uses dataRepo for data and tmp for temporary directories.
func MakeDBFromRepo(
    dataRepo *repo.Repository,
    updater RepoUpdater,
    tmp string,
    stream *StreamConfig,
    tags *TagsConfig,
    bundles *BundlestoreConfig,
    autoUploadDest AutoUploadDest,
    stat stats.StatsReceiver) *DB {
    return makeDB(dataRepo, nil, updater, tmp, stream, tags, bundles, autoUploadDest, stat)
}

// MakeDBNewRepo makes a gitdb.DB that uses a new repo, populated by initer.
func MakeDBNewRepo(
    initer RepoIniter,
    updater RepoUpdater,
    tmp string,
    stream *StreamConfig,
    tags *TagsConfig,
    bundles *BundlestoreConfig,
    autoUploadDest AutoUploadDest,
    stat stats.StatsReceiver) *DB {
    return makeDB(nil, initer, updater, tmp, stream, tags, bundles, autoUploadDest, stat)
}

func makeDB(
    dataRepo *repo.Repository,
    initer RepoIniter,
    updater RepoUpdater,
    tmp string,
    stream *StreamConfig,
    tags *TagsConfig,
    bundles *BundlestoreConfig,
    autoUploadDest AutoUploadDest,
    stat stats.StatsReceiver) *DB {
    if (dataRepo == nil) == (initer == nil) {
        panic(fmt.Errorf("exactly one of dataRepo and initer must be non-nil in call to makeDB: %v %v", dataRepo, initer))
    }
    result := &DB{
        initDoneCh: make(chan error),
        InitDoneCh: make(chan error, 1),
        reqCh:      make(chan req),
        dataRepo:   dataRepo,
        updater:    updater,
        tmp:        tmp,
        checkouts:  make(map[string]bool),
        local:      &localBackend{},
        stream:     &streamBackend{cfg: stream, stat: stat},
        tags:       &tagsBackend{cfg: tags},
        bundles:    &bundlestoreBackend{cfg: bundles},
        stat:       stat,
    }

    switch autoUploadDest {
    case AutoUploadNone:
    case AutoUploadTags:
        result.autoUpload = result.tags
    case AutoUploadBundlestore:
        result.autoUpload = result.bundles
    default:
        panic(fmt.Errorf("unknown GitDB AutoUpload destination: %v", autoUploadDest))
    }

    go result.loop(initer)
    return result
}

// RepoIniter is the interface for defining initialization that results in a valid repo.Repository.
type RepoIniter interface {
    Init() (*repo.Repository, error)
}

// RepoUpdater is the interface for defining update behavior for an underlying repo.Repository.
// It provides an update mechanism and an update interval.
type RepoUpdater interface {
    Update(r *repo.Repository) error
    UpdateInterval() time.Duration
}
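// Example (illustrative sketch, not part of the original file): minimal RepoIniter and
// RepoUpdater implementations. The repo constructor and the 10-minute interval are
// assumptions for illustration only; any implementation that returns a valid
// *repo.Repository satisfies the interfaces above.
//
//    type exampleIniter struct{ dir string }
//
//    func (i exampleIniter) Init() (*repo.Repository, error) {
//        return openOrCreateRepo(i.dir) // hypothetical helper; substitute your own repo setup
//    }
//
//    type exampleUpdater struct{}
//
//    func (u exampleUpdater) Update(r *repo.Repository) error      { return nil } // no-op update
//    func (u exampleUpdater) UpdateInterval() time.Duration        { return 10 * time.Minute }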
// DB stores its data in a Git Repo
type DB struct {
    // DB uses a goroutine to serve requests, with requests of type req
    reqCh chan req

    // Our init can fail, and if it did, err will be non-nil, so before sending
    // to reqCh, read from initDoneCh (which will be closed after initialization is done)
    // and test if err is non-nil.
    // Using both an internal initDoneCh and a public InitDoneCh for external consumption.
    initDoneCh chan error
    InitDoneCh chan error
    err        error

    // must hold workTreeLock before sending a checkoutReq to reqCh
    workTreeLock sync.Mutex

    // All data below here should be accessed only by the loop() goroutine
    dataRepo   *repo.Repository
    updater    RepoUpdater
    tmp        string          // path of a preexisting persistent directory used for temporary data
    checkouts  map[string]bool // checkouts stores bare checkouts, but not the git worktree
    local      *localBackend
    stream     *streamBackend
    tags       *tagsBackend
    bundles    *bundlestoreBackend
    autoUpload uploader // This is one of our backends that we use to upload automatically
    stat       stats.StatsReceiver
}

// req is a request interface
type req interface {
    req()
}

// stringAndError contains a string field, where information such as a snapshot ID, path, or
// file contents can be stored, in addition to an err field which can carry both a standard
// error and an exit code.
type stringAndError struct {
    str string
    err error
}

// Close stops the DB
func (db *DB) Close() {
    close(db.reqCh)
}

// initialize our repo (if necessary)
func (db *DB) init(initer RepoIniter) {
    defer close(db.initDoneCh)
    defer close(db.InitDoneCh)
    if initer != nil {
        db.dataRepo, db.err = initer.Init()
        db.InitDoneCh <- db.err
    }
}

// Update our repo with the underlying RepoUpdater, if provided
func (db *DB) updateRepo() error {
    if db.updater == nil {
        return nil
    }
    return db.updater.Update(db.dataRepo)
}

// UpdateInterval returns the RepoUpdater's interval if an updater exists, or a zero duration otherwise.
func (db *DB) UpdateInterval() time.Duration {
    if db.updater == nil {
        return snap.NoDuration
    }
    return db.updater.UpdateInterval()
}

// loop serves requests serially, blocking only if there are multiple checkout requests.
// Only one checkout is allowed at a time in the underlying repo; everything else should be ok if concurrent.
func (db *DB) loop(initer RepoIniter) {
    if db.init(initer); db.err != nil {
        // we couldn't create our repo, so all operations will fail before
        // sending to reqCh, so we can stop serving
        return
    }

    // Handle checkout logic separately and block until each request completes
    checkoutCh := make(chan interface{})
    go func() {
        for req := range checkoutCh {
            switch req := req.(type) {
            case checkoutReq:
                path, err := db.checkout(req.id)
                req.resultCh <- stringAndError{str: path, err: err}
            case releaseCheckoutReq:
                req.resultCh <- db.releaseCheckout(req.path)
            }
        }
    }()

    // Handle all request types
    for db.reqCh != nil {
        req, ok := <-db.reqCh
        if !ok {
            db.reqCh = nil
            continue
        }
        switch req := req.(type) {
        case ingestReq:
            log.Debugf("processing ingestReq")
            go func() {
                s, err := db.ingestDir(req.dir)
                if err == nil && db.autoUpload != nil {
                    s, err = db.autoUpload.upload(s, db)
                }
                if err != nil {
                    req.resultCh <- idAndError{err: err}
                } else {
                    req.resultCh <- idAndError{id: s.ID()}
                }
            }()
        case ingestGitCommitReq:
            log.Debugf("processing ingestGitCommitReq")
            go func() {
                s, err := db.ingestGitCommit(req.ingestRepo, req.commitish)
                if err == nil && db.autoUpload != nil {
                    s, err = db.autoUpload.upload(s, db)
                }
                if err != nil {
                    req.resultCh <- idAndError{err: err}
                } else {
                    req.resultCh <- idAndError{id: s.ID()}
                }
            }()
        case ingestGitWorkingDirReq:
            log.Debugf("processing ingestGitWorkingDirReq")
            go func() {
                s, err := db.ingestGitWorkingDir(req.ingestRepo)
                if err == nil && db.autoUpload != nil {
                    s, err = db.autoUpload.upload(s, db)
                }
                if err != nil {
                    req.resultCh <- idAndError{err: err}
                } else {
                    req.resultCh <- idAndError{id: s.ID()}
                }
            }()
        case uploadFileReq:
            log.Debugf("processing uploadFileReq")
            go func() {
                s, err := db.bundles.uploadFile(req.filePath, req.ttl)
                req.resultCh <- stringAndError{str: s, err: err}
            }()
        case readFileAllReq:
            log.Debugf("processing readFileAllReq")
            go func() {
                data, err := db.readFileAll(req.id, req.path)
                req.resultCh <- stringAndError{str: data, err: err}
            }()
        case checkoutReq, releaseCheckoutReq:
            log.Debugf("processing checkoutReq, releaseCheckoutReq")
            go func() {
                checkoutCh <- req
            }()
        case exportGitCommitReq:
            log.Debugf("processing exportGitCommitReq")
            go func() {
                sha, err := db.exportGitCommit(req.id, req.exportRepo)
                req.resultCh <- stringAndError{str: sha, err: err}
            }()
        case updateRepoReq:
            log.Debugf("processing updateRepoReq")
            go func() {
                req.resultCh <- db.updateRepo()
            }()
        default:
            panic(fmt.Errorf("unknown reqtype: %T %v", req, req))
        }
    }
    close(checkoutCh)
}
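// The exported entry points below all follow the same request/response pattern against
// loop(): wait for init to finish, send a request carrying a fresh result channel, then
// block on the reply. Sketch for orientation only, with a hypothetical fooReq request type:
//
//    func (db *DB) Foo(arg string) (string, error) {
//        if <-db.initDoneCh; db.err != nil { // init failed; fail fast
//            return "", db.err
//        }
//        resultCh := make(chan stringAndError)
//        db.reqCh <- fooReq{arg: arg, resultCh: resultCh} // dispatched by loop() to a goroutine
//        result := <-resultCh
//        return result.str, result.err
//    }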
// Request entry points and request/result type defs

type ingestReq struct {
    dir      string
    resultCh chan idAndError
}

func (r ingestReq) req() {}

type idAndError struct {
    id  snap.ID
    err error
}

// IngestDir ingests a directory directly.
func (db *DB) IngestDir(dir string) (snap.ID, error) {
    if <-db.initDoneCh; db.err != nil {
        return "", db.err
    }
    resultCh := make(chan idAndError)
    db.reqCh <- ingestReq{dir: dir, resultCh: resultCh}
    result := <-resultCh
    return result.id, result.err
}

type ingestGitCommitReq struct {
    ingestRepo *repo.Repository
    commitish  string
    resultCh   chan idAndError
}

func (r ingestGitCommitReq) req() {}

// IngestGitCommit ingests the commit identified by commitish from ingestRepo
func (db *DB) IngestGitCommit(ingestRepo *repo.Repository, commitish string) (snap.ID, error) {
    if <-db.initDoneCh; db.err != nil {
        return "", db.err
    }
    resultCh := make(chan idAndError)
    db.reqCh <- ingestGitCommitReq{ingestRepo: ingestRepo, commitish: commitish, resultCh: resultCh}
    result := <-resultCh
    return result.id, result.err
}

type ingestGitWorkingDirReq struct {
    ingestRepo *repo.Repository
    resultCh   chan idAndError
}

func (r ingestGitWorkingDirReq) req() {}

// IngestGitWorkingDir ingests HEAD + working dir modifications from the ingestRepo.
func (db *DB) IngestGitWorkingDir(ingestRepo *repo.Repository) (snap.ID, error) {
    if <-db.initDoneCh; db.err != nil {
        return "", db.err
    }
    resultCh := make(chan idAndError)
    db.reqCh <- ingestGitWorkingDirReq{ingestRepo: ingestRepo, resultCh: resultCh}
    result := <-resultCh
    return result.id, result.err
}

type readFileAllReq struct {
    id       snap.ID
    path     string
    resultCh chan stringAndError
}

func (r readFileAllReq) req() {}

// ReadFileAll reads the contents of the file path in FSSnapshot ID, or errors
func (db *DB) ReadFileAll(id snap.ID, path string) ([]byte, error) {
    if <-db.initDoneCh; db.err != nil {
        return nil, errors.NewError(db.err, errors.DBInitFailureExitCode)
    }
    resultCh := make(chan stringAndError)
    db.reqCh <- readFileAllReq{id: id, path: path, resultCh: resultCh}
    result := <-resultCh
    return []byte(result.str), result.err
}

type checkoutReq struct {
    id       snap.ID
    resultCh chan stringAndError
}

func (r checkoutReq) req() {}
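// Added guidance (not in the original file): Checkout and ReleaseCheckout are meant to be
// paired. Checkout takes workTreeLock before sending its request, and ReleaseCheckout is what
// allows Scoot to reuse the checked-out path, so a typical caller looks like the sketch below
// (someID is a placeholder snapshot ID):
//
//    path, err := db.Checkout(someID)
//    if err != nil {
//        return err
//    }
//    defer db.ReleaseCheckout(path)
//    // read files under path; Scoot will not touch it until ReleaseCheckout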
// Checkout puts the snapshot identified by id in the local filesystem, returning
// the path where it lives or an error.
func (db *DB) Checkout(id snap.ID) (path string, err error) {
    log.Debugf("Checking out %s", id)
    if <-db.initDoneCh; db.err != nil {
        log.Error("Unable to init db")
        return "", errors.NewError(db.err, errors.DBInitFailureExitCode)
    }
    db.workTreeLock.Lock()
    resultCh := make(chan stringAndError)
    db.reqCh <- checkoutReq{id: id, resultCh: resultCh}
    result := <-resultCh
    return result.str, result.err
}

type releaseCheckoutReq struct {
    path     string
    resultCh chan error
}

func (r releaseCheckoutReq) req() {}

// ReleaseCheckout releases a path from a previous Checkout. This allows Scoot to reuse
// the path. Scoot will not touch path after Checkout until ReleaseCheckout.
func (db *DB) ReleaseCheckout(path string) error {
    if <-db.initDoneCh; db.err != nil {
        return db.err
    }
    resultCh := make(chan error)
    db.reqCh <- releaseCheckoutReq{path: path, resultCh: resultCh}
    return <-resultCh
}

type exportGitCommitReq struct {
    id         snap.ID
    exportRepo *repo.Repository
    resultCh   chan stringAndError
}

func (r exportGitCommitReq) req() {}

// ExportGitCommit applies a snapshot to a repository, returning the sha of
// the exported commit or an error.
func (db *DB) ExportGitCommit(id snap.ID, exportRepo *repo.Repository) (string, error) {
    if <-db.initDoneCh; db.err != nil {
        return "", errors.NewError(db.err, errors.DBInitFailureExitCode)
    }
    resultCh := make(chan stringAndError)
    db.reqCh <- exportGitCommitReq{id: id, exportRepo: exportRepo, resultCh: resultCh}
    result := <-resultCh
    return result.str, result.err
}

type updateRepoReq struct {
    resultCh chan error
}

func (r updateRepoReq) req() {}

// Update is used to trigger an underlying RepoUpdater to Update() the Repository.
// If db.updater is nil, this has no effect.
func (db *DB) Update() error {
    if <-db.initDoneCh; db.err != nil {
        return db.err
    }
    resultCh := make(chan error)
    db.reqCh <- updateRepoReq{resultCh: resultCh}
    result := <-resultCh
    return result
}

// Below functions are utils not part of DB interface

// IDForStreamCommitSHA gets a SnapshotID from a stream name and commit sha
func (db *DB) IDForStreamCommitSHA(streamName string, sha string) snap.ID {
    s := &streamSnapshot{sha: sha, kind: KindGitCommitSnapshot, streamName: streamName}
    return s.ID()
}

type uploadFileReq struct {
    filePath string
    ttl      *store.TTLValue
    resultCh chan stringAndError
}

func (r uploadFileReq) req() {}

// UploadFile does a manual write of a file (like an existing bundle) to the underlying store.
// Intended for HTTP-backed stores that implement bundlestore's TTL fields.
// The base of filePath will be used as the bundle name.
// Returns the location of the stored file.
func (db *DB) UploadFile(filePath string, ttl *store.TTLValue) (string, error) {
    if <-db.initDoneCh; db.err != nil {
        return "", errors.NewError(db.err, errors.DBInitFailureExitCode)
    }
    resultCh := make(chan stringAndError)
    db.reqCh <- uploadFileReq{filePath: filePath, ttl: ttl, resultCh: resultCh}
    result := <-resultCh
    return result.str, result.err
}

// StreamName returns the configured stream's name, or "" if no stream is configured.
func (db *DB) StreamName() string {
    if db.stream != nil && db.stream.cfg != nil {
        return db.stream.cfg.Name
    }
    return ""
}

// Cancel is unimplemented.
func (db *DB) Cancel() error {
    return nil
}
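// Example (illustrative sketch, not part of the original file): typical end-to-end use of DB.
// exampleIniter/exampleUpdater are the hypothetical implementations sketched earlier, the
// paths are placeholders, stat is any stats.StatsReceiver, and error handling is elided.
//
//    db := MakeDBNewRepo(exampleIniter{dir: "/tmp/gitdb-data"}, exampleUpdater{}, "/tmp/gitdb-tmp",
//        &StreamConfig{}, nil, nil, AutoUploadNone, stat)
//    defer db.Close()
//    if err := <-db.InitDoneCh; err != nil {
//        return err
//    }
//    id, _ := db.IngestDir("/path/to/inputs")
//    path, _ := db.Checkout(id)
//    defer db.ReleaseCheckout(path)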