spotify_confidence/analysis/frequentist/multiple_comparison.py

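"""Multiple-comparison corrections for frequentist A/B test results.

This module counts how many comparisons a correction should account for,
adjusts p-values and significance flags (via Bonferroni-style splits or
statsmodels' ``multipletests``), computes sequentially adjusted alphas for
z-test based sequential tests, and derives corrected confidence intervals
and adjusted power.
"""
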
from _warnings import warn
from typing import Iterable, Dict

from pandas import DataFrame
from statsmodels.stats.multitest import multipletests

from spotify_confidence.analysis.confidence_utils import groupbyApplyParallel
from spotify_confidence.analysis.constants import (
    BONFERRONI, BONFERRONI_ONLY_COUNT_TWOSIDED, PREFERENCE_TEST, TWO_SIDED,
    HOLM, HOMMEL, SIMES_HOCHBERG, SIDAK, HOLM_SIDAK,
    FDR_BH, FDR_BY, FDR_TSBH, FDR_TSBKY,
    BONFERRONI_DO_NOT_COUNT_NON_INFERIORITY, SPOT_1,
    SPOT_1_HOLM, SPOT_1_HOMMEL, SPOT_1_SIMES_HOCHBERG, SPOT_1_SIDAK, SPOT_1_HOLM_SIDAK,
    SPOT_1_FDR_BH, SPOT_1_FDR_BY, SPOT_1_FDR_TSBH, SPOT_1_FDR_TSBKY,
    NIM, NUMBER_OF_COMPARISONS, FINAL_EXPECTED_SAMPLE_SIZE, ORDINAL_GROUP_COLUMN,
    CORRECTION_METHOD, METHOD, IS_SIGNIFICANT, P_VALUE, ADJUSTED_ALPHA, ADJUSTED_P,
    ALPHA, INTERVAL_SIZE, ZTEST, ZTESTLINREG,
    CI_LOWER, CI_UPPER, ADJUSTED_LOWER, ADJUSTED_UPPER, PREFERENCE,
    ADJUSTED_ALPHA_POWER_SAMPLE_SIZE, CORRECTION_METHODS_THAT_REQUIRE_METRIC_INFO,
    ADJUSTED_POWER, POWER,
)
from spotify_confidence.analysis.frequentist.confidence_computers import confidence_computers


def get_num_comparisons(
    df: DataFrame,
    correction_method: str,
    number_of_level_comparisons: int,
    groupby: Iterable,
    metric_column: str,
    treatment_column: str,
    single_metric: bool,
    segments: Iterable,
) -> int:
    if correction_method == BONFERRONI:
        return max(
            1,
            number_of_level_comparisons * df.assign(_dummy_=1).groupby(groupby + ["_dummy_"], sort=False).ngroups,
        )
    elif correction_method == BONFERRONI_ONLY_COUNT_TWOSIDED:
        return max(
            number_of_level_comparisons
            * df.query(f'{PREFERENCE_TEST} == "{TWO_SIDED}"')
            .assign(_dummy_=1)
            .groupby(groupby + ["_dummy_"], sort=False)
            .ngroups,
            1,
        )
    elif correction_method in [
        HOLM, HOMMEL, SIMES_HOCHBERG, SIDAK, HOLM_SIDAK, FDR_BH, FDR_BY, FDR_TSBH, FDR_TSBKY,
    ]:
        return 1
    elif correction_method in [
        BONFERRONI_DO_NOT_COUNT_NON_INFERIORITY, SPOT_1,
        SPOT_1_HOLM, SPOT_1_HOMMEL, SPOT_1_SIMES_HOCHBERG, SPOT_1_SIDAK, SPOT_1_HOLM_SIDAK,
        SPOT_1_FDR_BH, SPOT_1_FDR_BY, SPOT_1_FDR_TSBH, SPOT_1_FDR_TSBKY,
    ]:
        if metric_column is None or treatment_column is None:
            return max(
                1,
                number_of_level_comparisons
                * df[df[NIM].isnull()].assign(_dummy_=1).groupby(groupby + ["_dummy_"], sort=False).ngroups,
            )
        else:
            if single_metric:
                number_success_metrics = 1 if df[df[NIM].isnull()].shape[0] > 0 else 0
            else:
                number_success_metrics = df[df[NIM].isnull()].groupby(metric_column, sort=False).ngroups

            number_segments = (
                1
                if len(segments) == 0 or not all(item in df.index.names for item in segments)
                else df.groupby(segments, sort=False).ngroups
            )

            return max(1, number_of_level_comparisons * max(1, number_success_metrics) * number_segments)
    else:
        raise ValueError(f"Unsupported correction method: {correction_method}.")
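

# Worked example (illustrative only, the numbers are hypothetical): with 3 treatment-vs-control
# level comparisons, 2 success metrics (rows where NIM is null) and 2 segments, the SPOT-1 branch
# above returns max(1, 3 * max(1, 2) * 2) = 12, so a Bonferroni-style split of alpha = 0.05 would
# test each comparison at 0.05 / 12 ≈ 0.0042.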


def add_adjusted_p_and_is_significant(df: DataFrame, **kwargs: Dict) -> DataFrame:
    n_comparisons = kwargs[NUMBER_OF_COMPARISONS]
    if kwargs[FINAL_EXPECTED_SAMPLE_SIZE] is not None:
        # Sequential tests only support the Bonferroni-style corrections.
        if kwargs[CORRECTION_METHOD] not in [
            BONFERRONI, BONFERRONI_ONLY_COUNT_TWOSIDED, BONFERRONI_DO_NOT_COUNT_NON_INFERIORITY, SPOT_1,
        ]:
            raise ValueError(
                f"{kwargs[CORRECTION_METHOD]} not supported for sequential tests. Use one of "
                f"{BONFERRONI}, {BONFERRONI_ONLY_COUNT_TWOSIDED}, "
                f"{BONFERRONI_DO_NOT_COUNT_NON_INFERIORITY}, {SPOT_1}"
            )
        groups_except_ordinal = [
            column
            for column in df.index.names
            if kwargs[ORDINAL_GROUP_COLUMN] is not None
            and column is not None
            and (column != kwargs[ORDINAL_GROUP_COLUMN] or kwargs[FINAL_EXPECTED_SAMPLE_SIZE] is None)
        ]
        df = groupbyApplyParallel(
            df.groupby(groups_except_ordinal + [kwargs[METHOD], "level_1", "level_2"], as_index=False, sort=False),
            lambda df: compute_sequential_adjusted_alpha(df, **kwargs),
        )
    elif kwargs[CORRECTION_METHOD] in [
        HOLM, HOMMEL, SIMES_HOCHBERG, SIDAK, HOLM_SIDAK,
        FDR_BH, FDR_BY, FDR_TSBH, FDR_TSBKY,
        SPOT_1_HOLM, SPOT_1_HOMMEL, SPOT_1_SIMES_HOCHBERG, SPOT_1_SIDAK, SPOT_1_HOLM_SIDAK,
        SPOT_1_FDR_BH, SPOT_1_FDR_BY, SPOT_1_FDR_TSBH, SPOT_1_FDR_TSBKY,
    ]:
        if kwargs[CORRECTION_METHOD].startswith("spot-"):
            correction_method = kwargs[CORRECTION_METHOD][7:]
        else:
            correction_method = kwargs[CORRECTION_METHOD]

        df[ADJUSTED_ALPHA] = df[ALPHA] / n_comparisons
        is_significant, adjusted_p, _, _ = multipletests(
            pvals=df[P_VALUE], alpha=1 - kwargs[INTERVAL_SIZE], method=correction_method
        )
        df[ADJUSTED_P] = adjusted_p
        df[IS_SIGNIFICANT] = is_significant
    elif kwargs[CORRECTION_METHOD] in [
        BONFERRONI, BONFERRONI_ONLY_COUNT_TWOSIDED, BONFERRONI_DO_NOT_COUNT_NON_INFERIORITY, SPOT_1,
    ]:
        df[ADJUSTED_ALPHA] = df[ALPHA] / n_comparisons
        df[ADJUSTED_P] = df[P_VALUE].map(lambda p: min(p * n_comparisons, 1))
        df[IS_SIGNIFICANT] = df[P_VALUE] < df[ADJUSTED_ALPHA]
    else:
        raise ValueError("Can't figure out which correction method to use :(")
    return df


def compute_sequential_adjusted_alpha(df: DataFrame, **kwargs: Dict) -> DataFrame:
    if df[kwargs[METHOD]].isin([ZTEST, ZTESTLINREG]).all():
        adjusted_alpha = confidence_computers[ZTEST].compute_sequential_adjusted_alpha(df, **kwargs)
        df = df.merge(adjusted_alpha, left_index=True, right_index=True)
        df[IS_SIGNIFICANT] = df[P_VALUE] < df[ADJUSTED_ALPHA]
        df[P_VALUE] = None
        df[ADJUSTED_P] = None
        return df
    else:
        raise NotImplementedError("Sequential testing is only supported for z-test and z-testlinreg")


def add_ci(df: DataFrame, **kwargs: Dict) -> DataFrame:
    lower, upper = confidence_computers[df[kwargs[METHOD]].values[0]].ci(df, ALPHA, **kwargs)

    if kwargs[CORRECTION_METHOD] in [
        HOLM, HOMMEL, SIMES_HOCHBERG, SPOT_1_HOLM, SPOT_1_HOMMEL, SPOT_1_SIMES_HOCHBERG,
    ] and all(df[PREFERENCE_TEST] != TWO_SIDED):
        if all(df[kwargs[METHOD]] == "z-test"):
            adjusted_lower, adjusted_upper = confidence_computers["z-test"].ci_for_multiple_comparison_methods(
                df, kwargs[CORRECTION_METHOD], alpha=1 - kwargs[INTERVAL_SIZE]
            )
        else:
            raise NotImplementedError(f"{kwargs[CORRECTION_METHOD]} is only supported for ZTests")
    elif kwargs[CORRECTION_METHOD] in [
        BONFERRONI, BONFERRONI_ONLY_COUNT_TWOSIDED, BONFERRONI_DO_NOT_COUNT_NON_INFERIORITY, SPOT_1,
        SPOT_1_HOLM, SPOT_1_HOMMEL, SPOT_1_SIMES_HOCHBERG, SPOT_1_SIDAK, SPOT_1_HOLM_SIDAK,
        SPOT_1_FDR_BH, SPOT_1_FDR_BY, SPOT_1_FDR_TSBH, SPOT_1_FDR_TSBKY,
    ]:
        adjusted_lower, adjusted_upper = confidence_computers[df[kwargs[METHOD]].values[0]].ci(
            df, ADJUSTED_ALPHA, **kwargs
        )
    else:
        warn(f"Confidence intervals not supported for {kwargs[CORRECTION_METHOD]}")
        adjusted_lower = None
        adjusted_upper = None

    return (
        df.assign(**{CI_LOWER: lower})
        .assign(**{CI_UPPER: upper})
        .assign(**{ADJUSTED_LOWER: adjusted_lower})
        .assign(**{ADJUSTED_UPPER: adjusted_upper})
    )
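

def _example_multipletests_holm():
    """Minimal sketch (hypothetical helper, not used by the library): shows the shape of the
    statsmodels call that add_adjusted_p_and_is_significant relies on, assuming three raw
    p-values and the Holm correction."""
    is_significant, adjusted_p, _, _ = multipletests(pvals=[0.01, 0.04, 0.20], alpha=0.05, method="holm")
    # is_significant -> [True, False, False]; adjusted_p -> [0.03, 0.08, 0.20]
    return is_significant, adjusted_p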


def set_alpha_and_adjust_preference(df: DataFrame, **kwargs: Dict) -> DataFrame:
    alpha_0 = 1 - kwargs[INTERVAL_SIZE]
    return df.assign(
        **{
            ALPHA: df.apply(
                lambda row: (
                    2 * alpha_0 if kwargs[CORRECTION_METHOD] == SPOT_1 and row[PREFERENCE] != TWO_SIDED else alpha_0
                ),
                axis=1,
            )
        }
    ).assign(**{ADJUSTED_ALPHA_POWER_SAMPLE_SIZE: lambda df: df[ALPHA] / kwargs[NUMBER_OF_COMPARISONS]})


def get_preference(df: DataFrame, correction_method: str):
    return TWO_SIDED if correction_method == SPOT_1 else df[PREFERENCE]


def add_adjusted_power(df: DataFrame, correction_method: str, metric_column: str, single_metric: bool) -> DataFrame:
    if correction_method in CORRECTION_METHODS_THAT_REQUIRE_METRIC_INFO:
        if metric_column is None:
            return df.assign(**{ADJUSTED_POWER: None})
        else:
            number_total_metrics = 1 if single_metric else df.groupby(metric_column, sort=False).ngroups
            if single_metric:
                number_success_metrics = 1 if df[df[NIM].isnull()].shape[0] > 0 else 0
            else:
                number_success_metrics = df[df[NIM].isnull()].groupby(metric_column, sort=False).ngroups

            number_guardrail_metrics = number_total_metrics - number_success_metrics
            power_correction = (
                number_guardrail_metrics if number_success_metrics == 0 else number_guardrail_metrics + 1
            )
            return df.assign(**{ADJUSTED_POWER: 1 - (1 - df[POWER]) / power_correction})
    else:
        return df.assign(**{ADJUSTED_POWER: df[POWER]})
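

# Worked example (illustrative only, the numbers are hypothetical): with 3 metrics of which 2 are
# success metrics, add_adjusted_power counts 1 guardrail metric and a power correction of 1 + 1 = 2,
# so a configured power of 0.8 becomes 1 - (1 - 0.8) / 2 = 0.9.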