snapshot/bundlestore/http_server.go (130 lines of code) (raw):

package bundlestore import ( "fmt" "io" "net/http" "regexp" "strings" "time" log "github.com/sirupsen/logrus" "github.com/twitter/scoot/common/stats" "github.com/twitter/scoot/snapshot/store" ) type httpServer struct { storeConfig *store.StoreConfig } func MakeHTTPServer(cfg *store.StoreConfig) *httpServer { return &httpServer{cfg} } func (s *httpServer) HandleUpload(w http.ResponseWriter, req *http.Request) { log.Infof("Uploading %v, %v, %v (from %v)", req.Host, req.URL, req.Header, req.RemoteAddr) defer s.storeConfig.Stat.Latency(stats.BundlestoreUploadLatency_ms).Time().Stop() s.storeConfig.Stat.Counter(stats.BundlestoreUploadCounter).Inc(1) bundleName := strings.TrimPrefix(req.URL.Path, "/bundle/") if err := checkBundleName(bundleName); err != nil { log.Infof("Bundlename err: %v --> StatusBadRequest (from %v)", err, req.RemoteAddr) http.Error(w, err.Error(), http.StatusBadRequest) s.storeConfig.Stat.Counter(stats.BundlestoreUploadErrCounter).Inc(1) return } bundleData := req.Body ok, err := s.storeConfig.Store.Exists(bundleName) if err != nil { log.Infof("Exists err: %v --> StatusInternalServerError (from %v)", err, req.RemoteAddr) http.Error(w, fmt.Sprintf("Error checking if bundle exists: %s", err), http.StatusInternalServerError) s.storeConfig.Stat.Counter(stats.BundlestoreUploadErrCounter).Inc(1) return } if ok { s.storeConfig.Stat.Counter(stats.BundlestoreUploadExistingCounter).Inc(1) fmt.Fprintf(w, "Bundle %s already exists, no-op and return\n", bundleName) return } // Get ttl if defaults were provided during Server construction or if it comes in this request header. var ttl *store.TTLValue if s.storeConfig.TTLCfg != nil { ttl = &store.TTLValue{TTL: time.Now().Add(s.storeConfig.TTLCfg.TTL), TTLKey: s.storeConfig.TTLCfg.TTLKey} } for k := range req.Header { if !strings.EqualFold(k, store.DefaultTTLKey) { continue } if ttlTime, err := time.Parse(time.RFC1123, req.Header.Get(k)); err != nil { log.Infof("TTL err: %v --> StatusInternalServerError (from %v)", err, req.RemoteAddr) http.Error(w, fmt.Sprintf("Error parsing TTL: %s", err), http.StatusInternalServerError) s.storeConfig.Stat.Counter(stats.BundlestoreUploadErrCounter).Inc(1) return } else if ttl != nil { ttl.TTL = ttlTime } else { ttl = &store.TTLValue{TTL: ttlTime, TTLKey: store.DefaultTTLKey} } break } if err := s.storeConfig.Store.Write(bundleName, store.NewResource(bundleData, req.ContentLength, ttl)); err != nil { log.Infof("Write err: %v --> StatusInternalServerError (from %v)", err, req.RemoteAddr) http.Error(w, fmt.Sprintf("Error writing Bundle: %s", err), http.StatusInternalServerError) s.storeConfig.Stat.Counter(stats.BundlestoreUploadErrCounter).Inc(1) return } fmt.Fprintf(w, "Successfully wrote bundle %s\n", bundleName) s.storeConfig.Stat.Counter(stats.BundlestoreUploadOkCounter).Inc(1) } func (s *httpServer) CheckExistence(w http.ResponseWriter, req *http.Request) { log.Infof("Checking Existence %v %v (from %v)", req.Host, req.URL, req.RemoteAddr) defer s.storeConfig.Stat.Latency(stats.BundlestoreCheckLatency_ms).Time().Stop() s.storeConfig.Stat.Counter(stats.BundlestoreCheckCounter).Inc(1) bundleName := strings.TrimPrefix(req.URL.Path, "/bundle/") if err := checkBundleName(bundleName); err != nil { log.Infof("Bundlename err: %v --> StatusBadRequest (from %v)", err, req.RemoteAddr) http.Error(w, err.Error(), http.StatusBadRequest) s.storeConfig.Stat.Counter(stats.BundlestoreCheckErrCounter).Inc(1) return } ok, err := s.storeConfig.Store.Exists(bundleName) if err != nil || !ok { log.Infof("Check err: %v --> StatusNotFound (from %v)", err, req.RemoteAddr) http.NotFound(w, req) s.storeConfig.Stat.Counter(stats.BundlestoreCheckErrCounter).Inc(1) return } w.Header().Set("Content-Type", "text/plain; charset=utf-8") s.storeConfig.Stat.Counter(stats.BundlestoreCheckOkCounter).Inc(1) } func (s *httpServer) HandleDownload(w http.ResponseWriter, req *http.Request) { log.Infof("Downloading %v %v (from %v)", req.Host, req.URL, req.RemoteAddr) defer s.storeConfig.Stat.Latency(stats.BundlestoreDownloadLatency_ms).Time().Stop() s.storeConfig.Stat.Counter(stats.BundlestoreDownloadCounter).Inc(1) bundleName := strings.TrimPrefix(req.URL.Path, "/bundle/") if err := checkBundleName(bundleName); err != nil { log.Infof("Bundlename err: %v --> StatusBadRequest (from %v)", err, req.RemoteAddr) http.Error(w, err.Error(), http.StatusBadRequest) //s.stat.Counter(stats.BundlestoreDownloadErrCounter).Inc(1) s.storeConfig.Stat.Counter(stats.BundlestoreDownloadErrCounter).Inc(1) return } r, err := s.storeConfig.Store.OpenForRead(bundleName) if err != nil { log.Infof("Read err: %v --> StatusNotFound (from %v)", err, req.RemoteAddr) http.NotFound(w, req) s.storeConfig.Stat.Counter(stats.BundlestoreDownloadErrCounter).Inc(1) return } if _, err := io.Copy(w, r); err != nil { log.Infof("Copy err: %v --> StatusInternalServerError (from %v)", err, req.RemoteAddr) s.storeConfig.Stat.Counter(stats.BundlestoreDownloadErrCounter).Inc(1) r.Close() return } if err := r.Close(); err != nil { log.Infof("Close err: %v --> StatusInternalServerError (from %v)", err, req.RemoteAddr) s.storeConfig.Stat.Counter(stats.BundlestoreDownloadErrCounter).Inc(1) return } s.storeConfig.Stat.Counter(stats.BundlestoreDownloadOkCounter).Inc(1) } var bundleRE *regexp.Regexp = regexp.MustCompile("^bs-[a-z0-9]{40}.bundle") // Check for name enforcement for HTTP API func checkBundleName(name string) error { if ok := bundleRE.MatchString(name); ok { return nil } return fmt.Errorf("Error with bundleName, expected %q, got: %s", bundleRE, name) }