# spotify_confidence/analysis/frequentist/confidence_computers/z_test_computer.py
from typing import Tuple, Union, Dict
import numpy as np
from pandas import DataFrame, Series
from scipy import optimize
from scipy import stats as st
try:
from scipy.stats._stats_py import _unequal_var_ttest_denom
except ImportError: # Fallback for scipy<1.8.0
from scipy.stats.stats import _unequal_var_ttest_denom
from statsmodels.stats.weightstats import _zconfint_generic, _zstat_generic
from spotify_confidence.analysis.confidence_utils import power_calculation
from spotify_confidence.analysis.constants import (
NUMERATOR,
NUMERATOR_SUM_OF_SQUARES,
DENOMINATOR,
INTERVAL_SIZE,
FINAL_EXPECTED_SAMPLE_SIZE,
ORDINAL_GROUP_COLUMN,
POINT_ESTIMATE,
CI_LOWER,
CI_UPPER,
ADJUSTED_LOWER,
ADJUSTED_UPPER,
VARIANCE,
NUMBER_OF_COMPARISONS,
TWO_SIDED,
SFX2,
SFX1,
STD_ERR,
PREFERENCE_TEST,
NULL_HYPOTHESIS,
DIFFERENCE,
ALPHA,
IS_SIGNIFICANT,
HOLM,
SPOT_1_HOLM,
HOMMEL,
SIMES_HOCHBERG,
SPOT_1_HOMMEL,
SPOT_1_SIMES_HOCHBERG,
NIM,
ADJUSTED_ALPHA,
ORIGINAL_POINT_ESTIMATE,
)
from spotify_confidence.analysis.frequentist.sequential_bound_solver import bounds


def sequential_bounds(t: np.ndarray, alpha: float, sides: int, state: DataFrame = None):
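    """Group-sequential bounds with the solver settings used throughout this
    module (rho=2, ztrun=8, max_nints=1000)."""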
return bounds(t, alpha, rho=2, ztrun=8, sides=sides, max_nints=1000, state=state)


def point_estimate(df: DataFrame, **kwargs: Dict[str, str]) -> Series:
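    """Return the per-row point estimate numerator / denominator."""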
numerator = kwargs[NUMERATOR]
denominator = kwargs[DENOMINATOR]
if (df[denominator] == 0).any():
raise ValueError("""Can't compute point estimate: denominator is 0""")
return df[numerator] / df[denominator]


def variance(df: DataFrame, **kwargs: Dict[str, str]) -> Series:
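    """Return the per-row sample variance, using the binary shortcut
    p * (1 - p) when the numerator sum of squares equals the numerator."""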
numerator = kwargs[NUMERATOR]
denominator = kwargs[DENOMINATOR]
numerator_sumsq = kwargs[NUMERATOR_SUM_OF_SQUARES]
binary = df[numerator_sumsq] == df[numerator]
if binary.all():
        # numerator_sumsq equals numerator for binary data, so this reduces to
        # p * (1 - p) with p = df[ORIGINAL_POINT_ESTIMATE], and it remains a
        # reasonable fallback even if the data is not in fact binary
        variance = df[numerator_sumsq] / df[denominator] - df[ORIGINAL_POINT_ESTIMATE] ** 2
else:
variance = (df[numerator_sumsq] - np.power(df[numerator], 2) / df[denominator]) / (df[denominator] - 1)
if (variance < 0).any():
raise ValueError("Computed variance is negative. " "Please check your inputs.")
return variance


def std_err(df: Series, **kwargs: Dict[str, str]) -> float:
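    """Unpooled (unequal-variance) standard error of the difference:
    sqrt(var1 / n1 + var2 / n2)."""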
denominator = kwargs[DENOMINATOR]
return np.sqrt(df[VARIANCE + SFX1] / df[denominator + SFX1] + df[VARIANCE + SFX2] / df[denominator + SFX2])


def add_point_estimate_ci(df: Series, **kwargs: Dict[str, str]) -> Series:
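    """Attach CI_LOWER and CI_UPPER for the point estimate: a two-sided
    z-interval with coverage equal to the interval_size kwarg."""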
denominator = kwargs[DENOMINATOR]
interval_size = kwargs[INTERVAL_SIZE]
df[CI_LOWER], df[CI_UPPER] = _zconfint_generic(
mean=df[POINT_ESTIMATE],
std_mean=np.sqrt(df[VARIANCE] / df[denominator]),
alpha=1 - interval_size,
alternative=TWO_SIDED,
)
return df


def p_value(df: DataFrame, **kwargs: Dict[str, str]) -> Series:
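    """Return the z-test p-value for the observed difference against the
    null-hypothesis value, one- or two-sided according to the preference."""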
    _, p_val = _zstat_generic(
        value1=df[POINT_ESTIMATE + SFX2],
        value2=df[POINT_ESTIMATE + SFX1],
        std_diff=df[STD_ERR],
        alternative=df[PREFERENCE_TEST].values[0],
        diff=df[NULL_HYPOTHESIS],
    )
    return p_val


def ci(df: DataFrame, alpha_column: str, **kwargs: Dict[str, str]) -> Tuple[Series, Series]:
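    """Return the confidence interval for the difference at the alpha taken
    from alpha_column, one- or two-sided according to the preference."""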
return _zconfint_generic(
mean=df[DIFFERENCE], std_mean=df[STD_ERR], alpha=df[alpha_column], alternative=df[PREFERENCE_TEST].values[0]
)


def achieved_power(df: DataFrame, mde: float, alpha: float, **kwargs: Dict[str, str]) -> DataFrame:
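    """Return the power achieved for the given minimum detectable effect (mde)
    and alpha, using the pooled variance of the two groups."""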
denominator = kwargs[DENOMINATOR]
v1, v2 = df[VARIANCE + SFX1], df[VARIANCE + SFX2]
n1, n2 = df[denominator + SFX1], df[denominator + SFX2]
var_pooled = ((n1 - 1) * v1 + (n2 - 1) * v2) / (n1 + n2 - 2)
return power_calculation(mde, var_pooled, alpha, n1, n2)


def compute_sequential_adjusted_alpha(df: DataFrame, **kwargs: Dict[str, str]) -> Series:
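    """Return sequentially adjusted alphas, one per row: each comparison's
    accumulated sample size as a share of the largest expected total gives
    the information fraction passed to the bound solver, and the resulting
    z-bound (zb) is converted back to an alpha."""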
denominator = kwargs[DENOMINATOR]
final_expected_sample_size_column = kwargs[FINAL_EXPECTED_SAMPLE_SIZE]
ordinal_group_column = kwargs[ORDINAL_GROUP_COLUMN]
n_comparisons = kwargs[NUMBER_OF_COMPARISONS]
if not df.reset_index()[ordinal_group_column].is_unique:
raise ValueError("Ordinal values cannot be duplicated")
def adjusted_alphas_for_group(grp: DataFrame) -> Series:
return (
sequential_bounds(
t=grp["sample_size_proportions"].values,
alpha=grp[ALPHA].values[0] / n_comparisons,
sides=2 if (grp[PREFERENCE_TEST] == TWO_SIDED).all() else 1,
)
.df.set_index(grp.index)
.assign(
**{
ADJUSTED_ALPHA: lambda df: df.apply(
lambda row: (
2 * (1 - st.norm.cdf(row["zb"]))
if (grp[PREFERENCE_TEST] == TWO_SIDED).all()
else 1 - st.norm.cdf(row["zb"])
),
axis=1,
)
}
)
)[["zb", ADJUSTED_ALPHA]]
comparison_total_column = "comparison_total_" + denominator
return Series(
data=(
df.assign(**{comparison_total_column: df[denominator + SFX1] + df[denominator + SFX2]})
.assign(
max_sample_size=lambda df: df[[comparison_total_column, final_expected_sample_size_column]]
.max(axis=1)
.max()
)
.assign(sample_size_proportions=lambda df: df[comparison_total_column] / df["max_sample_size"])
.pipe(adjusted_alphas_for_group)[ADJUSTED_ALPHA]
),
name=ADJUSTED_ALPHA,
)


def ci_for_multiple_comparison_methods(
df: DataFrame,
correction_method: str,
alpha: float,
w: float = 1.0,
) -> Tuple[Union[Series, float], Union[Series, float]]:
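    """Compute adjusted CI bounds compatible with Holm-, Hommel- and
    Simes-Hochberg-type corrections (one-sided tests only). The adjusted
    alpha for each row depends on whether that comparison was significant
    and on how many of the m comparisons were significant in total; w is
    the weighting parameter of the correction method."""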
    if (df[PREFERENCE_TEST] == TWO_SIDED).any():
        raise ValueError(
            "CIs can only be produced for one-sided tests when multiple-testing "
            "correction methods other than Bonferroni are applied"
        )
m_scal = len(df)
num_significant = sum(df[IS_SIGNIFICANT])
r = m_scal - num_significant
def _aw(W: float, alpha: float, m_scal: float, r: int):
return alpha * (1 - (1 - W) * (m_scal - r) / m_scal)
def _bw(W: float, alpha: float, m_scal: float, r: int):
return 1 - (1 - alpha) / np.power((1 - (1 - W) * (1 - np.power((1 - alpha), (1 / m_scal)))), (m_scal - r))
if correction_method in [HOLM, SPOT_1_HOLM]:
adjusted_alpha_rej_equal_m = 1 - alpha / m_scal
adjusted_alpha_rej_less_m = 1 - (1 - w) * (alpha / m_scal)
adjusted_alpha_accept = 1 - _aw(w, alpha, m_scal, r) / r if r != 0 else 0
elif correction_method in [HOMMEL, SIMES_HOCHBERG, SPOT_1_HOMMEL, SPOT_1_SIMES_HOCHBERG]:
adjusted_alpha_rej_equal_m = np.power((1 - alpha), (1 / m_scal))
adjusted_alpha_rej_less_m = 1 - (1 - w) * (1 - np.power((1 - alpha), (1 / m_scal)))
adjusted_alpha_accept = 1 - _bw(w, alpha, m_scal, r) / r if r != 0 else 0
else:
        raise ValueError(
            "CIs not supported for correction method. "
            f"Supported methods: {HOMMEL}, {HOLM}, {SIMES_HOCHBERG}, "
            f"{SPOT_1_HOLM}, {SPOT_1_HOMMEL} and {SPOT_1_SIMES_HOCHBERG}"
        )
def _compute_ci_for_row(row: Series) -> Tuple[float, float]:
if row[IS_SIGNIFICANT] and num_significant == m_scal:
alpha_adj = adjusted_alpha_rej_equal_m
elif row[IS_SIGNIFICANT] and num_significant < m_scal:
alpha_adj = adjusted_alpha_rej_less_m
else:
alpha_adj = adjusted_alpha_accept
ci_sign = -1 if row[PREFERENCE_TEST] == "larger" else 1
bound1 = row[DIFFERENCE] + ci_sign * st.norm.ppf(alpha_adj) * row[STD_ERR]
if ci_sign == -1:
bound2 = max(row[NULL_HYPOTHESIS], bound1)
else:
bound2 = min(row[NULL_HYPOTHESIS], bound1)
bound = bound2 if row[IS_SIGNIFICANT] else bound1
lower = bound if row[PREFERENCE_TEST] == "larger" else -np.inf
upper = bound if row[PREFERENCE_TEST] == "smaller" else np.inf
row[ADJUSTED_LOWER] = lower
row[ADJUSTED_UPPER] = upper
return row
ci_df = df.apply(_compute_ci_for_row, axis=1)[[ADJUSTED_LOWER, ADJUSTED_UPPER]]
return ci_df[ADJUSTED_LOWER], ci_df[ADJUSTED_UPPER]


def ci_width(
z_alpha, binary, non_inferiority, hypothetical_effect, control_avg, control_var, control_count, treatment_count
) -> Union[Series, float]:
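    """Return the width of the z-interval for the difference,
    2 * z_alpha * SE, with SE from the unequal-variance (Welch) denominator."""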
treatment_var = _get_hypothetical_treatment_var(
binary, non_inferiority, control_avg, control_var, hypothetical_effect
)
_, std_err = _unequal_var_ttest_denom(control_var, control_count, treatment_var, treatment_count)
return 2 * z_alpha * std_err


def powered_effect(
df: DataFrame,
z_alpha: float,
z_power: float,
binary: bool,
non_inferiority: bool,
avg_column: float,
var_column: float,
) -> Series:
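    """Return the smallest detectable effect given the current number of
    units: found by local search for binary (non-NIM) metrics and in closed
    form otherwise."""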
if binary and not non_inferiority:
effect = df.apply(
lambda row: _search_MDE_binary_local_search(
control_avg=row[avg_column],
control_var=row[var_column],
non_inferiority=False,
kappa=row["kappa"],
proportion_of_total=row["proportion_of_total"],
current_number_of_units=row["current_number_of_units"],
z_alpha=z_alpha,
z_power=z_power,
)[0],
axis=1,
)
else:
treatment_var = _get_hypothetical_treatment_var(
binary_metric=binary,
            non_inferiority=non_inferiority,
control_avg=df[avg_column],
control_var=df[var_column],
hypothetical_effect=0,
)
n2_partial = np.power((z_alpha + z_power), 2) * (df[var_column] / df["kappa"] + treatment_var)
effect = np.sqrt(
(1 / (df["current_number_of_units"] * df["proportion_of_total"])) * (n2_partial + df["kappa"] * n2_partial)
)
return effect


def required_sample_size(
binary: Union[Series, bool],
non_inferiority: Union[Series, bool],
hypothetical_effect: Union[Series, float],
control_avg: Union[Series, float],
control_var: Union[Series, float],
z_alpha: float = None,
kappa: float = None,
proportion_of_total: Union[Series, float] = None,
z_power: float = None,
) -> Union[Series, float]:
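    """Return the total number of units needed to power the test: the
    treatment-group size n2 (see _treatment_group_sample_size) scaled by
    kappa and the comparison's share of traffic,
    ceil((n2 + kappa * n2) / proportion_of_total)."""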
    if kappa is None:
        raise ValueError("kappa is None, must be a positive float")
    if proportion_of_total is None:
        raise ValueError("proportion_of_total is None, must be a float between 0 and 1")
treatment_var = np.vectorize(_get_hypothetical_treatment_var)(
binary, non_inferiority, control_avg, control_var, hypothetical_effect
)
n2 = _treatment_group_sample_size(
z_alpha=z_alpha,
z_power=z_power,
hypothetical_effect=hypothetical_effect,
control_var=control_var,
treatment_var=treatment_var,
kappa=kappa,
)
required_sample_size = np.ceil((n2 + n2 * kappa) / proportion_of_total)
return required_sample_size


def _search_MDE_binary_local_search(
control_avg: float,
control_var: float,
non_inferiority: bool,
kappa: float,
proportion_of_total: float,
current_number_of_units: float,
z_alpha: float = None,
z_power: float = None,
):
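    """Search for the binary MDE by running bounded local searches over a
    queue of bisected intervals, stopping once the implied number of units is
    within the goodness threshold; falls back to the slower grid search in
    _search_MDE_binary if no interval yields a good enough solution."""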
def f(x):
return _find_current_powered_effect(
hypothetical_effect=x,
control_avg=control_avg,
control_var=control_var,
binary=True,
non_inferiority=non_inferiority,
kappa=kappa,
proportion_of_total=proportion_of_total,
current_number_of_units=current_number_of_units,
z_alpha=z_alpha,
z_power=z_power,
)
max_val = 1 - control_avg
min_val = min(10e-9, max_val)
if min_val == max_val:
# corner case that crashes the optimizer
return min_val, f(min_val)
    max_iter = 100  # max number of iterations before falling back to slow grid search
    # We stop immediately once a solution is "good enough": a threshold of 1
    # means the approximated number of units (based on the current effect
    # candidate) is off by at most 1.0.
    goodness_threshold = 1.0
curr_iter = 0
best_x = None
best_fun = float("inf")
bounds_queue = [(min_val, max_val)]
while curr_iter < max_iter and best_fun > goodness_threshold:
# take next value from queue
interval = bounds_queue.pop(0)
        # conduct a bounded local search; a very small xatol value improved
        # performance during tests
        result = optimize.minimize_scalar(
            f, bounds=(interval[0], interval[1]), method="bounded", options={"xatol": 10e-14, "maxiter": 50}
        )
if result.fun < best_fun:
best_x = result.x
best_fun = result.fun
curr_iter += 1
# add new bounds to the queue
interval_split = (interval[0] + interval[1]) / 2
bounds_queue.append((interval[0], interval_split))
bounds_queue.append((interval_split, interval[1]))
if best_fun <= goodness_threshold:
return best_x, best_fun
else: # check if grid search finds a better solution
alt_result_x, alt_result_fun = _search_MDE_binary(
control_avg,
control_var,
non_inferiority,
kappa,
proportion_of_total,
current_number_of_units,
z_alpha,
z_power,
return_cost_val=True,
)
return (alt_result_x, alt_result_fun) if alt_result_fun < best_fun else (best_x, best_fun)


def _search_MDE_binary(
control_avg: float,
control_var: float,
non_inferiority: bool,
kappa: float,
proportion_of_total: float,
current_number_of_units: float,
z_alpha: float = None,
z_power: float = None,
return_cost_val=False,
):
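    """Grid-search fallback for the binary MDE: a coarse pass over 2000
    candidate effects, then a fine pass of 10000 points around the coarse
    minimum."""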
candidate_effects = np.linspace(10e-9, 1 - control_avg, num=2000)
for i in range(2):
test = []
for effect in candidate_effects:
test.append(
_find_current_powered_effect(
hypothetical_effect=effect,
control_avg=control_avg,
control_var=control_var,
binary=True,
non_inferiority=non_inferiority,
kappa=kappa,
proportion_of_total=proportion_of_total,
current_number_of_units=current_number_of_units,
z_alpha=z_alpha,
z_power=z_power,
)
)
test = np.array(test)
index = [idx for idx, element in enumerate(test) if element == test.min()]
if len(index) != 1:
index = [index[int(np.ceil(len(index) / 2))]]
        if i == 0:
            if index[0] == len(candidate_effects) - 1:
                # the minimum sits at the upper edge of the search grid, i.e. no
                # effect in (0, 1 - control_avg] reaches the required power
                return (np.inf, np.inf) if return_cost_val else np.inf
lower_effect_bound = 10e-9 if index[0] == 0 else candidate_effects[index[0] - 1]
candidate_effects = np.linspace(lower_effect_bound, candidate_effects[index[0]], num=10000)
    return (candidate_effects[index[0]], test[index[0]]) if return_cost_val else candidate_effects[index[0]]


def _treatment_group_sample_size(
z_alpha: float,
z_power: float,
hypothetical_effect: float,
control_var: float,
treatment_var: float,
kappa: float,
) -> float:
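    """Treatment-group sample size for a two-sample z-test:
    n2 = ceil(((z_alpha + z_power) / |effect|)^2 * (control_var / kappa + treatment_var))."""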
return np.ceil(np.power((z_alpha + z_power) / abs(hypothetical_effect), 2) * (control_var / kappa + treatment_var))


def _find_current_powered_effect(
hypothetical_effect: float,
control_avg: float,
control_var: float,
binary: bool,
non_inferiority: bool,
kappa: float,
proportion_of_total: float,
current_number_of_units: float,
z_power: float = None,
z_alpha: float = None,
) -> float:
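    """Squared distance between the current number of units and the number
    required to power the hypothetical effect; the MDE searches minimize this
    quantity."""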
treatment_var = _get_hypothetical_treatment_var(
binary_metric=binary,
non_inferiority=non_inferiority,
control_avg=control_avg,
control_var=control_var,
hypothetical_effect=hypothetical_effect,
)
n2 = _treatment_group_sample_size(
z_alpha,
z_power,
hypothetical_effect,
control_var,
treatment_var,
kappa,
)
return np.power(current_number_of_units - ((n2 + n2 * kappa) / proportion_of_total), 2)


def _get_hypothetical_treatment_var(
binary_metric: bool,
non_inferiority: bool,
control_avg: float,
control_var: float,
hypothetical_effect: float,
) -> float:
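    """Variance to assume for the treatment group under the hypothetical
    effect."""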
if binary_metric and not non_inferiority:
        # For binary metrics the variance can be derived from the average. We
        # do *not* do this for non-inferiority tests, because their basic
        # assumption is that the means of the control and treatment groups
        # are identical.
return (control_avg + hypothetical_effect) * (1 - (control_avg + hypothetical_effect))
else:
return control_var
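

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; the column names are hypothetical,
# and these helpers are normally invoked through spotify-confidence's
# high-level API, which supplies kwargs keyed by the constants imported
# above):
#
#   import pandas as pd
#   summary = pd.DataFrame(
#       {"clicks": [100, 120], "users": [1000, 1000], "clicks_sq": [100, 120]}
#   )
#   estimates = point_estimate(summary, **{NUMERATOR: "clicks", DENOMINATOR: "users"})
#   variances = variance(
#       summary.assign(**{ORIGINAL_POINT_ESTIMATE: estimates}),
#       **{NUMERATOR: "clicks", DENOMINATOR: "users", NUMERATOR_SUM_OF_SQUARES: "clicks_sq"},
#   )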