packages/sqrl-redis-functions/src/CountFunctions.ts (429 lines of code) (raw):
/**
* Copyright 2018 Twitter, Inc.
* Licensed under the Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0
*/
// tslint:disable:no-submodule-imports (@TODO)
import {
AT,
AstBuilder,
SqrlKey,
CompileState,
Execution,
Instance,
sqrlInvariant,
Ast,
CustomCallAst,
} from "sqrl";
import { parse } from "./parser/sqrlRedisParser";
import { invariant, removeIndent } from "sqrl-common";
import {
CountArguments,
TrendingArguments,
AliasedFeature,
Timespan,
} from "./parser/sqrlRedis";
import { CountService } from "./Services";
const ENTITY_TYPE = "Counter";
const HOUR_MS = 3600 * 1000;
const DAY_MS = HOUR_MS * 24;
function dayDuration(days: number): Timespan {
return {
type: "duration",
durationMs: days * DAY_MS,
};
}
const PREVIOUS_CONFIG: {
[timespan: string]: {
subtractLeft: Timespan;
subtractRight: Timespan;
allowNegativeValue: boolean;
};
} = {
// These counters extract what should always be a larger value from a
// smaller one. If they are negative than the value should be ignored
// (i.e. in the case of not enough data.)
previousLastDay: {
subtractLeft: dayDuration(2),
subtractRight: dayDuration(1),
allowNegativeValue: false,
},
previousLastWeek: {
subtractLeft: dayDuration(14),
subtractRight: dayDuration(7),
allowNegativeValue: false,
},
// dayWeekAgo is internal only
dayWeekAgo: {
subtractLeft: dayDuration(8),
subtractRight: dayDuration(7),
allowNegativeValue: false,
},
// These x-over-y counters can be negative, but should still be null in
// the initial missing data cases.
dayOverDay: {
subtractLeft: dayDuration(1),
subtractRight: { type: "previousLastDay" },
allowNegativeValue: true,
},
dayOverWeek: {
subtractLeft: dayDuration(1),
subtractRight: { type: "dayWeekAgo" },
allowNegativeValue: true,
},
weekOverWeek: {
subtractLeft: dayDuration(7),
subtractRight: { type: "previousLastWeek" },
allowNegativeValue: true,
},
};
const TRENDING_CONFIG: {
[timespan: string]: {
current: Timespan;
currentAndPrevious: Timespan;
};
} = {
dayOverDay: {
current: dayDuration(1),
currentAndPrevious: dayDuration(2),
},
dayOverFullWeek: {
current: dayDuration(1),
currentAndPrevious: dayDuration(7),
},
weekOverWeek: {
current: dayDuration(7),
currentAndPrevious: dayDuration(14),
},
};
export interface CountServiceBumpProps {
at: number;
keys: SqrlKey[];
by: number;
windowMs: number;
}
function interpretCountArgs(
state: CompileState,
sourceAst: Ast,
args: CountArguments
) {
const { whereAst, whereFeatures, whereTruth } = state.combineGlobalWhere(
args.where
);
const counterProps: {
features: string[];
whereFeatures?: string[];
whereTruth?: string;
sumFeature?: string;
} = {
features: args.features.map((feature: AliasedFeature) => feature.alias),
whereFeatures,
whereTruth,
};
// Include sumFeature in the key if provided - otherwise we will
// just bump by 1 so leave it out of key.
let bumpByAst: Ast = AstBuilder.constant(1);
if (args.sumFeature) {
counterProps.sumFeature = args.sumFeature.value;
bumpByAst = AstBuilder.call("_getBumpBy", [args.sumFeature]);
}
const { entityAst, entityId } = state.addHashedEntity(
sourceAst,
ENTITY_TYPE,
counterProps
);
const featuresAst = args.features.map((aliasFeature) =>
AstBuilder.feature(aliasFeature.feature.value)
);
const featureString = featuresAst.map((ast) => ast.value).join("~");
const keyedCounterName = `${entityId.getIdString()}~${featureString}`;
const keysAst = state.setGlobal(
sourceAst,
AstBuilder.call("_getKeyList", [entityAst, ...featuresAst]),
`key(${keyedCounterName})`
);
const hasAlias = args.features.some(
(featureAst: AliasedFeature) =>
featureAst.feature.value !== featureAst.alias
);
return {
bumpByAst,
hasAlias,
keyedCounterName,
keysAst,
entityAst,
entityId,
whereAst,
whereFeatures,
whereTruth,
};
}
function getWindowMsForTimespan(timespan: Timespan): number | null {
if (timespan.type === "duration") {
return timespan.durationMs;
} else if (timespan.type === "total") {
return null;
} else {
throw new Error("Unknown duration for timespan type: " + timespan.type);
}
}
function getNameForTimespan(timespan: Timespan): string {
if (timespan.type === "duration") {
let name = "";
let remaining = timespan.durationMs;
if (remaining > DAY_MS) {
name += Math.floor(remaining / DAY_MS).toString() + "D";
remaining = remaining % DAY_MS;
}
name += remaining.toString();
return name;
} else {
return timespan.type;
}
}
export function ensureCounterBump(
state: CompileState,
sourceAst: Ast,
args: CountArguments
) {
const interpretResult = interpretCountArgs(state, sourceAst, args);
const {
hasAlias,
whereAst,
keyedCounterName,
bumpByAst,
keysAst,
} = interpretResult;
// [@todo: check] Only base the counter identity on features/where
if (hasAlias) {
return interpretResult;
}
const windowMs = getWindowMsForTimespan(args.timespan);
const slotAst = state.setGlobal(
sourceAst,
AstBuilder.call("_bumpCount", [
AstBuilder.branch(whereAst, keysAst, AstBuilder.constant(null)),
bumpByAst,
AstBuilder.constant(windowMs),
]),
`bump(${keyedCounterName}:${windowMs})`
);
state.addStatement("SqrlCountStatements", slotAst);
return interpretResult;
}
export function registerCountFunctions(
instance: Instance,
service: CountService
) {
instance.registerSync(
function _getBumpBy(bumpBy: number) {
if (typeof bumpBy !== "number") {
return null;
}
return bumpBy > 0 ? bumpBy : null;
},
{
args: [AT.feature],
}
);
instance.registerStatement(
"SqrlCountStatements",
async function _bumpCount(state, keys, by, windowMs) {
if (keys === null || by === null) {
return null;
}
state.manipulator.addCallback(async (ctx) => {
await service.bump(ctx, state.getClockMs(), keys, windowMs, by);
});
},
{
allowNull: true,
allowSqrlObjects: true,
args: [AT.state, AT.any.array, AT.any, AT.any],
}
);
instance.register(
function _fetchCountsFromDb(state: Execution, keys, windowMs) {
if (keys === null) {
return null;
}
return service.fetch(state.ctx, state.getClockMs(), keys, windowMs);
},
{
allowNull: true,
allowSqrlObjects: true,
args: [AT.state, AT.any, AT.any],
}
);
instance.register(
async function _fetchTrendingDetails(
state,
keys,
currentCounts,
currentAndPreviousCounts,
minEvents
) {
if (!currentCounts || !currentAndPreviousCounts) {
return [];
}
invariant(
currentCounts.length === currentAndPreviousCounts.length &&
currentCounts.length === keys.length,
"Mismatched current/previous trending counts."
);
const rv = [];
currentCounts.forEach((currentCount, i) => {
const currentAndPreviousCount = currentAndPreviousCounts[i];
if (
currentCount === null ||
currentCount < minEvents ||
currentAndPreviousCount === null ||
currentAndPreviousCount < currentCount
) {
return;
}
const key = keys[i];
invariant(key !== null, "Received null key for current count.");
const previousCount = currentAndPreviousCount - currentCount;
const magnitude = Math.log10(currentCount / Math.max(previousCount, 1));
if (magnitude >= 1) {
rv.push({
key: key.featureValues,
current: currentCount,
previous: previousCount,
delta: 2 * currentCount - currentAndPreviousCount,
magnitude,
});
}
});
return rv;
},
{
args: [AT.state, AT.any, AT.any, AT.any, AT.any],
allowSqrlObjects: true,
}
);
instance.registerCustom(
function trending(state: CompileState, ast: CustomCallAst): Ast {
const args: TrendingArguments = parse(ast.source, {
startRule: "TrendingArguments",
});
const { timespan } = args;
sqrlInvariant(
ast,
timespan.type === "dayOverDay" ||
timespan.type === "weekOverWeek" ||
timespan.type === "dayOverFullWeek",
"Invalid timespan for trending. Expecting `DAY OVER DAY` or `WEEK OVER WEEK` or `DAY OVER FULL WEEK`"
);
const trendingConfig = TRENDING_CONFIG[timespan.type];
const currentCountArgs: CountArguments = {
features: args.features,
sumFeature: null,
timespan: trendingConfig.current,
where: args.where,
};
const currentCountAst = databaseCountTransform(
state,
ast,
currentCountArgs
);
const currentAndPreviousCountAst = databaseCountTransform(state, ast, {
...currentCountArgs,
timespan: trendingConfig.currentAndPrevious,
});
const { keysAst } = interpretCountArgs(state, ast, currentCountArgs);
return AstBuilder.call("_fetchTrendingDetails", [
keysAst,
currentCountAst,
currentAndPreviousCountAst,
AstBuilder.constant(args.minEvents),
]);
},
{
argstring:
"Feature[, ...] [WHERE Condition] [WITH MIN Count EVENTS] (Timespan)",
docstring: removeIndent(`
Returns values whose counts have gone up by an order of magnitude
Timespans: DAY OVER DAY, DAY OVER WEEK, DAY OVER FULL WEEK
`),
}
);
function databaseCountTransform(
state: CompileState,
sourceAst: Ast,
args: CountArguments
): Ast {
const { keysAst } = ensureCounterBump(state, sourceAst, args);
const windowMs = getWindowMsForTimespan(args.timespan);
return AstBuilder.call("_fetchCountsFromDb", [
keysAst,
AstBuilder.constant(windowMs),
]);
}
function classifyCountTransform(
state: CompileState,
ast: CustomCallAst,
args: CountArguments
) {
const { hasAlias, keyedCounterName, whereAst } = interpretCountArgs(
state,
ast,
args
);
// Rewrite this count as a subtraction between other counts (whoah)
if (PREVIOUS_CONFIG.hasOwnProperty(args.timespan.type)) {
const previousConfig = PREVIOUS_CONFIG[args.timespan.type];
// Convert into a subtract(left, right)
// We transform into calls to count() which in turn will get transformed
// themselves. This is necessary because the previous config might
// itself be a recursive count.
//
// weekOverWeek = lastWeek - previousLastWeek
// = lastWeek - (lastTwoWeeks - lastWeek)
//
const resultAst = AstBuilder.call("_subtract", [
classifyCountTransform(state, ast, {
...args,
timespan: previousConfig.subtractLeft,
}),
classifyCountTransform(state, ast, {
...args,
timespan: previousConfig.subtractRight,
}),
]);
if (!previousConfig.allowNegativeValue) {
const subtractionAst = state.setGlobal(
ast,
resultAst,
`count(${args.timespan.type}:${keyedCounterName})`
);
return AstBuilder.branch(
// if result < 0
AstBuilder.call("_cmpL", [subtractionAst, AstBuilder.constant(0)]),
// then null
AstBuilder.constant(null),
// else result
subtractionAst
);
}
return resultAst;
}
const addAst = AstBuilder.branch(
AstBuilder.and([whereAst, AstBuilder.feature("SqrlIsClassify")]),
AstBuilder.constant(1),
AstBuilder.constant(0)
);
const resultAst = AstBuilder.call("_add", [
hasAlias ? AstBuilder.constant(0) : addAst,
AstBuilder.call("max", [
AstBuilder.call("concat", [
AstBuilder.constant([0]),
databaseCountTransform(state, ast, args),
]),
]),
]);
return state.setGlobal(
ast,
resultAst,
`count(${getNameForTimespan(args.timespan)}:${keyedCounterName})`
);
}
instance.registerCustom(
function count(state: CompileState, ast: CustomCallAst): Ast {
const args: CountArguments = parse(ast.source, {
startRule: "CountArguments",
});
return classifyCountTransform(state, ast, args);
},
{
argstring: "BY Feature[, ...] [WHERE Condition] [LAST Timespan]",
docstring: removeIndent(`
Returns the streaming count for the given window
Timespans: LAST [X] SECONDS/MINUTES/HOURS/DAYS/WEEKS
DAY OVER DAY, DAY OVER WEEK, WEEK OVER WEEK
TOTAL
`),
}
);
}