spotify_confidence/analysis/frequentist/confidence_computers/z_test_linreg_computer.py (116 lines of code) (raw):

from functools import reduce
from typing import Union, Dict

import numpy as np
from pandas import DataFrame, Series

from spotify_confidence.analysis.confidence_utils import unlist, dfmatmul
from spotify_confidence.analysis.constants import (
    REGRESSION_PARAM,
    FEATURE,
    FEATURE_SUMSQ,
    FEATURE_CROSS,
    NUMERATOR,
    DENOMINATOR,
)
from spotify_confidence.analysis.frequentist.confidence_computers import z_test_computer


def estimate_slope(df, **kwargs: Dict) -> DataFrame:
    """Estimate regression slope(s) from summed columns and store them on ``df``.

    The columns named by ``kwargs[DENOMINATOR]``, ``kwargs[FEATURE]``,
    ``kwargs[FEATURE_SUMSQ]``, ``kwargs[NUMERATOR]`` and
    ``kwargs[FEATURE_CROSS]`` are summed over all rows and arranged as a
    ``(k + 1) x (k + 1)`` system whose first row/column corresponds to the
    intercept. The system is solved through an explicit matrix inverse and the
    non-intercept coefficients are written to ``df[REGRESSION_PARAM]``, with
    the same value repeated on every row.

    Args:
        df: Frame holding the columns listed above.
            NOTE(review): presumably per-group sufficient statistics; confirm
            against the caller.
        **kwargs: Must map the constants above to column names.

    Returns:
        ``df`` itself. It is returned untouched when the feature column is
        absent; otherwise it is mutated in place by adding the
        ``REGRESSION_PARAM`` column.
    """
    feature_col = kwargs[FEATURE]
    if feature_col not in df:
        return df

    def col_sum(values):
        # Pairwise ``+`` (rather than ``Series.sum``) so that cells holding
        # arrays are added element-wise.
        return reduce(lambda acc, item: acc + item, values)

    def dimension(value):
        # Multi-element arrays report their leading dimension; anything else
        # counts as a single feature.
        if isinstance(value, np.ndarray) and value.size > 1:
            return value.shape[0]
        return 1

    # The number of features is read off the first row only.
    k = df[kwargs[FEATURE_SUMSQ]].apply(dimension).iloc[0]
    feature_block = slice(1, k + 1)

    gram = np.zeros((k + 1, k + 1))
    gram[feature_block, feature_block] = col_sum(df[kwargs[FEATURE_SUMSQ]])
    gram[0, 0] = col_sum(df[kwargs[DENOMINATOR]])
    feature_total = col_sum(df[feature_col])
    gram[0, feature_block] = feature_total
    gram[feature_block, 0] = feature_total

    moments = np.zeros((k + 1, 1))
    moments[0] = col_sum(df[kwargs[NUMERATOR]])
    cross_total = col_sum(df[kwargs[FEATURE_CROSS]])
    moments[feature_block] = np.atleast_2d(cross_total).reshape(-1, 1)

    try:
        coefficients = np.matmul(np.linalg.inv(gram), moments)
    except np.linalg.LinAlgError:
        # Singular system: fall back to all-zero coefficients instead of
        # failing.
        coefficients = np.zeros((k + 1, 1))

    # Drop the intercept; a lone slope is unwrapped to a plain scalar.
    slope = coefficients[feature_block]
    if slope.size == 1:
        slope = slope.item()

    placeholder = Series(index=df.index, dtype=df[feature_col].dtype)
    df[REGRESSION_PARAM] = placeholder.apply(lambda _: slope)
    return df


def point_estimate(df: Series, **kwargs) -> float:
    """Return numerator / denominator, adjusted by the regression term if present.

    ``estimate_slope`` is called first. If that leaves a ``REGRESSION_PARAM``
    column on ``df``, each row's estimate is reduced by
    ``dfmatmul(slope, feature - feature_mean * denominator) / denominator``,
    where ``feature_mean`` is the summed feature column divided by the summed
    denominator column.

    NOTE(review): the annotations say ``Series -> float``, but the body indexes
    columns and calls ``df.apply(..., axis=1)``; this looks like a DataFrame in
    and a Series out. Confirm before tightening the types.

    Args:
        df: Data holding the numerator, denominator and (optionally) feature
            columns named in ``kwargs``.
        **kwargs: Column-name mapping keyed by the module constants; it is also
            forwarded to ``df.apply`` for the row-wise helper.

    Returns:
        The (possibly regression-adjusted) ratio of numerator to denominator.
    """
    df = estimate_slope(df, **kwargs)
    unadjusted = df[kwargs[NUMERATOR]] / df[kwargs[DENOMINATOR]]

    if REGRESSION_PARAM not in df:
        return unadjusted

    feature_mean = df[kwargs[FEATURE]].sum() / df[kwargs[DENOMINATOR]].sum()

    def row_adjustment(row: Series, feature_mean: float, **kwargs: Dict) -> Series:
        # Row's feature total, centred on the pooled feature mean.
        centred_feature = row[kwargs[FEATURE]] - feature_mean * row[kwargs[DENOMINATOR]]
        return dfmatmul(row[REGRESSION_PARAM], centred_feature, outer=False)

    adjustment = df.apply(row_adjustment, feature_mean=feature_mean, axis=1, **kwargs)
    return unadjusted - adjustment / df[kwargs[DENOMINATOR]]


def lin_reg_variance_delta(row, **kwargs):
    """Compute the regression-related variance correction for a single row.

    With ``b`` the row's ``REGRESSION_PARAM`` value, the result is
    ``b' * sample_var * b - 2 * b' * sample_cov``, where ``sample_var`` and
    ``sample_cov`` are built from the row's feature, feature sum-of-squares,
    feature cross and numerator entries, each divided by the denominator.

    Args:
        row: One row of the frame; it must already contain ``REGRESSION_PARAM``.
        **kwargs: Column-name mapping keyed by the module constants.

    Returns:
        The scalar to add to the unadjusted variance of this row.
    """
    numerator = row[kwargs[NUMERATOR]]
    count = row[kwargs[DENOMINATOR]]

    # NOTE(review): ``unlist`` is assumed to unwrap list-like cells; verify in
    # confidence_utils.
    feature_sumsq = unlist(row[kwargs[FEATURE_SUMSQ]])
    feature_sum = unlist(row[kwargs[FEATURE]])
    feature_cross = unlist(row[kwargs[FEATURE_CROSS]])

    feature_avg = feature_sum / count
    sample_var = feature_sumsq / count - dfmatmul(feature_avg, feature_avg)
    sample_cov = feature_cross / count - dfmatmul(feature_avg, numerator / count)

    slope = np.atleast_2d(row[REGRESSION_PARAM])
    slope_t = np.transpose(slope)

    quadratic_term = np.matmul(slope_t, np.matmul(sample_var, slope)).item()
    cross_term = -2 * np.matmul(slope_t, sample_cov).item()
    return quadratic_term + cross_term


def variance(df: DataFrame, **kwargs) -> Series:
    """Return the z-test variance, corrected row-wise when a feature column exists.

    The base variance comes from ``z_test_computer.variance``. When the column
    named by ``kwargs[FEATURE]`` is present, ``lin_reg_variance_delta`` is
    applied to every row and added to it. That helper reads
    ``REGRESSION_PARAM`` from each row, so the column has to be on ``df``
    before this function is called.

    Args:
        df: Frame with the columns required by ``z_test_computer.variance``
            and, optionally, the regression columns.
        **kwargs: Column-name mapping keyed by the module constants.

    Returns:
        One variance per row.

    Raises:
        ValueError: If any corrected variance is negative.
    """
    base_variance = z_test_computer.variance(df, **kwargs)

    if kwargs[FEATURE] not in df:
        return base_variance

    adjusted = base_variance + df.apply(lin_reg_variance_delta, axis=1, **kwargs)
    if (adjusted < 0).any():
        raise ValueError("Computed variance is negative, please check sufficient " "statistics.")
    return adjusted


def add_point_estimate_ci(df: DataFrame, **kwargs: Dict) -> DataFrame:
    """Delegate unchanged to ``z_test_computer.add_point_estimate_ci``."""
    result = z_test_computer.add_point_estimate_ci(df, **kwargs)
    return result


def std_err(df: DataFrame, **kwargs: Dict) -> DataFrame:
    """Delegate unchanged to ``z_test_computer.std_err``."""
    result = z_test_computer.std_err(df, **kwargs)
    return result


def p_value(df: DataFrame, **kwargs: Dict) -> DataFrame:
    """Delegate unchanged to ``z_test_computer.p_value``."""
    result = z_test_computer.p_value(df, **kwargs)
    return result


def ci(df: DataFrame, alpha_column: str, **kwargs: Dict) -> DataFrame:
    """Delegate unchanged to ``z_test_computer.ci``."""
    result = z_test_computer.ci(df, alpha_column, **kwargs)
    return result


def powered_effect(
    df: DataFrame,
    z_alpha: float,
    z_power: float,
    binary: bool,
    non_inferiority: bool,
    avg_column: float,
    var_column: float,
) -> Series:
    """Delegate unchanged to ``z_test_computer.powered_effect``.

    All arguments are forwarded positionally, in declaration order.
    """
    forwarded = (df, z_alpha, z_power, binary, non_inferiority, avg_column, var_column)
    return z_test_computer.powered_effect(*forwarded)


def required_sample_size(
    binary: Union[Series, bool],
    non_inferiority: Union[Series, bool],
    hypothetical_effect: Union[Series, float],
    control_avg: Union[Series, float],
    control_var: Union[Series, float],
    z_alpha: float = None,
    kappa: float = None,
    proportion_of_total: Union[Series, float] = None,
    z_power: float = None,
) -> Union[Series, float]:
    """Delegate unchanged to ``z_test_computer.required_sample_size``.

    All arguments are forwarded positionally, in declaration order.
    """
    forwarded = (
        binary,
        non_inferiority,
        hypothetical_effect,
        control_avg,
        control_var,
        z_alpha,
        kappa,
        proportion_of_total,
        z_power,
    )
    return z_test_computer.required_sample_size(*forwarded)