async stream()

in packages/sqrl-cli/src/cli/CliRun.ts [59:109]


  async stream(options: {
    ctx: Context;
    streamFeature: string;
    concurrency: number;
    inputs: FeatureMap;
    features: string[];
  }) {
    const { concurrency } = options;

    const stream: NodeJS.ReadStream = process.stdin.pipe(split2());
    const busy = new Semaphore();

    stream.on("data", (line) => {
      // Convert this line to the given feature
      const lineValues = Object.assign({}, options.inputs);
      try {
        lineValues[options.streamFeature] = JSON.parse(line.toString("utf-8"));
      } catch (err) {
        console.error(
          "Error: Invalid JSON value: %s",
          JSON.stringify(line) + err.toString()
        );
        return;
      }

      // @todo: Perhaps we want to run serially to ensure output is more easily digestable
      promiseFinally(
        busy.wrap(
          this.action(options.ctx, lineValues, options.features).catch(
            (err) => {
              console.error("Error: " + err.toString());
            }
          )
        ),
        () => {
          stream.resume();
        }
      );

      if (concurrency && busy.getCount() === concurrency) {
        stream.pause();
      }
    });

    // Wait for the stream to finish
    await new Promise((resolve, reject) => {
      stream.on("end", () => resolve());
      stream.on("error", (err) => reject(err));
    });
    await busy.waitForZero();
  }