Learn practical skills, build real-world projects, and advance your career
Created 3 years ago
######################
## import libraries ##
######################
import pandas as pd
from scipy import stats
import time, sys
import numpy as np
np.random.seed(1081565)
########################################
## print progress bar helper function ##
########################################
def printProgressBar(i,max,postText):
n_bar =5 #size of progress bar
j= i/max
sys.stdout.write('\r')
sys.stdout.write(f"[{'=' * int(n_bar * j):{n_bar}s}] {int(100 * j)}%, {postText}")
sys.stdout.flush()
##############################
## create simulated dataset ##
##############################
def simulate_dataset(n=100, correlated='none', all_null=True, permutation=False, seed=123):
np.random.seed(seed)
if(correlated=='none'):
if(all_null):
mu = np.array(n*[0])
df = pd.DataFrame()
df['ID'] = list(range(1,mu.shape[0]+1))
df['true_effect'] = mu
else:
mu = np.array(int(n/4)*[-5,0,0,0])
df = pd.DataFrame()
df['ID'] = list(range(1,mu.shape[0]+1))
df['true_effect'] = mu
cov_matrix = np.eye(mu.shape[0], k=0)
df['measured_effect'] = np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=1)[0]
if(permutation):
if(all_null==False):
mu_corrected = mu.copy()
cov_matrix_corrected = cov_matrix.copy()
for i in range(mu.shape[0]-1, -1, -1):
if(mu[i]!=0):
mu_corrected = np.delete(mu_corrected, i)
cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 0)
cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 1)
perm_matrix = abs(np.random.multivariate_normal(mean=mu_corrected, cov=cov_matrix_corrected, size=10000))
else:
perm_matrix = abs(np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=10000))
df_perm = pd.DataFrame()
df_perm['test_stat'] = perm_matrix.max(axis=1)
del perm_matrix
del mu, cov_matrix
if(correlated=='positive'):
if(all_null):
mu = np.array(n*[0])
df = pd.DataFrame()
df['ID'] = list(range(1,mu.shape[0]+1))
df['true_effect'] = mu
else:
mu = np.array(int(n/4)*[-5,0,0,0])
df = pd.DataFrame()
df['ID'] = list(range(1,mu.shape[0]+1))
df['true_effect'] = mu
m=int(mu.shape[0]/1.15)
phi = 0.95
phi_matrix = (phi*np.eye(m, k=1)) + (phi*np.eye(m, k=-1))
cov_matrix = np.eye(mu.shape[0], k=0)
cov_matrix[0:phi_matrix.shape[0], 0:phi_matrix.shape[1]] = cov_matrix[0:phi_matrix.shape[0], 0:phi_matrix.shape[1]] + phi_matrix
for i in range(2,cov_matrix.shape[0],2):
cov_matrix[i,i-1]=0
cov_matrix[i-1,i]=0
df['measured_effect'] = np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=1)[0]
if(permutation):
if(all_null==False):
mu_corrected = mu.copy()
cov_matrix_corrected = cov_matrix.copy()
for i in range(mu.shape[0]-1, -1, -1):
if(mu[i]!=0):
mu_corrected = np.delete(mu_corrected, i)
cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 0)
cov_matrix_corrected = np.delete(cov_matrix_corrected, i, 1)
perm_matrix = abs(np.random.multivariate_normal(mean=mu_corrected, cov=cov_matrix_corrected, size=10000))
else:
perm_matrix = abs(np.random.multivariate_normal(mean=mu, cov=cov_matrix, size=10000))
df_perm = pd.DataFrame()
df_perm['test_stat'] = perm_matrix.max(axis=1)
del perm_matrix
del mu, m, cov_matrix, phi, phi_matrix
if(permutation):
return(df, df_perm)
else:
return(df)
###########################################
## Family Wise Error Rate (FWER) Methods ##
###########################################
def FWER_adjustment(FWER_alpha=0.05, method='Bonferoni', all_null=True, correlated='none', trials=1000, n=100):
n = int(n/4)*4
## initialize dataframe to hold final results
df_results = pd.DataFrame()
df_results['trial'] = list(range(0,trials))
df_results['sig_p_value'] = None
## specify individual level alpha based on FWER contol method
if(method=='Bonferoni'):
alpha = FWER_alpha/n
if(method=='Sidak'):
alpha = 1 - ((1-FWER_alpha)**(1/n))
if((method=='Holm–Bonferroni') or (method=='Hochberg')):
alpha = [(FWER_alpha / x) for x in list(range(n, 0, -1))]
if(method=='Holm–Sidak'):
alpha = [(1 - ((1-FWER_alpha)**(1/x))) for x in list(range(n, 0, -1))]
## conduct simulations, and print results
for i in range(0, trials):
printProgressBar(i+1, trials, "")
time.sleep(0.1)
if(method=='Permutation'):
df, df_perm = simulate_dataset(n=n, correlated=correlated, all_null=all_null, permutation=True, seed=i)
df = df.loc[df['true_effect']==0,:].reset_index(drop=True)
test_stat = max(abs(df['measured_effect']))
p_value = df_perm.loc[df_perm['test_stat']>=test_stat].shape[0] / df_perm.shape[0]
if(p_value <= FWER_alpha):
df_results.loc[i, 'sig_p_value'] = 1
else:
df_results.loc[i, 'sig_p_value'] = 0
del df, df_perm, p_value
else:
df = simulate_dataset(n=n, correlated=correlated, all_null=all_null, permutation=False, seed=i)
df['cdf'] = stats.norm.cdf(df['measured_effect'], loc=0, scale=1)
df['1-cdf'] = 1-df['cdf']
df['p-value'] = 2*df[['cdf', '1-cdf']].min(axis=1)
df = df.sort_values(by=['p-value'], ascending=True).reset_index(drop=True)
df['alpha'] = alpha
if(method=='Hochberg'):
df['alpha_p-value_difference'] = df['alpha']-df['p-value']
df = df.loc[df['alpha_p-value_difference']>=0, :].reset_index(drop=True)
df = df.sort_values(by=['alpha_p-value_difference'], ascending=True).reset_index(drop=True)
df = df.loc[df['true_effect']==0,:].reset_index(drop=True)
if((df.shape[0]>0) and (df.loc[0, 'p-value'] <= df.loc[0, 'alpha'])):
df_results.loc[i, 'sig_p_value'] = 1
else:
df_results.loc[i, 'sig_p_value'] = 0
del df
mean = round(df_results['sig_p_value'].sum() / df_results.shape[0], 3)
sd = np.sqrt((mean*(1-mean))/trials)
l_bound = round(mean-(1.96*sd), 3)
u_bound = round(mean+(1.96*sd), 3)
if((all_null==True) and correlated=='none'):
print('FWER via ' + method + ' - all hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
if((all_null==True) and correlated=='positive'):
print('FWER via ' + method + ' - all hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
if((all_null==False) and correlated=='none'):
print('FWER via ' + method + ' - 75% hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
if((all_null==False) and correlated=='positive'):
print('FWER via ' + method + ' - 75% hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
########################################
## False Discovery Rate (FDR) Methods ##
########################################
def FDR_adjustment(FDR_alpha=0.05, method='Benjamini–Hochberg', all_null=True, correlated='none', trials=1000, n=1000):
n = int(n/4)*4
## initialize dataframe to hold final results
df_results = pd.DataFrame()
df_results['trial'] = list(range(0,trials))
df_results['sig_p_value'] = None
## specify individual level alpha based on FDR contol method
if(method=='Benjamini–Hochberg'):
alpha = [((FDR_alpha*x) / n) for x in list(range(1, n+1))]
if(method=='Benjamini–Yekutieli'):
C = sum([(1/x) for x in list(range(1, n+1))])
alpha = [((FDR_alpha*x) / (n*C)) for x in list(range(1, n+1))]
for i in range(0, trials):
printProgressBar(i+1, trials, "")
time.sleep(0.1)
df = simulate_dataset(n=n, correlated=correlated, all_null=all_null, permutation=False, seed=i)
df['cdf'] = stats.norm.cdf(df['measured_effect'], loc=0, scale=1)
df['1-cdf'] = 1-df['cdf']
df['p-value'] = 2*df[['cdf', '1-cdf']].min(axis=1)
df = df.sort_values(by=['p-value'], ascending=True).reset_index(drop=True)
df['alpha'] = alpha
df['alpha_p-value_difference'] = df['alpha']-df['p-value']
df['rank_order'] = list(range(0,df.shape[0]))
max_rank = df.loc[df['alpha_p-value_difference']>=0, 'rank_order'].max()
df = df.loc[df['rank_order']<=max_rank, :].reset_index(drop=True)
if(df.shape[0]==0):
df_results.loc[i, 'sig_p_value'] = 0
else:
df_results.loc[i, 'sig_p_value'] = df.loc[df['true_effect']==0, :].shape[0] / df.shape[0]
del df, max_rank
mean = round(df_results['sig_p_value'].sum() / df_results.shape[0], 3)
sd = sd = np.sqrt(np.var(df_results['sig_p_value']) / trials)
l_bound = round(mean-(1.96*sd), 3)
u_bound = round(mean+(1.96*sd), 3)
if((all_null==True) and correlated=='none'):
print('FDR via ' + method + ' - all hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
if((all_null==True) and correlated=='positive'):
print('FDR via ' + method + ' - all hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
if((all_null==False) and correlated=='none'):
print('FDR via ' + method + ' - 75% hypotheses null with no correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')
if((all_null==False) and correlated=='positive'):
print('FDR via ' + method + ' - 75% hypotheses null with positive correlation: ' + str(mean) + ' , 95% CI (' + str(l_bound) + ' , ' + str(u_bound) + ')')