in packages/sqrl-cli/src/cli/CliRun.ts [59:109]
async stream(options: {
ctx: Context;
streamFeature: string;
concurrency: number;
inputs: FeatureMap;
features: string[];
}) {
const { concurrency } = options;
const stream: NodeJS.ReadStream = process.stdin.pipe(split2());
const busy = new Semaphore();
stream.on("data", (line) => {
// Convert this line to the given feature
const lineValues = Object.assign({}, options.inputs);
try {
lineValues[options.streamFeature] = JSON.parse(line.toString("utf-8"));
} catch (err) {
console.error(
"Error: Invalid JSON value: %s",
JSON.stringify(line) + err.toString()
);
return;
}
// @todo: Perhaps we want to run serially to ensure output is more easily digestable
promiseFinally(
busy.wrap(
this.action(options.ctx, lineValues, options.features).catch(
(err) => {
console.error("Error: " + err.toString());
}
)
),
() => {
stream.resume();
}
);
if (concurrency && busy.getCount() === concurrency) {
stream.pause();
}
});
// Wait for the stream to finish
await new Promise((resolve, reject) => {
stream.on("end", () => resolve());
stream.on("error", (err) => reject(err));
});
await busy.waitForZero();
}