Learn practical skills, build real-world projects, and advance your career
######################
## import libraries ##
######################
import pandas as pd
from scipy import stats
import time, sys
import numpy as np
np.random.seed(1081565)
########################################
## print progress bar helper function ##
########################################
def printProgressBar(i,max,postText):
    n_bar =5 #size of progress bar
    j= i/max
    sys.stdout.write('\r')
    sys.stdout.write(f"[{'=' * int(n_bar * j):{n_bar}s}] {int(100 * j)}%,  {postText}")
    sys.stdout.flush()
##############################
## create simulated dataset ##
##############################
def simulate_dataset(n=100, correlated='none', all_null=True, permutation=False, seed=123):
    np.random.seed(seed)
    if(correlated=='none'):
        if(all_null):       
            mu = np.array(n*[0])
            df = pd.DataFrame()
            df['ID'] = list(range(1,mu.shape[0]+1))
            df['true_effect'] = mu
        else:
            mu = np.array(int(n/4)*[-5,0,0,0])
            df = pd.DataFrame()
            df['ID'] = list(range(1,mu.shape[0]+1))  
            df['true_effect'] = mu
        cov_matrix = np.eye(mu.shape[0], k=0)
        df['measured_effect'] = np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=1)[0]
        if(permutation):
            if(all_null==False):
                mu_corrected = mu.copy()
                cov_matrix_corrected = cov_matrix.copy()
                for i in range(mu.shape[0]-1, -1, -1):
                    if(mu[i]!=0):
                        mu_corrected = np.delete(mu_corrected, i)
                        cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 0)
                        cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 1)
                perm_matrix = abs(np.random.multivariate_normal(mean=mu_corrected, cov=cov_matrix_corrected, size=10000))
            else:
                perm_matrix = abs(np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=10000))
            df_perm = pd.DataFrame()
            df_perm['test_stat'] = perm_matrix.max(axis=1)
            del perm_matrix
            
        del mu, cov_matrix
    
    if(correlated=='positive'):
        if(all_null):       
            mu = np.array(n*[0])
            df = pd.DataFrame()
            df['ID'] = list(range(1,mu.shape[0]+1))
            df['true_effect'] = mu
        else:
            mu = np.array(int(n/4)*[-5,0,0,0])
            df = pd.DataFrame()
            df['ID'] = list(range(1,mu.shape[0]+1))  
            df['true_effect'] = mu
        m=int(mu.shape[0]/1.15)
        phi = 0.95
        phi_matrix = (phi*np.eye(m, k=1)) + (phi*np.eye(m, k=-1))
        cov_matrix = np.eye(mu.shape[0], k=0)
        cov_matrix[0:phi_matrix.shape[0], 0:phi_matrix.shape[1]] = cov_matrix[0:phi_matrix.shape[0], 0:phi_matrix.shape[1]] + phi_matrix
        for i in range(2,cov_matrix.shape[0],2):
            cov_matrix[i,i-1]=0
            cov_matrix[i-1,i]=0
        df['measured_effect'] = np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=1)[0]
        
        if(permutation):
            if(all_null==False):
                mu_corrected = mu.copy()
                cov_matrix_corrected = cov_matrix.copy()
                for i in range(mu.shape[0]-1, -1, -1):
                    if(mu[i]!=0):
                        mu_corrected = np.delete(mu_corrected, i)
                        cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 0)
                        cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 1)
                perm_matrix = abs(np.random.multivariate_normal(mean=mu_corrected, cov=cov_matrix_corrected, size=10000))
            else:
                perm_matrix = abs(np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=10000))
            df_perm = pd.DataFrame()
            df_perm['test_stat'] = perm_matrix.max(axis=1)
            del perm_matrix
        
        del mu, m, cov_matrix, phi, phi_matrix

    if(permutation):
        return(df, df_perm)
    else:
        return(df)
###########################################
## Family Wise Error Rate (FWER) Methods ##
###########################################
def FWER_adjustment(FWER_alpha=0.05, method='Bonferoni', all_null=True, correlated='none', trials=1000, n=100):
    
    n = int(n/4)*4
    ## initialize dataframe to hold final results 
    df_results = pd.DataFrame()
    df_results['trial'] = list(range(0,trials))
    df_results['sig_p_value'] = None
    
    ## specify individual level alpha based on FWER contol method
    if(method=='Bonferoni'):   
        alpha = FWER_alpha/n
    if(method=='Sidak'):   
        alpha = 1 - ((1-FWER_alpha)**(1/n))
    if((method=='Holm–Bonferroni') or (method=='Hochberg')):
        alpha = [(FWER_alpha / x) for x in list(range(n, 0, -1))]
    if(method=='Holm–Sidak'):
        alpha = [(1 - ((1-FWER_alpha)**(1/x))) for x in list(range(n, 0, -1))]
        
    ## conduct simulations, and print results
    for i in range(0, trials):
        printProgressBar(i+1, trials, "")
        time.sleep(0.1) 
        
        if(method=='Permutation'):
            df, df_perm = simulate_dataset(n=n, correlated=correlated, all_null=all_null, permutation=True, seed=i)
            df = df.loc[df['true_effect']==0,:].reset_index(drop=True)
            test_stat = max(abs(df['measured_effect']))
            p_value = df_perm.loc[df_perm['test_stat']>=test_stat].shape[0] / df_perm.shape[0]
            if(p_value <= FWER_alpha):
                df_results.loc[i, 'sig_p_value'] = 1
            else:
                df_results.loc[i, 'sig_p_value'] = 0
            del df, df_perm, p_value
        else:
            df = simulate_dataset(n=n, correlated=correlated, all_null=all_null, permutation=False, seed=i)
            df['cdf'] = stats.norm.cdf(df['measured_effect'], loc=0, scale=1)
            df['1-cdf'] = 1-df['cdf']
            df['p-value'] = 2*df[['cdf', '1-cdf']].min(axis=1)
            df = df.sort_values(by=['p-value'], ascending=True).reset_index(drop=True)
            df['alpha'] = alpha
            if(method=='Hochberg'):
                df['alpha_p-value_difference'] = df['alpha']-df['p-value']
                df = df.loc[df['alpha_p-value_difference']>=0, :].reset_index(drop=True)
                df = df.sort_values(by=['alpha_p-value_difference'], ascending=True).reset_index(drop=True)                       
            df = df.loc[df['true_effect']==0,:].reset_index(drop=True)
            if((df.shape[0]>0) and (df.loc[0, 'p-value'] <= df.loc[0, 'alpha'])):
                df_results.loc[i, 'sig_p_value'] = 1
            else:
                df_results.loc[i, 'sig_p_value'] = 0
            del df
    
    mean = round(df_results['sig_p_value'].sum() / df_results.shape[0], 3)
    sd = np.sqrt((mean*(1-mean))/trials)
    l_bound = round(mean-(1.96*sd), 3)
    u_bound = round(mean+(1.96*sd), 3)
    
    if((all_null==True) and  correlated=='none'):
        print('FWER via ' + method + ' - all hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
    if((all_null==True) and  correlated=='positive'):
        print('FWER via ' + method + ' - all hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
    if((all_null==False) and  correlated=='none'):
        print('FWER via ' + method + ' - 75% hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
    if((all_null==False) and  correlated=='positive'):
        print('FWER via ' + method + ' - 75% hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
########################################
## False Discovery Rate (FDR) Methods ##
########################################
def FDR_adjustment(FDR_alpha=0.05, method='Benjamini–Hochberg', all_null=True, correlated='none', trials=1000, n=1000):
    
    n = int(n/4)*4
    ## initialize dataframe to hold final results 
    df_results = pd.DataFrame()
    df_results['trial'] = list(range(0,trials))
    df_results['sig_p_value'] = None
    
    ## specify individual level alpha based on FDR contol method
    if(method=='Benjamini–Hochberg'):
        alpha = [((FDR_alpha*x) / n) for x in list(range(1, n+1))]
    if(method=='Benjamini–Yekutieli'):
        C = sum([(1/x) for x in list(range(1, n+1))])
        alpha = [((FDR_alpha*x) / (n*C)) for x in list(range(1, n+1))]
        
    for i in range(0, trials):
        printProgressBar(i+1, trials, "")
        time.sleep(0.1)     
        
        df = simulate_dataset(n=n, correlated=correlated, all_null=all_null, permutation=False, seed=i)
        df['cdf'] = stats.norm.cdf(df['measured_effect'], loc=0, scale=1)
        df['1-cdf'] = 1-df['cdf']
        df['p-value'] = 2*df[['cdf', '1-cdf']].min(axis=1)
        df = df.sort_values(by=['p-value'], ascending=True).reset_index(drop=True)
        df['alpha'] = alpha
        df['alpha_p-value_difference'] = df['alpha']-df['p-value']
        df['rank_order'] = list(range(0,df.shape[0]))
        max_rank = df.loc[df['alpha_p-value_difference']>=0, 'rank_order'].max()
        df = df.loc[df['rank_order']<=max_rank, :].reset_index(drop=True)
        if(df.shape[0]==0):
            df_results.loc[i, 'sig_p_value'] = 0
        else:
            df_results.loc[i, 'sig_p_value'] = df.loc[df['true_effect']==0, :].shape[0] / df.shape[0]
        del df, max_rank
        
    mean = round(df_results['sig_p_value'].sum() / df_results.shape[0], 3)
    sd = sd = np.sqrt(np.var(df_results['sig_p_value']) / trials)
    l_bound = round(mean-(1.96*sd), 3)
    u_bound = round(mean+(1.96*sd), 3)
    
    if((all_null==True) and  correlated=='none'):
        print('FDR via ' + method + ' - all hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
    if((all_null==True) and  correlated=='positive'):
        print('FDR via ' + method + ' - all hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
    if((all_null==False) and  correlated=='none'):
        print('FDR via ' + method + ' - 75% hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
    if((all_null==False) and  correlated=='positive'):
        print('FDR via ' + method + ' - 75% hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')