runner/runners/local_output.go (258 lines of code) (raw):
package runners
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"github.com/twitter/scoot/runner"
osexecer "github.com/twitter/scoot/runner/execer/os"
)
// local_output.go: output that's stored locally
// HttpOutputCreator is a runner.OutputCreator that is also an http.Handler, so
// the outputs it creates can be served over HTTP.
type HttpOutputCreator interface {
http.Handler
runner.OutputCreator
// HttpPath returns the URL path prefix that the handler strips from incoming
// request paths before looking up an output.
HttpPath() string
}
// localOutputCreator creates outputs as temp files on the local filesystem and
// optionally serves them over HTTP.
type localOutputCreator struct {
// tmp is the directory in which output files are created.
tmp string
// hostname is the local host name, used as the host part of 'file://' URIs.
hostname string
// httpUri is the base URI passed to the constructor; when empty, outputs get
// plain 'file://' URIs and nothing is registered in pathMap.
httpUri string
// httpPath is the path component of httpUri, normalised to end in "/".
// It is empty when httpUri is empty.
httpPath string
// pathMap maps both an output's id (surrounding slashes trimmed) and its temp
// file's base name to the file's absolute path. Written by Create, read by
// ServeHTTP.
pathMap map[string]string
}
// NewHttpOutputCreator builds an output creator that stores its files in a
// freshly created temp directory.
//
// httpUri is optional. When non-empty, ex: 'http://HOST:PORT/ENDPOINT/', it is
// used as the base of the URIs handed out for new outputs instead of
// 'file://HOST/PATH', and its path component (normalised to end in "/") becomes
// the value returned by HttpPath. When empty, HttpPath returns "".
//
// An error is returned if the local hostname cannot be determined, httpUri does
// not parse, or the temp directory cannot be created.
func NewHttpOutputCreator(httpUri string) (HttpOutputCreator, error) {
	host, err := os.Hostname()
	if err != nil {
		return nil, err
	}

	// Derive the serving prefix before touching the filesystem so that a bad
	// URI does not leave a stray temp directory behind.
	var servePath string
	if httpUri != "" {
		parsed, parseErr := url.Parse(httpUri)
		if parseErr != nil {
			return nil, parseErr
		}
		servePath = strings.TrimSuffix(parsed.Path, "/") + "/"
	}

	dir, err := ioutil.TempDir("", "output")
	if err != nil {
		return nil, err
	}

	creator := &localOutputCreator{
		tmp:      dir,
		hostname: host,
		httpUri:  httpUri,
		httpPath: servePath,
		pathMap:  map[string]string{},
	}
	return creator, nil
}
// Create makes a new Output backed by a fresh temp file under s.tmp.
//
// id is used as the temp file's name prefix and, when an httpUri is configured,
// as the final path segment of the returned URI.
// Note: id should not have leading or trailing slashes.
//
// The Output's URI is 'file://HOST/ABSPATH' by default. When an httpUri is
// configured it is 'HTTPURI/ID?file=file://HOST/ABSPATH' instead, and the file
// is registered in pathMap under both id and the temp file's base name so that
// ServeHTTP can find it.
func (s *localOutputCreator) Create(id string) (runner.Output, error) {
	// The temp dir may have been removed since construction. MkdirAll returns
	// nil when the directory already exists, so no separate Stat is needed.
	if err := os.MkdirAll(s.tmp, os.ModePerm); err != nil {
		return nil, err
	}

	f, err := ioutil.TempFile(s.tmp, id)
	if err != nil {
		return nil, err
	}

	absPath, err := filepath.Abs(f.Name())
	if err != nil {
		// Nobody will ever see this file: release the descriptor and remove it
		// rather than leaking both. Cleanup is best-effort; the Abs error is
		// the one worth reporting.
		_ = f.Close()
		_ = os.Remove(f.Name())
		return nil, err
	}

	// We don't need a / between hostname and path because absolute paths start with /
	uri := fmt.Sprintf("file://%s%s", s.hostname, absPath)
	if s.httpUri != "" {
		// httpUri may or may not end in "/" (the constructor accepts both);
		// trim it so the result never contains "//" before id, which ServeHTTP
		// would fail to match against pathMap.
		base := strings.TrimSuffix(s.httpUri, "/")
		uri = fmt.Sprintf("%s/%s?file=%s", base, id, uri)
		s.pathMap[strings.Trim(id, "/")] = absPath
		s.pathMap[filepath.Base(absPath)] = absPath
	}
	return &localOutput{f: f, absPath: absPath, uri: uri}, nil
}
// ServeHTTP serves the outputs registered by Create.
//
// A request for exactly HttpPath() (with or without the trailing slash) is
// handed to an http.FileServer rooted at the temp directory.
//
// Any other request path has the HttpPath() prefix removed and is looked up in
// pathMap. Unknown paths get 404, a file that can no longer be opened gets 410
// and a failed stat gets 500.
//
// When '?content=true' is specified the file content is served directly via
// http.ServeContent (which handles Range requests). Otherwise a minimal page is
// served that does ajax log tailing by polling the '?content=true' URL.
//
// NOTE(review): pathMap is read here without synchronisation while Create
// writes to it; confirm the two are never run concurrently.
func (s *localOutputCreator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	const clientHtml = `<html>
<script type="text/javascript">
// This code makes use of ES6+ constructs, such as
// arrow functions: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/Arrow_functions
// async await: https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous/Async_await
// promise: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
// typed arrays: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Typed_arrays
let resourceId = "";
const oneHour = 60 * 60 * 1000;
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
const checkAtBottom = () => {
//scrolling: http://stackoverflow.com/a/22394544
let scrollTop =
(document.documentElement && document.documentElement.scrollTop) ||
document.body.scrollTop;
let scrollHeight =
(document.documentElement && document.documentElement.scrollHeight) ||
document.body.scrollHeight;
return scrollTop + window.innerHeight >= scrollHeight;
};
const gotoBottom = () => {
let scrollHeight =
(document.documentElement && document.documentElement.scrollHeight) ||
document.body.scrollHeight;
let scrollLeft =
(document.documentElement && document.documentElement.scrollLeft) ||
document.body.scrollLeft;
window.scrollTo(scrollLeft, scrollHeight);
};
// parseLength parses the Content-Range http header for the size of the resource
const parseLength = resp => {
let contentRange = resp.headers.get("Content-Range");
let idx = contentRange.lastIndexOf("/") + 1;
return contentRange.slice(idx);
};
const copyBuffer = (oldBuffer, length) => {
buffer = new Uint8Array(length);
buffer.set(oldBuffer);
return {
buffer,
length
};
};
const writeBuffer = (mainBuffer, arrayBuffer, offset) => {
let bytes = new Uint8Array(arrayBuffer);
for (let i = 0; i < bytes.byteLength; i++) {
mainBuffer[i + offset] = bytes[i];
}
};
const updateText = buffer => {
let wasAtBottom = checkAtBottom();
let div = document.getElementById("output");
div.innerText = new TextDecoder("utf-8").decode(buffer);
if (wasAtBottom) {
gotoBottom();
}
};
const increaseTimeout = currTimeout => Math.trunc((currTimeout * 3) / 2);
sendRequest = async () => {
let url =
location.href + (location.search == "" ? "?" : "&") + "content=true";
let resp = await fetch(url, {
method: "HEAD"
});
let length = Number.parseInt(resp.headers.get("Content-Length"));
// buffer is an Uint8Array to preserve utf-8 encoding
let buffer = new Uint8Array(length);
let curr = 0;
// 1280KB was chosen as it would be larger than most small files, and
// for larger files, say around 20MB, would be able to be retrieved in
// approximately 20 calls.
// 1280KB == 1310720 bytes
let offset = 1310720;
let minTimeout = 50;
let currTimeout = minTimeout;
// 15 minutes
let maxTimeout = 15 * 60 * 1000;
// Read 1280KB chunks until the entire resource is consumed, and wait for updates
// up to one hour after last modified date
while (true) {
let next = Math.min(curr + offset, length);
// if curr == next then we have reached the end of our file
// and need to wait for updates to be written
if (curr == next) {
let resp = await fetch(url, {
method: "HEAD"
});
length = Number.parseInt(resp.headers.get("Content-Length"));
// Date.parse and Date.now returns epoch time
lastModified = Date.parse(resp.headers.get("Last-Modified"));
if (Date.now() - lastModified > oneHour) {
// log hasn't been update in over 1 hour
// so stop fetching
break;
}
currTimeout = increaseTimeout(currTimeout);
} else {
// Make an HTTP Range Request
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
resp = await fetch(url, {
headers: new Headers({
Range: "bytes=" + curr + "-" + next
})
});
if (200 <= resp.status && resp.status < 300) {
let id = resp.headers.get("X-Resource-Id");
let newLength = parseLength(resp);
// if newLength != length that means the file is still being written to
// so increase the time between retries
if (newLength != length) {
minTimeout = 5000;
({ buffer, length } = copyBuffer(buffer, newLength));
}
writeBuffer(buffer, await resp.arrayBuffer(), curr);
updateText(buffer);
if (resourceId == "") {
resourceId = id;
}
if (id != resourceId) {
alert("Underlying resource changed! Quitting");
break;
}
currTimeout = minTimeout;
curr = next;
} else {
currTimeout = increaseTimeout(currTimeout);
}
}
await sleep(Math.min(currTimeout, maxTimeout));
}
};
sendRequest();
</script>
<body>
<div id="output" style="white-space: pre-wrap"></div>
</body>
</html>
`

	// The bare endpoint serves a listing of the temp directory.
	if strings.TrimSuffix(r.URL.Path, "/")+"/" == s.HttpPath() {
		http.StripPrefix(s.HttpPath(), http.FileServer(http.Dir(s.tmp))).ServeHTTP(w, r)
		return
	}

	key := strings.TrimPrefix(r.URL.Path, s.HttpPath())
	localPath, ok := s.pathMap[key]
	if !ok {
		http.Error(w, "Unrecognized path", http.StatusNotFound)
		return
	}

	resource, err := os.Open(localPath)
	if err != nil {
		// The output was registered but its file is no longer readable.
		http.Error(w, "", http.StatusGone)
		return
	}
	// Release the descriptor once the response has been written; without this
	// every request would leak an open file.
	defer resource.Close()

	info, err := resource.Stat()
	if err != nil {
		http.Error(w, "", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Access-Control-Allow-Origin", "*")
	// The tailing page compares this header across responses to detect that the
	// file behind the URL has changed.
	w.Header().Set("X-Resource-Id", localPath)

	if r.URL.Query().Get("content") == "true" {
		w.Header().Set("Cache-Control", "no-store")
		http.ServeContent(w, r, "", info.ModTime(), resource)
		return
	}

	// Fprint, not Fprintf: the page is data, not a format string.
	fmt.Fprint(w, clientHtml)
}
// HttpPath returns the path component of the httpUri given at construction,
// normalised to end in "/". It is empty when no httpUri was given.
func (s *localOutputCreator) HttpPath() string {
return s.httpPath
}
// localOutput is an output whose content lives in a file on the local
// filesystem.
type localOutput struct {
	// f is the open handle that Write and Close operate on.
	f *os.File
	// absPath is the absolute path of the backing file.
	absPath string
	// uri is the URI reported for this output.
	uri string
}

// URI returns a URI to this Output.
func (out *localOutput) URI() string { return out.uri }

// AsFile returns an absolute path to a file with this content.
func (out *localOutput) AsFile() string { return out.absPath }

// Write implements io.Writer by writing p to the backing file.
func (out *localOutput) Write(p []byte) (n int, err error) { return out.f.Write(p) }

// Close implements io.Closer by closing the backing file.
func (out *localOutput) Close() error { return out.f.Close() }

// WriterDelegate exposes the underlying Writer (the *os.File itself).
//
// Why? Some callers type assert to a more specific type and are more clever
// with it (e.g., if it's an *os.File, hook it up directly to a new process's
// stdout/stderr). We care about this cleverness, so Output both is-a and
// has-a Writer.
func (out *localOutput) WriterDelegate() io.Writer { return out.f }
// Compile-time assertion that *localOutput implements osexecer.WriterDelegater.
var _ osexecer.WriterDelegater = (*localOutput)(nil)