packages/sqrl-cli/src/cli/CliRun.ts (90 lines of code) (raw):
/**
* Copyright 2018 Twitter, Inc.
* Licensed under the Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0
*/
// tslint:disable:no-console
// tslint:disable:no-submodule-imports (@TODO)
import Semaphore from "sqrl/lib/jslib/Semaphore";
import * as split2 from "split2";
import { CliActionOutput } from "./CliOutput";
import { Context } from "sqrl/lib/api/ctx";
import { promiseFinally, SqrlObject } from "sqrl-common";
import { FeatureMap, Executable, Execution } from "sqrl";
import { CliManipulator } from "sqrl-cli-functions";
export class CliRun {
constructor(
private executable: Executable,
private output: CliActionOutput
) {}
triggerRecompile(compileCallback: () => Promise<Executable>): Promise<void> {
this.output.sourceRecompiling();
return compileCallback()
.then((rv) => {
this.executable = rv;
this.output.sourceUpdated();
})
.catch((err) => {
this.output.sourceRecompileError(err);
});
}
async action(trc: Context, inputs: FeatureMap, features: string[]) {
const manipulator = new CliManipulator();
const execution: Execution = await this.executable.execute(trc, {
featureTimeoutMs: 10000,
inputs,
manipulator,
});
const loggedFeatures: {
[featureName: string]: SqrlObject;
} = {};
await Promise.all(
features.map(async (featureName) => {
const value = await execution.fetchFeature(featureName);
loggedFeatures[featureName] = value;
})
);
await execution.fetchFeature("SqrlExecutionComplete");
await manipulator.mutate(trc);
this.output.action(manipulator, execution, loggedFeatures);
}
async stream(options: {
ctx: Context;
streamFeature: string;
concurrency: number;
inputs: FeatureMap;
features: string[];
}) {
const { concurrency } = options;
const stream: NodeJS.ReadStream = process.stdin.pipe(split2());
const busy = new Semaphore();
stream.on("data", (line) => {
// Convert this line to the given feature
const lineValues = Object.assign({}, options.inputs);
try {
lineValues[options.streamFeature] = JSON.parse(line.toString("utf-8"));
} catch (err) {
console.error(
"Error: Invalid JSON value: %s",
JSON.stringify(line) + err.toString()
);
return;
}
// @todo: Perhaps we want to run serially to ensure output is more easily digestable
promiseFinally(
busy.wrap(
this.action(options.ctx, lineValues, options.features).catch(
(err) => {
console.error("Error: " + err.toString());
}
)
),
() => {
stream.resume();
}
);
if (concurrency && busy.getCount() === concurrency) {
stream.pause();
}
});
// Wait for the stream to finish
await new Promise((resolve, reject) => {
stream.on("end", () => resolve());
stream.on("error", (err) => reject(err));
});
await busy.waitForZero();
}
close() {
this.output.close();
}
}