packages/sdk/src/FlagResolverClient.ts (302 lines of code) (raw):

import { FlagEvaluation } from '.';
import { AccessiblePromise } from './AccessiblePromise';
import { Applier, FlagResolution } from './FlagResolution';
import { Telemetry, Meter } from './Telemetry';
import { Value } from './Value';
import { Context } from './context';
import { FetchBuilder, TimeUnit } from './fetch-util';
import {
  ResolveFlagsRequest,
  ResolveFlagsResponse,
  ApplyFlagsRequest,
  AppliedFlag,
} from './generated/confidence/flags/resolver/v1/api';
import { Sdk } from './generated/confidence/flags/resolver/v1/types';
import {
  LibraryTraces_Library,
  LibraryTraces_TraceId,
  Monitoring,
} from './generated/confidence/telemetry/v1/telemetry';
import { SimpleFetch } from './types';

/** Prefix prepended to every flag name sent in resolve and apply requests. */
const FLAG_PREFIX = 'flags/';

/**
 * Error that carries a flag evaluation error code.
 *
 * `FetchingFlagResolverClient.resolve` forwards the `code` and `message` of a
 * `ResolveError` to `FlagResolution.failed`; any other error is reported with
 * the code `'GENERAL'`.
 */
export class ResolveError extends Error {
  constructor(public readonly code: FlagEvaluation.ErrorCode, message: string) {
    super(message);
  }
}

/**
 * A promise-like handle for an in-flight resolution that also exposes the
 * context it was started for and lets the caller abort it.
 *
 * `then`, `catch` and `finally` are re-declared only to narrow their return
 * type to `PendingResolution`; they delegate to `AccessiblePromise`.
 */
export class PendingResolution<T = FlagResolution> extends AccessiblePromise<T> {
  #context: Context;
  #controller: AbortController;

  /** Signal of the controller owned by this resolution; aborted by {@link abort}. */
  get signal(): AbortSignal {
    return this.#controller.signal;
  }

  protected constructor(promise: PromiseLike<T>, context: Context, controller: AbortController, rejected?: boolean) {
    super(promise, rejected);
    this.#context = context;
    this.#controller = controller;
  }

  // Presumably the hook AccessiblePromise uses to build derived promises (verify against AccessiblePromise);
  // the derived instance shares this instance's context and abort controller.
  protected chain<S>(value: any, rejected?: boolean | undefined): PendingResolution<S> {
    return new PendingResolution(value, this.#context, this.#controller, rejected);
  }

  /** The context this resolution was created with. */
  get context(): Context {
    return this.#context;
  }

  /** Aborts the underlying controller, which fires `abort` on {@link signal}. */
  abort(): void {
    this.#controller.abort();
  }

  then<TResult1 = T, TResult2 = never>(
    onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | null | undefined,
    onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | null | undefined,
  ): PendingResolution<TResult1 | TResult2> {
    return super.then(onfulfilled, onrejected) as PendingResolution<TResult1 | TResult2>;
  }

  catch<TResult = never>(
    onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | null | undefined,
  ): PendingResolution<T | TResult> {
    return super.catch(onrejected) as PendingResolution<T | TResult>;
  }

  finally(onfinally?: (() => void) | null | undefined): PendingResolution<T> {
    return super.finally(onfinally) as PendingResolution<T>;
  }

  /**
   * Creates a pending resolution backed by a fresh `AbortController`.
   *
   * @param context - context the resolution is performed for
   * @param executor - invoked synchronously with the new controller's signal;
   *   its result is the promise this resolution wraps
   */
  static create(
    context: Context,
    executor: (signal: AbortSignal) => PromiseLike<FlagResolution>,
  ): PendingResolution<FlagResolution> {
    const controller = new AbortController();
    return new PendingResolution(executor(controller.signal), context, controller);
  }
}

/** Something that can resolve a set of flags for a context. */
export interface FlagResolverClient {
  resolve(context: Context, flags: string[]): PendingResolution;
}

/** Construction options for {@link FetchingFlagResolverClient}. */
export type FlagResolverClientOptions = {
  fetchImplementation: SimpleFetch;
  clientSecret: string;
  sdk: Sdk;
  /** Delay in milliseconds passed to `setTimeout` before batched applies are flushed. */
  applyTimeout?: number;
  /** Milliseconds after which a resolve is aborted with a `'TIMEOUT'` error. */
  resolveTimeout: number;
  /** `'backend'` skips the retry / rate-limit request logic that is used otherwise. */
  environment: 'client' | 'backend';
  /** Selects the regional resolver host; ignored when `resolveBaseUrl` is set. */
  region?: 'eu' | 'us';
  /** Overrides the resolver base URL; `/v1` is appended to it. */
  resolveBaseUrl?: string;
  telemetry: Telemetry;
};

/** {@link FlagResolverClient} that resolves and applies flags over HTTP using JSON bodies. */
export class FetchingFlagResolverClient implements FlagResolverClient {
  private readonly fetchImplementation: SimpleFetch;
  private readonly clientSecret: string;
  private readonly sdk: Sdk;
  private readonly applyTimeout?: number;
  private readonly resolveTimeout: number;
  private readonly baseUrl: string;
  private readonly markLatency: Meter;

  constructor({
    fetchImplementation,
    clientSecret,
    sdk,
    applyTimeout,
    resolveTimeout,
    // todo refactor to move out environment
    environment,
    region,
    resolveBaseUrl,
    telemetry,
  }: FlagResolverClientOptions) {
    this.markLatency = telemetry.registerMeter({
      library: LibraryTraces_Library.LIBRARY_CONFIDENCE,
      version: sdk.version,
      id: LibraryTraces_TraceId.TRACE_ID_RESOLVE_LATENCY,
    });
    const fetchBuilderWithTelemetry = withTelemetryData(new FetchBuilder(), telemetry);
    // TODO think about both resolve and apply request logic for backends
    const fetchWithTelemetry = fetchBuilderWithTelemetry.build(fetchImplementation);
    this.fetchImplementation = environment === 'backend' ? fetchWithTelemetry : withRequestLogic(fetchWithTelemetry);
    this.clientSecret = clientSecret;
    this.sdk = sdk;
    this.applyTimeout = applyTimeout;
    if (resolveBaseUrl) {
      this.baseUrl = `${resolveBaseUrl}/v1`;
    } else {
      this.baseUrl = region ? `https://resolver.${region}.confidence.dev/v1` : 'https://resolver.confidence.dev/v1';
    }
    this.resolveTimeout = resolveTimeout;
  }

  /**
   * Resolves `flags` for `context`.
   *
   * The returned resolution does not reject on request failure: errors are
   * converted into `FlagResolution.failed(...)`. The elapsed time is reported
   * to the latency meter whether the request succeeded or failed.
   *
   * @param context - evaluation context sent with the request
   * @param flags - flag names without the `flags/` prefix
   */
  resolve(context: Context, flags: string[]): PendingResolution {
    const request: ResolveFlagsRequest = {
      clientSecret: this.clientSecret,
      evaluationContext: context,
      apply: false,
      sdk: this.sdk,
      flags: flags.map(name => FLAG_PREFIX + name),
    };

    return PendingResolution.create(context, signal => {
      const signalWithTimeout = withTimeout(
        signal,
        this.resolveTimeout,
        new ResolveError('TIMEOUT', 'Resolve timeout'),
      );
      const start = Date.now();
      return this.resolveFlagsJson(request, signalWithTimeout)
        .then(response => FlagResolution.ready(context, response, this.createApplier(response.resolveToken)))
        .catch(error => {
          if (error instanceof ResolveError) {
            return FlagResolution.failed(context, error.code, error.message);
          }
          return FlagResolution.failed(context, 'GENERAL', error.message);
        })
        .finally(() => {
          this.markLatency(Date.now() - start);
        });
    });
  }

  /**
   * Creates an applier bound to `resolveToken`.
   *
   * Each flag name is recorded at most once. Recorded flags are batched and
   * sent with a single apply request `applyTimeout` ms after the most recent
   * new flag (every new flag restarts the timer).
   *
   * @param resolveToken - token from the resolve response, echoed in apply requests
   * @returns a function that marks the given flag name as applied
   */
  createApplier(resolveToken: Uint8Array): Applier {
    const applied = new Set<string>();
    const pending: AppliedFlag[] = [];
    let timeoutId = 0;

    const flush = async (): Promise<void> => {
      timeoutId = 0;
      try {
        await this.apply({
          // drain the queue so a later flush only sends flags recorded after this point
          flags: pending.splice(0, pending.length),
          clientSecret: this.clientSecret,
          resolveToken,
          sdk: this.sdk,
          sendTime: new Date(),
        });
      } catch {
        // Apply is best-effort and nobody awaits the timer callback, so a rejection here
        // would otherwise surface as an unhandled promise rejection.
      }
    };

    return (flagName: string) => {
      if (applied.has(flagName)) return;
      applied.add(flagName);
      pending.push({
        flag: FLAG_PREFIX + flagName,
        applyTime: new Date(),
      });
      if (timeoutId) {
        clearTimeout(timeoutId);
      }
      // Number(...) turns the timer handle (an object in Node) into a numeric id, so 0 can mean "no timer".
      timeoutId = Number(setTimeout(flush, this.applyTimeout));
    };
  }

  /**
   * POSTs an apply request to `<baseUrl>/flags:apply`.
   *
   * @throws Error with `"<status>: <statusText>"` when the response is not ok
   */
  async apply(request: ApplyFlagsRequest): Promise<void> {
    const resp = await this.fetchImplementation(
      new Request(`${this.baseUrl}/flags:apply`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(ApplyFlagsRequest.toJSON(request)),
      }),
    );
    if (!resp.ok) {
      throw new Error(`${resp.status}: ${resp.statusText}`);
    }
  }

  /**
   * POSTs a resolve request to `<baseUrl>/flags:resolve` and decodes the JSON response.
   *
   * @param signal - aborts the underlying request
   * @throws Error with `"<status>: <statusText>"` when the response is not ok
   */
  async resolveFlagsJson(request: ResolveFlagsRequest, signal: AbortSignal): Promise<ResolveFlagsResponse> {
    const resp = await this.fetchImplementation(
      new Request(`${this.baseUrl}/flags:resolve`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(ResolveFlagsRequest.toJSON(request)),
        signal,
      }),
    );
    if (!resp.ok) {
      throw new Error(`${resp.status}: ${resp.statusText}`);
    }
    return ResolveFlagsResponse.fromJSON(await resp.json());
  }

  // async resolveFlagsProto(request: ResolveFlagsRequest): Promise<ResolveFlagsResponse> {
  //   const resp = await this.fetchImplementation(
  //     new Request('https://resolver.confidence.dev/v1/flags:resolve', {
  //       method: 'POST',
  //       headers: {
  //         'Content-Type': 'application/x-protobuf',
  //       },
  //       body: ResolveFlagsRequest.encode(request).finish(),
  //     }),
  //   );
  //   if (!resp.ok) {
  //     throw new Error(`${resp.status}: ${resp.statusText}`);
  //   }
  //   return ResolveFlagsResponse.decode(new Uint8Array(await resp.arrayBuffer()));
  // }
}

/**
 * {@link FlagResolverClient} decorator that shares one source resolution between
 * all callers that resolve with the same serialized context, for at most
 * `ttlMs` milliseconds.
 */
export class CachingFlagResolverClient implements FlagResolverClient {
  readonly #cache: Map<string, { timestamp: number; value: PendingResolution; refCount: number }> = new Map();
  readonly #source: FlagResolverClient;
  readonly #ttl: number;

  constructor(source: FlagResolverClient, ttlMs: number) {
    this.#source = source;
    this.#ttl = ttlMs;
  }

  /**
   * Returns a new `PendingResolution` backed by the cached source resolution
   * for `context`, creating and caching that source resolution on a miss.
   *
   * Aborting the returned resolution decrements the entry's reference count;
   * the shared source resolution is aborted only when the count reaches zero.
   */
  resolve(context: Context, flags: string[]): PendingResolution {
    this.evict();
    // NOTE(review): the key is derived from the context only, so a cached entry is reused
    // even when `flags` differs from the call that created it - confirm this is intended.
    const key = Value.serialize(context);
    let entry = this.#cache.get(key);
    if (entry) {
      entry.refCount++;
    } else {
      const value = this.#source.resolve(context, flags);
      const created = { refCount: 1, timestamp: Date.now(), value };
      this.#cache.set(key, created);
      value.signal.addEventListener(
        'abort',
        () => {
          // Only drop the entry this resolution owns: after eviction a newer entry may
          // live under the same key, and it must not be removed by a stale abort.
          if (this.#cache.get(key) === created) {
            this.#cache.delete(key);
          }
        },
        { once: true },
      );
      entry = created;
    }

    const shared = entry;
    return PendingResolution.create(context, signal => {
      signal.addEventListener(
        'abort',
        () => {
          if (--shared.refCount === 0) {
            shared.value.abort();
          }
        },
        { once: true },
      );
      return shared.value;
    });
  }

  /**
   * Removes entries whose age is at least the TTL.
   *
   * The map iterates in insertion order and timestamps are taken at insertion,
   * so the scan stops at the first entry that is still fresh.
   */
  evict() {
    const now = Date.now();
    for (const [key, { timestamp }] of this.#cache) {
      const age = now - timestamp;
      if (age < this.#ttl) return;
      this.#cache.delete(key);
    }
  }
}

/**
 * Adds a request modifier to `fetchBuilder` that attaches the current
 * telemetry snapshot, protobuf-encoded and base64'd, as the
 * `X-CONFIDENCE-TELEMETRY` header. Requests are passed through untouched when
 * the snapshot contains no library traces.
 */
export function withTelemetryData(fetchBuilder: FetchBuilder, telemetry: Telemetry): FetchBuilder {
  return fetchBuilder.modifyRequest(async request => {
    const monitoring = telemetry.getSnapshot();
    if (monitoring.libraryTraces.length > 0) {
      const headers = new Headers(request.headers);
      // Build the binary string byte by byte: spreading the whole payload into
      // String.fromCharCode(...) can exceed the engine's argument limit for large messages.
      let binary = '';
      for (const byte of Monitoring.encode(monitoring).finish()) {
        binary += String.fromCharCode(byte);
      }
      const base64Message = btoa(binary);
      headers.set('X-CONFIDENCE-TELEMETRY', base64Message);
      return new Request(request, { headers });
    }
    return request;
  });
}

/**
 * Wraps `fetchImplementation` with per-endpoint request policies, routed by
 * URL suffix (`flags:resolve` / `flags:apply`). Any other URL is rejected.
 */
export function withRequestLogic(fetchImplementation: (request: Request) => Promise<Response>): typeof fetch {
  const fetchResolve = new FetchBuilder()
    // infinite retries without delay until aborted by timeout
    .retry()
    .rejectNotOk()
    // NOTE(review): initialTokens (3) is larger than maxTokens (2) - confirm this is intentional
    .rateLimit(1, { initialTokens: 3, maxTokens: 2 })
    .build(fetchImplementation);

  const fetchApply = new FetchBuilder()
    .limitPending(1000)
    .timeout(30 * TimeUnit.MINUTE)
    .retry({ delay: 5 * TimeUnit.SECOND, backoff: 2, maxDelay: 5 * TimeUnit.MINUTE, jitter: 0.2 })
    .rejectNotOk()
    .rateLimit(2)
    // update send-time before sending
    .modifyRequest(async request => {
      if (request.method === 'POST') {
        const body = JSON.stringify({ ...(await request.clone().json()), sendTime: new Date().toISOString() });
        return new Request(request, { body });
      }
      return request;
    })
    .build(fetchImplementation);

  return (
    new FetchBuilder()
      .route(url => url.endsWith('flags:resolve'), fetchResolve)
      .route(url => url.endsWith('flags:apply'), fetchApply)
      // throw so we notice changes in endpoints that should be handled here
      .build(request => Promise.reject(new Error(`Unexpected url: ${request.url}`)))
  );
}

/**
 * Derives a signal that aborts when `signal` aborts (with `signal.reason`) or
 * when `timeout` milliseconds have elapsed (with `reason`), whichever is first.
 * If `signal` is already aborted the returned signal is aborted immediately.
 */
function withTimeout(signal: AbortSignal, timeout: number, reason?: any): AbortSignal {
  const controller = new AbortController();
  if (signal.aborted) {
    // The 'abort' event has already fired and will not fire again, so propagate it now.
    controller.abort(signal.reason);
    return controller.signal;
  }
  const timeoutId: NodeJS.Timeout | number = setTimeout(() => controller.abort(reason), timeout);
  // in Node setTimeout returns an object, with an unref function which will prevent the timeout from keeping the process alive
  if (typeof timeoutId === 'object') timeoutId.unref();
  signal.addEventListener(
    'abort',
    () => {
      clearTimeout(timeoutId);
      controller.abort(signal.reason);
    },
    { once: true },
  );
  return controller.signal;
}