spotify_confidence/analysis/frequentist/confidence_computers/t_test_computer.py (83 lines of code) (raw):
from typing import Tuple, Dict
import numpy as np
from pandas import DataFrame, Series
from statsmodels.stats.weightstats import _tconfint_generic, _tstat_generic
from spotify_confidence.analysis.confidence_utils import power_calculation
from spotify_confidence.analysis.constants import (
NUMERATOR,
NUMERATOR_SUM_OF_SQUARES,
DENOMINATOR,
INTERVAL_SIZE,
POINT_ESTIMATE,
CI_LOWER,
CI_UPPER,
VARIANCE,
TWO_SIDED,
SFX1,
SFX2,
STD_ERR,
PREFERENCE_TEST,
NULL_HYPOTHESIS,
DIFFERENCE,
)
def point_estimate(df: DataFrame, **kwargs: Dict[str, str]) -> float:
numerator = kwargs[NUMERATOR]
denominator = kwargs[DENOMINATOR]
if (df[denominator] == 0).any():
raise ValueError("""Can't compute point estimate: denominator is 0""")
return df[numerator] / df[denominator]
def variance(df: DataFrame, **kwargs: Dict[str, str]) -> float:
numerator = kwargs[NUMERATOR]
denominator = kwargs[DENOMINATOR]
numerator_sumsq = kwargs[NUMERATOR_SUM_OF_SQUARES]
binary = df[numerator_sumsq] == df[numerator]
if binary.all():
# This equals row[POINT_ESTIMATE]*(1-row[POINT_ESTIMATE]) when the data is binary,
# and also gives a robust fallback in case it's not
variance = df[numerator_sumsq] / df[denominator] - df[POINT_ESTIMATE] ** 2
else:
variance = (df[numerator_sumsq] - np.power(df[numerator], 2) / df[denominator]) / (df[denominator] - 1)
if (variance < 0).any():
raise ValueError("Computed variance is negative. " "Please check your inputs.")
return variance
def std_err(df: DataFrame, **kwargs: Dict[str, str]) -> Series:
denominator = kwargs[DENOMINATOR]
return np.sqrt(df[VARIANCE + SFX1] / df[denominator + SFX1] + df[VARIANCE + SFX2] / df[denominator + SFX2])
def add_point_estimate_ci(df: DataFrame, **kwargs: Dict[str, str]) -> Series:
denominator = kwargs[DENOMINATOR]
interval_size = kwargs[INTERVAL_SIZE]
df[CI_LOWER], df[CI_UPPER] = _tconfint_generic(
mean=df[POINT_ESTIMATE],
std_mean=np.sqrt(df[VARIANCE] / df[denominator]),
dof=df[denominator] - 1,
alpha=1 - interval_size,
alternative=TWO_SIDED,
)
return df
def _dof(row: Series, **kwargs: Dict[str, str]) -> float:
denominator = kwargs[DENOMINATOR]
v1, v2 = row[VARIANCE + SFX1], row[VARIANCE + SFX2]
n1, n2 = row[denominator + SFX1], row[denominator + SFX2]
return (v1 / n1 + v2 / n2) ** 2 / ((v1 / n1) ** 2 / (n1 - 1) + (v2 / n2) ** 2 / (n2 - 1))
def p_value(df: Series, **kwargs: Dict[str, str]) -> Series:
_, p_value = _tstat_generic(
value1=df[POINT_ESTIMATE + SFX2],
value2=df[POINT_ESTIMATE + SFX1],
std_diff=df[STD_ERR],
dof=_dof(df, **kwargs),
alternative=df[PREFERENCE_TEST].values[0],
diff=df[NULL_HYPOTHESIS],
)
return p_value
def ci(df: DataFrame, alpha_column: str, **kwargs: Dict[str, str]) -> Tuple[Series, Series]:
return _tconfint_generic(
mean=df[DIFFERENCE],
std_mean=df[STD_ERR],
dof=_dof(df, **kwargs),
alpha=df[alpha_column],
alternative=df[PREFERENCE_TEST].values[0],
)
def achieved_power(df: DataFrame, mde: float, alpha: float, **kwargs: Dict[str, str]) -> DataFrame:
v1, v2 = df[VARIANCE + SFX1], df[VARIANCE + SFX2]
d1, d2 = kwargs[DENOMINATOR] + SFX1, kwargs[DENOMINATOR] + SFX2
n1, n2 = df[d1], df[d2]
var_pooled = ((n1 - 1) * v1 + (n2 - 1) * v2) / (n1 + n2 - 2)
return power_calculation(mde, var_pooled, alpha, n1, n2)