packages/sqrl-redis-functions/src/services/RedisCountUnique.ts (91 lines of code) (raw):

/**
 * Copyright 2018 Twitter, Inc.
 * Licensed under the Apache License, Version 2.0
 * http://www.apache.org/licenses/LICENSE-2.0
 */
import { Context, SqrlKey } from "sqrl";
import { CountUniqueService } from "../Services";
import { RedisInterface, createRedisKey } from "./RedisInterface";

/**
 * Decoded contents of one stored list: every entry pairs a hash with the
 * timestamp that was supplied when it was pushed.
 */
type CountUniqueData = {
  timestamp: number;
  value: string;
}[];

/**
 * CountUniqueService implementation that keeps, per key, a list of
 * JSON-encoded `{ timestamp, value }` entries behind a RedisInterface.
 *
 * Writes append to the list; reads load the whole list and apply the time
 * window and de-duplication in memory.
 */
export class RedisCountUniqueService implements CountUniqueService {
  constructor(
    private readonly redis: RedisInterface,
    private readonly prefix: string
  ) {
    /* nothing else */
  }

  /**
   * Builds the storage key for `key` from the context's database set and
   * this service's prefix.
   */
  private listKey(ctx: Context, key: SqrlKey) {
    return createRedisKey(
      ctx.requireDatabaseSet(),
      this.prefix,
      key.getBuffer()
    );
  }

  /**
   * Records each hash in `sortedHashes` under `key`, stamped with `at`.
   *
   * @param ctx - Execution context; supplies the database set for the key.
   * @param props - `at` is stored as each entry's timestamp, `key` selects
   *   the list, and `sortedHashes` are the values to append. `windowMs` is
   *   part of the signature but is not used by this implementation.
   */
  async bump(
    ctx: Context,
    props: {
      at: number;
      key: SqrlKey;
      sortedHashes: string[];
      windowMs: number;
    }
  ): Promise<void> {
    const { at, key, sortedHashes } = props;

    // Nothing to record. Skipping the call avoids issuing a push with zero
    // values; the RedisInterface implementation is not visible here, but
    // Redis's own RPUSH/LPUSH reject a call that carries no elements.
    if (sortedHashes.length === 0) {
      return;
    }

    const entries = sortedHashes.map((hash) =>
      JSON.stringify({
        timestamp: at,
        value: hash,
      })
    );
    await this.redis.listPush(ctx, this.listKey(ctx, key), ...entries);
  }

  /**
   * Loads and JSON-decodes every entry stored for `key`.
   *
   * NOTE(review): entries are trusted to have the shape written by `bump`;
   * a malformed entry makes `JSON.parse` throw.
   */
  private async getData(ctx: Context, key: SqrlKey): Promise<CountUniqueData> {
    const rawEntries = await this.redis.getList(ctx, this.listKey(ctx, key));
    return rawEntries.map((entry) => JSON.parse(entry));
  }

  /**
   * Returns the distinct hashes stored for `key` whose timestamp is at or
   * after `windowStartMs`, sorted with the default string ordering.
   */
  private async getHashes(
    ctx: Context,
    key: SqrlKey,
    windowStartMs: number
  ): Promise<string[]> {
    const data = await this.getData(ctx, key);
    const inWindow = new Set<string>();
    for (const { timestamp, value } of data) {
      if (timestamp >= windowStartMs) {
        inWindow.add(value);
      }
    }
    return Array.from(inWindow).sort();
  }

  /**
   * Returns the sorted union of distinct in-window hashes across `keys`.
   *
   * @param ctx - Execution context passed through to the storage layer.
   * @param props - `keys` to read and the inclusive lower timestamp bound
   *   `windowStartMs`.
   * @returns Sorted, de-duplicated hashes; an empty array when `keys` is
   *   empty.
   */
  async fetchHashes(
    ctx: Context,
    props: { keys: SqrlKey[]; windowStartMs: number }
  ): Promise<string[]> {
    const { keys, windowStartMs } = props;
    if (keys.length === 0) {
      return [];
    }

    // Keys are read concurrently; the set absorbs duplicates across keys.
    const merged: Set<string> = new Set();
    await Promise.all(
      keys.map(async (key) => {
        const hashes = await this.getHashes(ctx, key, windowStartMs);
        for (const hash of hashes) {
          merged.add(hash);
        }
      })
    );
    return Array.from(merged).sort();
  }

  /**
   * For each key, counts the distinct hashes seen in the window
   * `[at - windowMs, ...)` combined with `addHashes`.
   *
   * @param ctx - Execution context passed through to the storage layer.
   * @param props - `keys` to count, the reference time `at`, the window
   *   length `windowMs`, and `addHashes` to include in every key's count.
   * @returns One count per key, in the same order as `keys`.
   */
  async fetchCounts(
    ctx: Context,
    props: {
      keys: SqrlKey[];
      at: number;
      windowMs: number;
      addHashes: string[];
    }
  ): Promise<number[]> {
    const { keys, at, windowMs, addHashes } = props;
    const windowStartMs = at - windowMs;
    return Promise.all(
      keys.map(async (key) => {
        // Read the single key directly: routing through fetchHashes would
        // only de-duplicate and sort the same values a second time.
        const hashes = await this.getHashes(ctx, key, windowStartMs);
        return new Set([...hashes, ...addHashes]).size;
      })
    );
  }
}