export function registerCountUniqueFunctions()

in packages/sqrl-redis-functions/src/CountUniqueFunctions.ts [53:375]


export function registerCountUniqueFunctions(
  instance: Instance,
  service: CountUniqueService
) {
  instance.registerStatement(
    "SqrlCountUniqueStatements",
    async function _bumpCountUnique(state: Execution, keys, uniques, windowMs) {
      uniques = SqrlObject.ensureBasic(uniques);
      if (!keys.length || !isCountable(uniques)) {
        return;
      }

      // @TODO Figure out a better way to batch up these adds
      for (const features of sqrlCartesianProduct(uniques)) {
        const isTuple = features.length > 1;
        const element = isTuple ? tupleToString(features) : features[0];
        const hashes = [slidingdHashHex(element)];

        state.manipulator.addCallback(async (ctx) => {
          await Promise.all(
            keys.map((key) => {
              return service.bump(ctx, {
                at: state.getClockMs(),
                key,
                sortedHashes: hashes,
                windowMs,
              });
            })
          );
        });
      }
    },
    {
      allowNull: true,
      allowSqrlObjects: true,
      args: [AT.state, AT.any.array, AT.any.array, AT.any],
    }
  );

  instance.registerSync(
    function _unionCountUnique(left, right) {
      invariant(
        left instanceof Set && right instanceof Set,
        "expected left and right to be Sets"
      );
      let count = left.size;
      right.forEach((element) => {
        if (!left.has(element)) {
          count++;
        }
      });
      return count;
    },
    {
      allowSqrlObjects: true,
      args: [AT.any, AT.any],
    }
  );

  instance.registerSync(
    function _intersectCountUnique(left, right) {
      invariant(
        left instanceof Set && right instanceof Set,
        "expected left and right to be Sets"
      );
      let count = 0;
      left.forEach((element) => {
        if (right.has(element)) {
          count++;
        }
      });
      return count;
    },
    {
      allowSqrlObjects: true,
      args: [AT.any, AT.any],
    }
  );

  instance.register(
    function _fetchCountUnique(state, keys, windowMs, uniques) {
      uniques = SqrlObject.ensureBasic(uniques).map((value) => {
        if (typeof value === "number") {
          return "" + value;
        } else {
          return value;
        }
      });
      return fetchCount(
        state.ctx,
        service,
        keys,
        state.getClockMs(),
        windowMs,
        uniques
      );
    },
    {
      allowNull: true,
      allowSqrlObjects: true,
      args: [AT.state, AT.any.array, AT.any, AT.any.array],
    }
  );

  instance.register(
    async function _fetchCountUniqueElements(
      state,
      keys,
      windowStartMs,
      uniques
    ) {
      const hexElements = new Set();

      if (keys.length === 0) {
        return hexElements;
      }

      // If uniques are empty or we are not bumping do not fetch count with current values.
      let elements = [];
      uniques = SqrlObject.ensureBasic(uniques);
      if (isCountable(uniques)) {
        const products = sqrlCartesianProduct(uniques);
        elements = products.map((features) => {
          const isTuple = features.length > 1;
          return isTuple ? tupleToString(features) : features[0];
        });
      }

      elements.forEach((element) => {
        hexElements.add(slidingdHashHex(element));
      });

      const hashes = await service.fetchHashes(state.ctx, {
        keys,
        windowStartMs,
      });
      hashes.forEach((hash) => {
        hexElements.add(hash);
      });
      return hexElements;
    },
    {
      allowSqrlObjects: true,
      args: [AT.state, AT.any, AT.any, AT.any],
    }
  );

  instance.registerCustom(
    function countUnique(state: CompileState, ast: CustomCallAst): Ast {
      const args: CountUniqueArguments = parse(ast.source, {
        startRule: "CountUniqueArguments",
      });
      const { whereAst, whereFeatures, whereTruth } = state.combineGlobalWhere(
        args.where
      );

      const sortedUniques: AliasedFeature[] = sortByAlias(args.uniques);
      const sortedGroup: AliasedFeature[] = sortByAlias(args.groups);

      const uniquesAst = AstBuilder.list(sortedUniques.map((f) => f.feature));
      const windowMsAst = AstBuilder.constant(args.windowMs);

      const groupAliases = args.groups.map((feature) => feature.alias);
      const groupFeatures = args.groups.map((feature) => feature.feature.value);
      const groupHasAliases = args.groups.some(
        (f) => f.feature.value !== f.alias
      );
      const sortedGroupAliases = sortedGroup.map((feature) => feature.alias);

      const { entityId, entityAst } = state.addHashedEntity(
        ast,
        "UniqueCounter",
        {
          groups: sortedGroupAliases,
          uniques: sortedUniques.map((feature) => feature.alias),

          // Only include the where clauses if they're non-empty
          ...(whereTruth ? { whereFeatures, whereTruth } : {}),
        }
      );

      const originalKeysAst = state.setGlobal(
        ast,
        AstBuilder.call("_getKeyList", [
          entityAst,
          ...groupAliases.map((alias) => AstBuilder.feature(alias)),
        ]),
        `key(${entityId.getIdString()})`
      );

      // Always bump the counter according to the original keys (aliases)
      const slotAst = state.setGlobal(
        ast,
        AstBuilder.call("_bumpCountUnique", [
          AstBuilder.branch(
            whereAst,
            originalKeysAst,
            AstBuilder.constant(null)
          ),
          uniquesAst,
          windowMsAst,
        ])
      );
      state.addStatement("SqrlCountUniqueStatements", slotAst);

      let keysAst = originalKeysAst;
      let countExtraUniques: Ast = AstBuilder.branch(
        AstBuilder.and([whereAst, AstBuilder.feature("SqrlIsClassify")]),
        uniquesAst,
        AstBuilder.constant([])
      );

      if (groupHasAliases) {
        keysAst = state.setGlobal(
          ast,
          AstBuilder.call("_getKeyList", [
            entityAst,
            ...groupFeatures.map((feature) => AstBuilder.feature(feature)),
          ]),
          `key(${entityId.getIdString()}:${groupFeatures.join(",")})`
        );

        // If we're using aliases we only count the uniques in this request if
        // they exactly match the aliases that we used
        const aliasesEqualAst = AstBuilder.call("_cmpE", [
          AstBuilder.list(
            groupAliases.map((alias) => AstBuilder.feature(alias))
          ),
          AstBuilder.list(
            groupFeatures.map((feature) => AstBuilder.feature(feature))
          ),
        ]);
        countExtraUniques = AstBuilder.branch(
          aliasesEqualAst,
          countExtraUniques,
          AstBuilder.constant([])
        );
      }

      if (args.beforeAction) {
        countExtraUniques = AstBuilder.constant([]);
      }

      const originalCall = AstBuilder.call("_fetchCountUnique", [
        keysAst,
        windowMsAst,
        countExtraUniques,
      ]);

      if (args.setOperation) {
        throw new Error("@todo setOperation transform");
        /*
      const { operation, features } = args.setOperation;
      const rightCountArgs = Object.assign({}, args, {
        groups: features,
        setOperation: null
      });

      const rightCall = state._wrapped.transform(
        Object.assign({}, ast, {
          args: [rightCountArgs, ...ast.args.slice(1)]
        })
      );

      let setFunction;
      if (operation === "intersect") {
        setFunction = "_intersectCountUnique";
      } else if (operation === "union") {
        setFunction = "_unionCountUnique";
      } else {
        throw new Error("Unknown set operation: " + operation);
      }

      return AstBuilder.call(setFunction, [
        Object.assign({}, originalCall, {
          func: "_fetchCountUniqueElements"
        }),
        Object.assign({}, rightCall, { func: "_fetchCountUniqueElements" })
      ]);*/
      }

      return originalCall;
    },
    {
      argstring:
        "Feature[, ...] [GROUP BY Feature[, ...]] [WHERE Condition] [LAST Duration] [BEFORE ACTION]",
      docstring: "Performs a sliding window unique set count",
    }
  );

  async function fetchCount(
    ctx: Context,
    service: CountUniqueService,
    keys,
    clockMs,
    windowMs,
    uniques
  ) {
    if (keys.length === 0) {
      return 0;
    }

    // If uniques are empty or we are not bumping do not fetch count with current values.
    let elements = [];
    if (isCountable(uniques)) {
      const products = sqrlCartesianProduct(uniques);
      elements = products.map((features) => {
        const isTuple = features.length > 1;
        return isTuple ? tupleToString(features) : features[0];
      });
    }

    elements = elements.map(slidingdHashHex);

    const results = await service.fetchCounts(ctx, {
      keys,
      at: clockMs,
      windowMs,
      addHashes: elements,
    });
    return Math.round(Math.max(0, ...results));
  }
}